Flink键控流键为空

我试图在Flink中的KeyedStream上执行映射操作:

stream.map(new JsonToMessageObjectMapper())

.keyBy("keyfield")

.map(new MessageProcessorStateful())

JsonToObjectMapper运算符的输出是 类的POJO,它具有String字段“

”。然后,将流键入此字段。

MessageProcessorStateful是一个RichMapFunction,如下所示:

public class MessageAdProcessorStateful extends RichMapFunction<MessageObject, Tuple2<String, String>> {

private transient MapState<String, Tuple2<Tuple3<String, String, String>, Tuple2<Double, Long>>> state;

...

@Override

public void open(Configuration config) throws Exception {

MapStateDescriptor<String, Tuple2<Tuple3<String, String, String>, Tuple2<Double, Long>>> descriptor =

new MapStateDescriptor<>(

"state", // the state name

TypeInformation.of(new TypeHint<String>() {}),

TypeInformation.of(new TypeHint<Tuple2<Tuple3<String, String, String>, Tuple2<Double, Long>>>() {}) ); // type information

state = getRuntimeContext().getMapState(descriptor);

state.put(...); // Insert a key, value here. Exception here!

}

}

该代码引发NullPointer异常:

Caused by: java.lang.NullPointerException: No key set. This method should not be called outside of a keyed context.

at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:75)

at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.checkKeyNamespacePreconditions(CopyOnWriteStateTable.java:528)

at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.computeHashForOperationAndDoIncrementalRehash(CopyOnWriteStateTable.java:722)

at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:265)

at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:306)

at org.apache.flink.runtime.state.heap.HeapMapState.put(HeapMapState.java:75)

at org.apache.flink.runtime.state.UserFacingMapState.put(UserFacingMapState.java:52)

at org.myorg.quickstart.MessageStreamProcessor$MessageAdProcessorStateful.open(MessageStreamProcessor.java:226)

at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)

at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)

at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)

at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)

at java.lang.Thread.run(Thread.java:748)

尽管我已经验证了’keyfield’始终是有效的字符串,但KeyedStream之一的keyedState中的键似乎为null。根据Flink文档,休息似乎是正确的。知道发生了什么吗?

回答:

问题是您尝试访问方法中的键控状态open()

键控状态为每个键维护一个状态实例。在您的示例中,您正在使用MapState。因此MapState,每个密钥都有一个实例。访问状态时,您将始终获得与当前处理记录的键对应的状态实例。在一个MapFunction(如您的示例中)这将是传递给该map()方法的记录。

由于open()未与记录一起调用,因此当前键open()null,因此无法访问键控状态。

以上是 Flink键控流键为空 的全部内容, 来源链接: utcz.com/qa/401966.html

回到顶部