Python: transposing columns to rows with Spark

I am trying to transpose some columns of my table to rows. I am using Python and Spark 1.5.0. Here is my initial table:

+-----+-----+-----+-------+
|  A  |col_1|col_2|col_...|
+-----+-----+-----+-------+
|  1  |  0.0|  0.6|  ...  |
|  2  |  0.6|  0.7|  ...  |
|  3  |  0.5|  0.9|  ...  |
|  ...|  ...|  ...|  ...  |

I would like to end up with something like this:

+-----+--------+-----------+
|  A  | col_id | col_value |
+-----+--------+-----------+
|  1  |   col_1|        0.0|
|  1  |   col_2|        0.6|
|  ...|     ...|        ...|
|  2  |   col_1|        0.6|
|  2  |   col_2|        0.7|
|  ...|     ...|        ...|
|  3  |   col_1|        0.5|
|  3  |   col_2|        0.9|
|  ...|     ...|        ...|

Does anyone know how I can do this? Thank you for your help.

Answer:

This is relatively simple to do with basic Spark SQL functions.

Python:

from pyspark.sql.functions import array, col, explode, struct, lit

df = sc.parallelize([(1, 0.0, 0.6), (1, 0.6, 0.7)]).toDF(["A", "col_1", "col_2"])

def to_long(df, by):
    # Filter dtypes and split into column names and type description
    cols, dtypes = zip(*((c, t) for (c, t) in df.dtypes if c not in by))
    # Spark SQL supports only homogeneous columns
    assert len(set(dtypes)) == 1, "All columns have to be of the same type"

    # Create and explode an array of (column_name, column_value) structs
    kvs = explode(array([
        struct(lit(c).alias("key"), col(c).alias("val")) for c in cols
    ])).alias("kvs")

    return df.select(by + [kvs]).select(by + ["kvs.key", "kvs.val"])

to_long(df, ["A"])
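To see what the explode step produces without a Spark session, here is a rough plain-Python sketch of the same wide-to-long reshaping on a list of dicts. It is only an illustration, not part of the Spark answer: the names `to_long_rows`, `wide`, `key_name` and `val_name` are made up for this example, and the sample rows are taken from the table in the question.

```python
def to_long_rows(rows, by, key_name="key", val_name="val"):
    """Plain-Python illustration of the wide-to-long reshaping (no Spark involved).

    Hypothetical helper: `rows` is a list of dicts, `by` lists the identifier columns.
    """
    long_rows = []
    for row in rows:
        # Identifier columns are repeated on every output row
        ids = {c: row[c] for c in by}
        for c, v in row.items():
            if c in by:
                continue
            # One output row per (column name, column value) pair,
            # mirroring the exploded array of (key, val) structs
            out = dict(ids)
            out[key_name] = c
            out[val_name] = v
            long_rows.append(out)
    return long_rows


# Sample data taken from the question's table (first two rows, two value columns)
wide = [
    {"A": 1, "col_1": 0.0, "col_2": 0.6},
    {"A": 2, "col_1": 0.6, "col_2": 0.7},
]

for r in to_long_rows(wide, by=["A"]):
    print(r)
```

Each input row yields one output row per non-identifier column, which is the same shape that `to_long` returns as the columns `A`, `key` and `val`.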

Scala:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{array, col, explode, lit, struct}

val df = Seq((1, 0.0, 0.6), (1, 0.6, 0.7)).toDF("A", "col_1", "col_2")

def toLong(df: DataFrame, by: Seq[String]): DataFrame = {
  val (cols, types) = df.dtypes.filter{ case (c, _) => !by.contains(c)}.unzip
  require(types.distinct.size == 1, s"${types.distinct.toString}.length != 1")

  val kvs = explode(array(
    cols.map(c => struct(lit(c).alias("key"), col(c).alias("val"))): _*
  ))

  val byExprs = by.map(col(_))

  df
    .select(byExprs :+ kvs.alias("_kvs"): _*)
    .select(byExprs ++ Seq($"_kvs.key", $"_kvs.val"): _*)
}

toLong(df, Seq("A"))

Source: utcz.com/qa/432507.html
