星火群组rdd由配对RDD上的钥匙和群组组成,并从每个群组中挑选最新的

Spark和Scala的新手。试图达到以下。我的消息看起来像以下(钥匙,ID,版本,dataObject时)星火群组rdd由配对RDD上的钥匙和群组组成,并从每个群组中挑选最新的

val transformedRDD = processedMessages.flatMap(message => { 

message.isProcessed match {

case true => Some(message.key, message.id, message.version, message)

case false => None

}

}).groupByKey

我想组由ID对每个消息并获得最新版本的消息,然后groupbykey,然后调用它看起来像下面

预定方法

Ingest(key,RDD[dataObject]) 

回答:

在大多数情况下,您应该避免groupByKey,因为它可能导致重新洗牌,这可能非常昂贵。在您的使用案例中,您不需要groupByKey,而是可以使用reduceByKey

val transformedRDD = processedMessages 

// notice that we will have Rdd[(String, Message)] or PairRdd after this flatMap

.flatMap(message => message.isProcessed match {

case true => Some((message.id, message))

case false => None

})

// after this reduction we will have latest message for each id

.reduceByKey((m1: Message, m2: Message) => m1.version >= m2.version match {

case true => m1

case false => m2

})

// now we just want to keep message

.map({ case (id, message) => message })

以上是 星火群组rdd由配对RDD上的钥匙和群组组成,并从每个群组中挑选最新的 的全部内容, 来源链接: utcz.com/qa/261275.html

回到顶部