星火群组rdd由配对RDD上的钥匙和群组组成,并从每个群组中挑选最新的
Spark和Scala的新手。试图达到以下。我的消息看起来像以下(钥匙,ID,版本,dataObject时)星火群组rdd由配对RDD上的钥匙和群组组成,并从每个群组中挑选最新的
val transformedRDD = processedMessages.flatMap(message => { message.isProcessed match {
case true => Some(message.key, message.id, message.version, message)
case false => None
}
}).groupByKey
我想组由ID对每个消息并获得最新版本的消息,然后groupbykey,然后调用它看起来像下面
预定方法Ingest(key,RDD[dataObject])
回答:
在大多数情况下,您应该避免groupByKey
,因为它可能导致重新洗牌,这可能非常昂贵。在您的使用案例中,您不需要groupByKey
,而是可以使用reduceByKey
。
val transformedRDD = processedMessages // notice that we will have Rdd[(String, Message)] or PairRdd after this flatMap
.flatMap(message => message.isProcessed match {
case true => Some((message.id, message))
case false => None
})
// after this reduction we will have latest message for each id
.reduceByKey((m1: Message, m2: Message) => m1.version >= m2.version match {
case true => m1
case false => m2
})
// now we just want to keep message
.map({ case (id, message) => message })
以上是 星火群组rdd由配对RDD上的钥匙和群组组成,并从每个群组中挑选最新的 的全部内容, 来源链接: utcz.com/qa/261275.html