使用Spring Kafka反序列化来自同一Kafka主题的不同JSON有效负载

我正在尝试反序列化来自同一Kafka主题的不同JSON负载。在这里提出的其他问题使我第一次尝试,但是我无法使其运行。

正如Gary提到的,有一些提示(JsonSerializer.ADD_TYPE_INFO_HEADERS),但是当我发送和接收这两个消息时,我都会遇到异常。

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message

Endpoint handler details:

Method [public void com.foo.message.ConsumerImpl.consumeSelf(java.lang.String,java.lang.String,java.lang.String,java.lang.String,java.util.Map<java.lang.String, java.lang.Object>,com.foo.message.KafkaMessage,org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.Object>)]

Bean [com.foo.message.ConsumerImpl@6df2a206]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [com.foo.message.KafkaMessageWithAdditionalField] to [com.foo.message.KafkaMessage] for GenericMessage [payload=com.foo.message.KafkaMessageWithAdditionalField@4e3168f7, headers={kafka_offset=22, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@c0e2fcf, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=fromBar, kafka_receivedTimestamp=1548310583481}], failedMessage=GenericMessage [payload=com.foo.message.KafkaMessageWithAdditionalField@4e3168f7, headers={kafka_offset=22, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@c0e2fcf, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=fromBar, kafka_receivedTimestamp=1548310583481}]

at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:292) ~[spring-kafka-2.2.2.RELEASE.jar:2.2.2.RELEASE]

at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:79) ~[spring-kafka-2.2.2.RELEASE.jar:2.2.2.RELEASE]

at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50) ~[spring-kafka-2.2.2.RELEASE.jar:2.2.2.RELEASE]

at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1207) [spring-kafka-2.2.2.RELEASE.jar:2.2.2.RELEASE]

LoggingErrorHandler已经在ConsumerRecord中提到了一个(正确的)值。

2019-01-24 07:16:27.630 ERROR 27204 --- [ntainer#2-0-C-1] o.s.kafka.listener.LoggingErrorHandler   : Error while processing: ConsumerRecord(topic = fromBar, partition = 0, offset = 22, CreateTime = 1548310583481, serialized key size = -1, serialized value size = 196, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = com.foo.bar.message.KafkaMessageWithAdditionalField@4e3168f7)

首先我的配置:

@EnableKafka

@Configuration

public class KafkaConsumerConfig {

@Value("${spring.kafka.bootstrap-servers}")

private String bootstrapServers;

@Bean

public ConsumerFactory<String, KafkaMessage> consumerFactoryMessage()

{

Map<String, Object> props = new HashMap<>();

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),

new JsonDeserializer<>(KafkaMessage.class));

}

@Bean

public ConcurrentKafkaListenerContainerFactory<String, KafkaMessage> kafkaListenerMessageContainerFactory()

{

ConcurrentKafkaListenerContainerFactory<String, KafkaMessage> factory = new ConcurrentKafkaListenerContainerFactory<>();

factory.setConsumerFactory(consumerFactoryMessage());

return factory;

}

@Bean

public ConsumerFactory<String, KafkaMessageWithAdditionalField> consumerFactoryMessageWithAdditionalField()

{

Map<String, Object> props = new HashMap<>();

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),

new JsonDeserializer<>(KafkaMessageWithAdditionalField.class));

}

@Bean

public ConcurrentKafkaListenerContainerFactory<String, KafkaMessageWithAdditionalField> kafkaListenerMessageWithAdditionalFieldContainerFactory()

{

ConcurrentKafkaListenerContainerFactory<String, KafkaMessageWithAdditionalField> factory = new ConcurrentKafkaListenerContainerFactory<>();

factory.setConsumerFactory(consumerFactoryMessageWithAdditionalField());

return factory;

}

}

这是听众:

    @KafkaListener(topicPartitions = @TopicPartition(partitions = "0", topic = "${foo.kafka.topic-springBoot}"), containerFactory = "kafkaListenerMessageContainerFactory")

public void consumeSelf(@Headers Map<String, Object> map, KafkaMessage message, ConsumerRecord<String, Object> cr)

{

log.info("message received %s", message);

}

@KafkaListener(topicPartitions = @TopicPartition(partitions = "0", topic = "${foo.kafka.topic-springBoot}"), containerFactory = "kafkaListenerMessageWithAdditionalFieldContainerFactory")

public void consumeSelfAdd(@Headers Map<String, Object> map, KafkaMessageWithAdditionalField message, ConsumerRecord<String, Object> cr)

{

log.info("messageKafkaMessageWithAdditionalField received %s", message);

}

回答:

你不能那样做;您有2个不同的侦听器容器,其中的侦听器期望使用不同的对象。

对于接收不同类型的多个侦听器方法,您需要@KafkaListener在类级别和@KafkaHandler方法级别使用。

请参见有关类的@KafkaListener。

在类级别使用@KafkaListener时,请在方法级别指定@KafkaHandler。传递消息时,将使用转换后的消息有效负载类型来确定要调用的方法。

@KafkaListener(id = "multi", topics = "myTopic")

static class MultiListenerBean {

@KafkaHandler

public void listen(String foo) {

...

}

@KafkaHandler

public void listen(Integer bar) {

...

}

@KafkaHandler(isDefault = true`)

public void listenDefault(Object object) {

...

}

}

默认方法是可选的,用于未知的有效负载类型。

但这仅适用于智能解串器(知道如何转换为不同的有效负载)。

或者,可以将a添加RecordFilterStrategy到侦听器容器工厂中,以跳过每个侦听器中的其他记录。

以上是 使用Spring Kafka反序列化来自同一Kafka主题的不同JSON有效负载 的全部内容, 来源链接: utcz.com/qa/431024.html

回到顶部