将自定义Java对象发送到Kafka主题
我有我的自定义Java对象,希望利用JVM的内置序列化将其发送到Kafka主题,但是序列化失败并出现以下错误
org.apache.kafka.common.errors.SerializationException:无法将com.spring.kafka.Payload类的值转换为value.serializer中指定的org.apache.kafka.common.serialization.ByteArraySerializer类。
public class Payload implements Serializable { private static final long serialVersionUID = 123L;
private String name="vinod";
private int anInt = 5;
private Double aDouble = new Double("5.0");
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAnInt() {
return anInt;
}
public void setAnInt(int anInt) {
this.anInt = anInt;
}
public Double getaDouble() {
return aDouble;
}
public void setaDouble(Double aDouble) {
this.aDouble = aDouble;
}
}
在创建生产者的过程中,我设置了以下属性
<entry key="key.serializer" value="org.apache.kafka.common.serialization.ByteArraySerializer" />
<entry key="value.serializer"
value="org.apache.kafka.common.serialization.ByteArraySerializer" />
我的发送调用如下
kafkaProducer.send(new ProducerRecord<String, Payload>("test", new Payload()));
通过生产者向kafka主题发送自定义Java对象的正确方法是什么?
回答:
我们有以下两个选项
1)如果我们打算将自定义Java对象发送给生产者,则需要创建一个实现
的序列化器,并在创建生产者期间传递该Serializer类
下面的代码参考
public class PayloadSerializer implements org.apache.kafka.common.serialization.Serializer { public void configure(Map map, boolean b) {
}
public byte[] serialize(String s, Object o) {
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(baos);
oos.writeObject(o);
oos.close();
byte[] b = baos.toByteArray();
return b;
} catch (IOException e) {
return new byte[0];
}
}
public void close() {
}
}
并据此设置值序列化器
<entry key="value.serializer" value="com.spring.kafka.PayloadSerializer" />
2)无需创建自定义序列化器类。使用现有的ByteArraySerializer,但在发送过程中遵循该过程
Java对象->字符串(最好是JSON表示形式,而不是toString)-> byteArray
以上是 将自定义Java对象发送到Kafka主题 的全部内容, 来源链接: utcz.com/qa/424120.html