如何将新的Struct列添加到DataFrame

我目前正在尝试从MongoDB中提取数据库,并使用Spark来将其提取到ElasticSearch中geo_points

Mongo数据库具有纬度和经度值,但是ElasticSearch要求将它们强制转换为geo_point类型。

Spark中是否可以将latand lon列复制到arrayor 的新列struct

任何帮助表示赞赏!

回答:

我假设您从某种平面模式开始,如下所示:

root

|-- lat: double (nullable = false)

|-- long: double (nullable = false)

|-- key: string (nullable = false)

首先让我们创建示例数据:

import org.apache.spark.sql.Row

import org.apache.spark.sql.functions.{col, udf}

import org.apache.spark.sql.types._

val rdd = sc.parallelize(

Row(52.23, 21.01, "Warsaw") :: Row(42.30, 9.15, "Corte") :: Nil)

val schema = StructType(

StructField("lat", DoubleType, false) ::

StructField("long", DoubleType, false) ::

StructField("key", StringType, false) ::Nil)

val df = sqlContext.createDataFrame(rdd, schema)

一种简单的方法是使用udf和case类:

case class Location(lat: Double, long: Double)

val makeLocation = udf((lat: Double, long: Double) => Location(lat, long))

val dfRes = df.

withColumn("location", makeLocation(col("lat"), col("long"))).

drop("lat").

drop("long")

dfRes.printSchema

我们得到

root

|-- key: string (nullable = false)

|-- location: struct (nullable = true)

| |-- lat: double (nullable = false)

| |-- long: double (nullable = false)

一种困难的方法是转换数据并随后应用模式:

val rddRes = df.

map{case Row(lat, long, key) => Row(key, Row(lat, long))}

val schemaRes = StructType(

StructField("key", StringType, false) ::

StructField("location", StructType(

StructField("lat", DoubleType, false) ::

StructField("long", DoubleType, false) :: Nil

), true) :: Nil

)

sqlContext.createDataFrame(rddRes, schemaRes).show

我们得到了预期的输出

+------+-------------+

| key| location|

+------+-------------+

|Warsaw|[52.23,21.01]|

| Corte| [42.3,9.15]|

+------+-------------+

从头开始创建嵌套模式可能很繁琐,因此,如果可以的话,我建议您采用第一种方法。如果需要更复杂的结构,可以轻松扩展它:

case class Pin(location: Location)

val makePin = udf((lat: Double, long: Double) => Pin(Location(lat, long))

df.

withColumn("pin", makePin(col("lat"), col("long"))).

drop("lat").

drop("long").

printSchema

我们得到预期的输出:

root

|-- key: string (nullable = false)

|-- pin: struct (nullable = true)

| |-- location: struct (nullable = true)

| | |-- lat: double (nullable = false)

| | |-- long: double (nullable = false)

不幸的是,您无法控制nullable字段,因此如果对您的项目很重要,则必须指定架构。

最后,您可以使用struct1.4中引入的功能:

import org.apache.spark.sql.functions.struct

df.select($"key", struct($"lat", $"long").alias("location"))

以上是 如何将新的Struct列添加到DataFrame 的全部内容, 来源链接: utcz.com/qa/410910.html

回到顶部