apache-spark PairDStreamFunctions.updateStateByKey

示例

updateState按键可以用于DStream基于即将到来的数据创建有状态。它需要一个功能:

object UpdateStateFunctions {

  def updateState(current: Seq[Double], previous: Option[StatCounter]) = {

    previous.map(s => s.merge(current)).orElse(Some(StatCounter(current)))

  }

}

其中采用一系列current值,即Option先前状态的,并返回Option更新状态的。全部放在一起:

import org.apache.spark._

import org.apache.spark.streaming.dstream.DStream

import scala.collection.mutable.Queue

import org.apache.spark.util.StatCounter

import org.apache.spark.streaming._

object UpdateStateByKeyApp {

  def main(args: Array[String]) {

    val sc = new SparkContext("local", "updateStateByKey", new SparkConf())

    val ssc = new StreamingContext(sc, Seconds(10))

    ssc.checkpoint("/tmp/chk")

    val queue = Queue(

      sc.parallelize(Seq(("foo", 5.0), ("bar", 1.0))),

      sc.parallelize(Seq(("foo", 1.0), ("foo", 99.0))),

      sc.parallelize(Seq(("bar", 22.0), ("foo", 1.0))),

      sc.emptyRDD[(String, Double)],

      sc.emptyRDD[(String, Double)],

      sc.emptyRDD[(String, Double)],

      sc.parallelize(Seq(("foo", 1.0), ("bar", 1.0)))

    )

    val inputStream: DStream[(String, Double)] = ssc.queueStream(queue)

    inputStream.updateStateByKey(UpdateStateFunctions.updateState _).print()

    ssc.start()

    ssc.awaitTermination()

    ssc.stop()

  }

}

           

以上是 apache-spark PairDStreamFunctions.updateStateByKey 的全部内容, 来源链接: utcz.com/z/315764.html

回到顶部