使用Flink和基于事件时间的流计算平均值
我想在基于历史事件的流中计算Flink中基于窗口的平均值(或由我定义的任何其他函数),因此流必须是事件时间(不处理基于时间):使用Flink和基于事件时间的流计算平均值
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
我已经找到了如何在摄入添加时间戳:
ctx.collectWithTimestamp(Datapoint(instrument, bid, ask), time.getMillis)
但是,当我做计算(一个应用函数),它不工作时我只是按照我没有使用EventTime的方式进行操作。我已经读了一些关于我必须设置的水印:
val avg = stream .keyBy("instrument")
.timeWindow(Time.seconds(10))
.apply((key: Tuple, window: TimeWindow, values: Iterable[Datapoint], out: Collector[Datapoint])=>{
val avg = values.map(_.val).sum/values.size
val dp = Datapoint(key.getField[String](0), avg)
out.collect(dp)
})
avg.print()
env.execute()
有人有一个简单的Scala例子吗?
问候,
安德烈亚斯
回答:
水印是一种有效地与早期的时间戳的所有事件都(可能)已经抵达断言时间戳。基于事件时间的Windows依赖水印来知道窗口何时完成。到目前为止,最常见的水印策略是假定事件以一定的有限延迟到达。
如果要发射的数据源水印(服用时),见Source Functions with Timestamps and Watermarks,但它是那样简单
ctx.emitWatermark(new Watermark(datapoint.getWatermarkTime))
如果,另一方面,要解决这个问题之外来源,见Timestamp Assigners/Watermark Generators和Assigners allowing a fixed amount of lateness。你可以简单地做这样的事情:
stream .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Datapoint](Time.seconds(10))(_.getTimestamp))
.keyBy("instrument")
...
我链接到的文档有更详细的例子在斯卡拉。
以上是 使用Flink和基于事件时间的流计算平均值 的全部内容, 来源链接: utcz.com/qa/259960.html