将自定义Java对象发送到Kafka主题

我有我的自定义Java对象,希望利用JVM的内置序列化将其发送到Kafka主题,但是序列化失败并出现以下错误

org.apache.kafka.common.errors.SerializationException:无法将com.spring.kafka.Payload类的值转换为value.serializer中指定的org.apache.kafka.common.serialization.ByteArraySerializer类。

public class Payload implements Serializable {

private static final long serialVersionUID = 123L;

private String name="vinod";

private int anInt = 5;

private Double aDouble = new Double("5.0");

public String getName() {

return name;

}

public void setName(String name) {

this.name = name;

}

public int getAnInt() {

return anInt;

}

public void setAnInt(int anInt) {

this.anInt = anInt;

}

public Double getaDouble() {

return aDouble;

}

public void setaDouble(Double aDouble) {

this.aDouble = aDouble;

}

}

在创建生产者的过程中,我设置了以下属性

<entry key="key.serializer"

value="org.apache.kafka.common.serialization.ByteArraySerializer" />

<entry key="value.serializer"

value="org.apache.kafka.common.serialization.ByteArraySerializer" />

我的发送调用如下

kafkaProducer.send(new ProducerRecord<String, Payload>("test", new Payload()));

通过生产者向kafka主题发送自定义Java对象的正确方法是什么?

回答:

我们有以下两个选项

1)如果我们打算将自定义Java对象发送给生产者,则需要创建一个实现

的序列化器,并在创建生产者期间传递该Serializer类

下面的代码参考

public class PayloadSerializer implements org.apache.kafka.common.serialization.Serializer {

public void configure(Map map, boolean b) {

}

public byte[] serialize(String s, Object o) {

try {

ByteArrayOutputStream baos = new ByteArrayOutputStream();

ObjectOutputStream oos = new ObjectOutputStream(baos);

oos.writeObject(o);

oos.close();

byte[] b = baos.toByteArray();

return b;

} catch (IOException e) {

return new byte[0];

}

}

public void close() {

}

}

并据此设置值序列化器

<entry key="value.serializer"

value="com.spring.kafka.PayloadSerializer" />

2)无需创建自定义序列化器类。使用现有的ByteArraySerializer,但在发送过程中遵循该过程

Java对象->字符串(最好是JSON表示形式,而不是toString)-> byteArray

以上是 将自定义Java对象发送到Kafka主题 的全部内容, 来源链接: utcz.com/qa/424120.html

回到顶部