Flink应用函数onWindowsWindow
我正在做一个Flink项目。该项目的主要思想是读取JSON(网络日志)的数据流,将它们关联起来,并生成一个新的JSON,它是不同JSON信息的组合。Flink应用函数onWindowsWindow
此时,我可以读取JSON,生成一个KeyedStream(基于生成日志的机器),然后生成一个5秒的窗口流。
我想要执行的下一步是将apply函数应用到窗口并合并每个JSON的信息。我对如何去做有点困惑。
我现在有该代码是以下各项之一:
DataStream<Tuple2<String,JSONObject>> MetaAlert = events .flatMap(new JSONParser())
.keyBy(0)
.timeWindow(Time.seconds(5))
.apply(new generateMetaAlert());
public static class generateMetaAlert implements WindowFunction<Tuple2<String,JSONObject>, Tuple2<String,JSONObject>, String, Window> {
@Override
public void apply(String arg0, Window arg1, Iterable<Tuple2<String, JSONObject>> arg2,
Collector<Tuple2<String, JSONObject>> arg3) throws Exception {
}
的。适用(新generateMetaAlert())部分与下一个错误抱怨:
的方法,应用(窗函数,R,元组,TimeWindow>)不适用于参数(MetaAlertGenerator.generateMetaAlert)
其他任何代码结构提议都不同于我编写的代码吗?
预先感谢您的帮助
回答:
当您将keyBy
功能(不使用匿名类)在您的自定义WindowFunction
(第三场)键的类型应该是Tuple
因为编译器不能确定你的钥匙的类型。此代码编译没有任何错误(考虑到我想以填补空代码空白):
public class Test { public Test() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<String> events = env.readTextFile("datastream.log");
DataStream<Tuple2<String, JSONObject>> MetaAlert
= events
.flatMap(new JSONParser())
.keyBy(0)
.timeWindow(Time.seconds(5))
.apply(new GenerateMetaAlert());
}
public class JSONObject {
}
public class JSONParser implements FlatMapFunction<String, Tuple2<String, JSONObject>> {
@Override
public void flatMap(String s, Collector<Tuple2<String, JSONObject>> collector) throws Exception {
}
}
public class GenerateMetaAlert implements WindowFunction<Tuple2<String, JSONObject>, Tuple2<String, JSONObject>, Tuple, TimeWindow> {
@Override
public void apply(Tuple key, TimeWindow timeWindow, Iterable<Tuple2<String, JSONObject>> iterable, Collector<Tuple2<String, JSONObject>> collector) throws Exception {
}
}
}
但最直接的方法是使用匿名类,所以你可以保持String
类型:
DataStream<Tuple2<String, JSONObject>> MetaAlert = events
.flatMap(new JSONParser())
.keyBy(0)
.timeWindow(Time.seconds(5))
.apply(new WindowFunction<Tuple2<String, JSONObject>, Tuple2<String, JSONObject>, Tuple, TimeWindow>() {
@Override
public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<Tuple2<String, JSONObject>> iterable, Collector<Tuple2<String, JSONObject>> collector) throws Exception {
// Your code here
}
});
最后,如果你想保留类,但你也想保持你的类型的关键,因为它是可以实现一个KeySelector
:
public class Test { public Test() {
DataStream<Tuple2<String, JSONObject>> MetaAlert
= events
.flatMap(new JSONParser())
.keyBy(new KeySelector<Tuple2<String,JSONObject>, String>() {
@Override
public String getKey(Tuple2<String, JSONObject> json) throws Exception {
return json.f0;
}
})
.timeWindow(Time.seconds(5))
.apply(new GenerateMetaAlert());
}
public class GenerateMetaAlert implements WindowFunction<Tuple2<String, JSONObject>, Tuple2<String, JSONObject>, String, TimeWindow> {
@Override
public void apply(String key, TimeWindow timeWindow, Iterable<Tuple2<String, JSONObject>> iterable, Collector<Tuple2<String, JSONObject>> collector) throws Exception {
}
}
}
以上是 Flink应用函数onWindowsWindow 的全部内容, 来源链接: utcz.com/qa/261723.html