如何在kafka中创建自定义序列化程序?

序列化器很少,例如

org.apache.kafka.common.serialization.StringSerializer

org.apache.kafka.common.serialization.StringSerializer

我们如何创建自己的自定义序列化程序?

回答:

在这里,您有一个示例,将自己的序列化器/解串器用于Kafka消息值。对于Kafka消息密钥是同一回事。

我们希望将MyMessage的序列化版本作为Kafka值发送,并再次将其反序列化为使用方的MyMessage对象。

在生产者端序列化MyMessage。

您应该创建一个实现org.apache.kafka.common.serialization.Serializer的序列化器类。

serialize() 方法可以完成工作,接收对象并以字节数组形式返回序列化的版本。

public class MyValueSerializer implements Serializer<MyMessage>

{

private boolean isKey;

@Override

public void configure(Map<String, ?> configs, boolean isKey)

{

this.isKey = isKey;

}

@Override

public byte[] serialize(String topic, MyMessage message)

{

if (message == null) {

return null;

}

try {

(serialize your MyMessage object into bytes)

return bytes;

} catch (IOException | RuntimeException e) {

throw new SerializationException("Error serializing value", e);

}

}

@Override

public void close()

{

}

}

final IntegerSerializer keySerializer = new IntegerSerializer();

final MyValueSerializer myValueSerializer = new MyValueSerializer();

final KafkaProducer<Integer, MyMessage> producer = new KafkaProducer<>(props, keySerializer, myValueSerializer);

int messageNo = 1;

int kafkaKey = messageNo;

MyMessage kafkaValue = new MyMessage();

ProducerRecord producerRecord = new ProducerRecord<>(topic, kafkaKey, kafkaValue);

producer.send(producerRecord, new DemoCallBack(logTag, startTime, messageNo, strValue));

在用户端反序列化MyMessage。

您应该创建一个实现org.apache.kafka.common.serialization.Deserializer的反序列化器类。

deserialize() 方法可以完成工作,以字节数组形式接收序列化的值并返回您的对象。

public class MyValueDeserializer implements Deserializer<MyMessage>

{

private boolean isKey;

@Override

public void configure(Map<String, ?> configs, boolean isKey)

{

this.isKey = isKey;

}

@Override

public MyMessage deserialize(String s, byte[] value)

{

if (value == null) {

return null;

}

try {

(deserialize value into your MyMessage object)

MyMessage message = new MyMessage();

return message;

} catch (IOException | RuntimeException e) {

throw new SerializationException("Error deserializing value", e);

}

}

@Override

public void close()

{

}

}

然后像这样使用它:

final IntegerDeserializer keyDeserializer = new IntegerDeserializer();

final MyValueDeserializer myValueDeserializer = new MyValueDeserializer();

final KafkaConsumer<Integer, MyMessage> consumer = new KafkaConsumer<>(props, keyDeserializer, myValueDeserializer);

ConsumerRecords<Integer, MyMessage> records = consumer.poll(1000);

for (ConsumerRecord<Integer, MyMessage> record : records) {

int kafkaKey = record.key();

MyMessage kafkaValue = record.value();

...

}

以上是 如何在kafka中创建自定义序列化程序? 的全部内容, 来源链接: utcz.com/qa/408390.html

回到顶部