在Spark DataFrame中按数组值过滤

我正在使用带有Elasticsearch的Apache Spark 1.5数据帧,我尝试从包含ID列表(数组)的列中过滤ID。

例如,elasticsearch列的映射如下所示:

    {

"people":{

"properties":{

"artist":{

"properties":{

"id":{

"index":"not_analyzed",

"type":"string"

},

"name":{

"type":"string",

"index":"not_analyzed",

}

}

}

}

}

示例数据格式如下

{

"people": {

"artist": {

[

{

"id": "153",

"name": "Tom"

},

{

"id": "15389",

"name": "Cok"

}

]

}

}

},

{

"people": {

"artist": {

[

{

"id": "369",

"name": "Carl"

},

{

"id": "15389",

"name": "Cok"

},

{

"id": "698",

"name": "Sol"

}

]

}

}

}

在火花我尝试这样做:

val peopleId  = 152

val dataFrame = sqlContext.read

.format("org.elasticsearch.spark.sql")

.load("index/type")

dataFrame.filter(dataFrame("people.artist.id").contains(peopleId))

.select("people_sequence.artist.id")

我得到了包含152的所有id,例如1523,152978,但不仅id == 152

然后我尝试

dataFrame.filter(dataFrame("people.artist.id").equalTo(peopleId))

.select("people.artist.id")

我变得空虚,我明白为什么,这是因为我有很多人。artist.id

有人可以告诉我如何在有ID列表时进行过滤吗?

回答:

在Spark 1.5+中,您可以使用array_contains功能:

df.where(array_contains($"people.artist.id", "153"))

如果您使用的是较早版本,则可以尝试这样的UDF:

val containsId = udf(

(rs: Seq[Row], v: String) => rs.map(_.getAs[String]("id")).exists(_ == v))

df.where(containsId($"people.artist", lit("153")))

以上是 在Spark DataFrame中按数组值过滤 的全部内容, 来源链接: utcz.com/qa/418650.html

回到顶部