如何为Kafka 2.2实现FlinkKafkaProducer序列化程序

我一直在努力更新从Kafka读取然后写入Kafka的Flink处理器(Flink 1.9版)。我们已经将此处理器编写为可以朝着Kafka

0.10.2集群运行,现在我们已经部署了一个运行2.2版的新Kafka集群。因此,我着手更新处理器以使用最新的FlinkKafkaConsumer和FlinkKafkaProducer(由Flink文档建议)。但是我遇到了卡夫卡制片人的一些问题。我无法使用不赞成使用的构造函数来将其序列化数据(不足为奇),并且我无法在线找到有关如何实现序列化程序的任何实现或示例(所有示例都使用较旧的Kafka连接器)

当前的实现(对于Kafka 0.10.2)如下

FlinkKafkaProducer010<String> eventBatchFlinkKafkaProducer = new FlinkKafkaProducer010<String>(

"playerSessions",

new SimpleStringSchema(),

producerProps,

(FlinkKafkaPartitioner) null

);

尝试实施以下FlinkKafkaProducer时

FlinkKafkaProducer<String> eventBatchFlinkKafkaProducer = new FlinkKafkaProducer<String>(

"playerSessions",

new SimpleStringSchema(),

producerProps,

null

);

我收到以下错误:

Exception in thread "main" java.lang.NullPointerException

at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:525)

at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:483)

at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:357)

at com.ebs.flink.sessionprocessor.SessionProcessor.main(SessionProcessor.java:122)

而且我还无法弄清楚为什么。FlinkKafkaProducer的构造函数也已被弃用,当我尝试实现未弃用的构造函数时,我不知道如何序列化数据。以下是它的外观:

FlinkKafkaProducer<String> eventBatchFlinkKafkaProducer = new FlinkKafkaProducer<String>(

"playerSessions",

new KafkaSerializationSchema<String>() {

@Override

public ProducerRecord<byte[], byte[]> serialize(String s, @Nullable Long aLong) {

return null;

}

},

producerProps,

FlinkKafkaProducer.Semantic.EXACTLY_ONCE

);

但是我不明白如何实现KafkaSerializationSchema,并且在网上或Flink文档中都找不到此示例。

是否有人对实现此目标有任何经验,或者对为什么FlinkProducer在步骤中获得NullPointerException有任何提示?

回答:

如果您只是将String发送给Kafka:

public class ProducerStringSerializationSchema implements KafkaSerializationSchema<String>{

private String topic;

public ProducerStringSerializationSchema(String topic) {

super();

this.topic = topic;

}

@Override

public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {

return new ProducerRecord<byte[], byte[]>(topic, element.getBytes(StandardCharsets.UTF_8));

}

}

发送Java对象:

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;

import org.apache.kafka.clients.producer.ProducerRecord;

public class ObjSerializationSchema implements KafkaSerializationSchema<MyPojo>{

private String topic;

private ObjectMapper mapper;

public ObjSerializationSchema(String topic) {

super();

this.topic = topic;

}

@Override

public ProducerRecord<byte[], byte[]> serialize(MyPojo obj, Long timestamp) {

byte[] b = null;

if (mapper == null) {

mapper = new ObjectMapper();

}

try {

b= mapper.writeValueAsBytes(obj);

} catch (JsonProcessingException e) {

// TODO

}

return new ProducerRecord<byte[], byte[]>(topic, b);

}

}

在你的代码中

.addSink(new FlinkKafkaProducer<>(producerTopic, new ObjSerializationSchema(producerTopic), 

params.getProperties(), FlinkKafkaProducer.Semantic.EXACTLY_ONCE));

以上是 如何为Kafka 2.2实现FlinkKafkaProducer序列化程序 的全部内容, 来源链接: utcz.com/qa/399838.html

回到顶部