使用Flink和基于事件时间的流计算平均值

我想在基于历史事件的流中计算Flink中基于窗口的平均值(或由我定义的任何其他函数),因此流必须是事件时间(不处理基于时间):使用Flink和基于事件时间的流计算平均值

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment 

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

我已经找到了如何在摄入添加时间戳

ctx.collectWithTimestamp(Datapoint(instrument, bid, ask), time.getMillis) 

但是,当我做计算(一个应用函数),它不工作时我只是按照我没有使用EventTime的方式进行操作。我已经读了一些关于我必须设置的水印:

val avg = stream 

.keyBy("instrument")

.timeWindow(Time.seconds(10))

.apply((key: Tuple, window: TimeWindow, values: Iterable[Datapoint], out: Collector[Datapoint])=>{

val avg = values.map(_.val).sum/values.size

val dp = Datapoint(key.getField[String](0), avg)

out.collect(dp)

})

avg.print()

env.execute()

有人有一个简单的Scala例子吗?

问候,
安德烈亚斯

回答:

水印是一种有效地与早期的时间戳的所有事件都(可能)已经抵达断言时间戳。基于事件时间的Windows依赖水印来知道窗口何时完成。到目前为止,最常见的水印策略是假定事件以一定的有限延迟到达。

如果要发射的数据源水印(服用时),见Source Functions with Timestamps and Watermarks,但它是那样简单

ctx.emitWatermark(new Watermark(datapoint.getWatermarkTime)) 

如果,另一方面,要解决这个问题之外来源,见Timestamp Assigners/Watermark Generators和Assigners allowing a fixed amount of lateness。你可以简单地做这样的事情:

stream 

.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Datapoint](Time.seconds(10))(_.getTimestamp))

.keyBy("instrument")

...

我链接到的文档有更详细的例子在斯卡拉。

以上是 使用Flink和基于事件时间的流计算平均值 的全部内容, 来源链接: utcz.com/qa/259960.html

回到顶部