DataFrame到RDD [(String,String)]转换

有人可以帮忙吗?

背景

(也欢迎一个更好的解决方案):我有一个Kafka流,经过一些步骤后,该流变成了2列数据帧。我想将其放入Redis缓存中,第一列作为键,第二列作为值。

更具体地说 ,输入的类型是:lastContacts: org.apache.spark.sql.DataFrame = [serialNumber:

string, lastModified: bigint]。我尝试放入Redis,如下所示:

sc.toRedisKV(lastContacts)(redisConfig)

错误消息如下所示:

notebook:20: error: type mismatch;

found : org.apache.spark.sql.DataFrame

(which expands to) org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]

required: org.apache.spark.rdd.RDD[(String, String)]

sc.toRedisKV(lastContacts)(redisConfig)

我已经玩过一些想法(例如function .rdd),但是没有一个帮助。

回答:

如果要将行映射到其他RDD元素,可以使用df.map(row => …)将数据帧转换为RDD。

例如:

val df = Seq(("table1",432),

("table2",567),

("table3",987),

("table1",789)).

toDF("tablename", "Code").toDF()

df.show()

+---------+----+

|tablename|Code|

+---------+----+

| table1| 432|

| table2| 567|

| table3| 987|

| table1| 789|

+---------+----+

val rddDf = df.map(r => (r(0), r(1))).rdd // Type:RDD[(Any,Any)]

OR

val rdd = df.map(r => (r(0).toString, r(1).toString)).rdd //Type: RDD[(String,String)]

有关

请参阅https://community.hortonworks.com/questions/106500/error-in-spark-

streaming-kafka-integration-structu.html

您需要等待使用查询终止查询。 防止查询活动时退出进程。

以上是 DataFrame到RDD [(String,String)]转换 的全部内容, 来源链接: utcz.com/qa/414172.html

回到顶部