DataFrame到RDD [(String,String)]转换
有人可以帮忙吗?
背景
(也欢迎一个更好的解决方案):我有一个Kafka流,经过一些步骤后,该流变成了2列数据帧。我想将其放入Redis缓存中,第一列作为键,第二列作为值。
更具体地说 ,输入的类型是:lastContacts: org.apache.spark.sql.DataFrame = [serialNumber:
string, lastModified: bigint]。我尝试放入Redis,如下所示:
sc.toRedisKV(lastContacts)(redisConfig)
错误消息如下所示:
notebook:20: error: type mismatch; found : org.apache.spark.sql.DataFrame
(which expands to) org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
required: org.apache.spark.rdd.RDD[(String, String)]
sc.toRedisKV(lastContacts)(redisConfig)
我已经玩过一些想法(例如function .rdd
),但是没有一个帮助。
回答:
如果要将行映射到其他RDD元素,可以使用df.map(row => …)将数据帧转换为RDD。
例如:
val df = Seq(("table1",432), ("table2",567),
("table3",987),
("table1",789)).
toDF("tablename", "Code").toDF()
df.show()
+---------+----+
|tablename|Code|
+---------+----+
| table1| 432|
| table2| 567|
| table3| 987|
| table1| 789|
+---------+----+
val rddDf = df.map(r => (r(0), r(1))).rdd // Type:RDD[(Any,Any)]
OR
val rdd = df.map(r => (r(0).toString, r(1).toString)).rdd //Type: RDD[(String,String)]
有关
请参阅https://community.hortonworks.com/questions/106500/error-in-spark-
streaming-kafka-integration-structu.html
您需要等待使用查询终止查询。 防止查询活动时退出进程。
以上是 DataFrame到RDD [(String,String)]转换 的全部内容, 来源链接: utcz.com/qa/414172.html