Spring Kafka: Manual Acknowledgment

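I have a spring-boot application that listens to a Kafka stream and sends the records to a service for further processing. The service may sometimes fail. The exception case is noted in the comments. So far I have simulated both the success case and the exception case of the service myself.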

Listener code:

@Autowired
PlanitService service;

@KafkaListener(
        topics = "${app.topic}",
        groupId = "notifGrp",
        containerFactory = "storeKafkaListener")
public void processStoreNotify(StoreNotify store) throws RefrigAlarmNotifyException {
    service.planitStoreNotification(store);
    // Some other logic which throws custom exception
    // RefrigAlarmNotifyException
}
}

The consumer factory configuration is as follows:

@Bean
public ConsumerFactory<String, StoreNotify> storeConsumerFactory() {
    Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getConsumerBootstrapServers());
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "notifGrp");
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    try (ErrorHandlingDeserializer2<String> headerErrorHandlingDeserializer = new ErrorHandlingDeserializer2<>(
                 new StringDeserializer());
         ErrorHandlingDeserializer2<StoreNotify> errorHandlingDeserializer = new ErrorHandlingDeserializer2<>(
                 new JsonDeserializer<>(StoreNotify.class, objectMapper()))) {
        return new DefaultKafkaConsumerFactory<>(config, headerErrorHandlingDeserializer,
                errorHandlingDeserializer);
    }
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, StoreNotify> storeKafkaListener() {
    ConcurrentKafkaListenerContainerFactory<String, StoreNotify> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(storeConsumerFactory());
    factory.getContainerProperties().setAckOnError(false);
    factory.getContainerProperties().setAckMode(AckMode.RECORD);
    //factory.setMessageConverter(new ByteArrayJsonMessageConverter());
    DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate,
            (r, e) -> {
                LOGGER.error("Exception is of type: ", e);
                if (e instanceof RestClientException) {
                    LOGGER.error("RestClientException while processing {} ", r.value(), e);
                    return new TopicPartition(storeDeadLtrTopic, r.partition());
                }
                else {
                    LOGGER.error("Generic exception while processing {} ", r.value(), e);
                    return new TopicPartition(storeErrorTopic, r.partition());
                }
            });
    factory.setErrorHandler(new SeekToCurrentErrorHandler(recoverer, new FixedBackOff(0L, 0L)));
    return factory;
}

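When the REST service throws a RestClientException, it should go into the if block above. Regarding the FixedBackOff, I do not want the SeekToCurrentErrorHandler to do any retry processing, so I passed 0L as the second parameter. I only want it to send the record to the specified topic. Please correct me if I have this wrong. The exception stack trace is: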

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.demo.ran.consumer.StoreKafkaConsumer.processStoreNotifMessage(com.demo.ran.model.StoreNotify) throws com.demo.ran.exception.RefrigAlarmNotifyException' threw exception; nested exception is org.springframework.web.client.RestClientException: Service exception; nested exception is org.springframework.web.client.RestClientException: Service exception
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:1742) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:1730) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1647) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1577) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1485) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1235) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:985) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:905) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_241]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_241]
    at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_241]
Caused by: org.springframework.web.client.RestClientException: Service exception
    at com.demo.ran.service.PlanitService.planitStoreNotification(PlanitService.java:53) ~[classes/:na]
    at com.demo.ran.consumer.StoreKafkaConsumer.processStoreNotifMessage(StoreKafkaConsumer.java:48) ~[classes/:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_241]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_241]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_241]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_241]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171) ~[spring-messaging-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120) ~[spring-messaging-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:326) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:86) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:51) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1696) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1679) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1634) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]
    ... 8 common frames omitted

Answer:

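You do not need manual acknowledgment for this use case. Just configure a SeekToCurrentErrorHandler and throw the exception to the container; it discards the unprocessed records, performs the seeks, and the failed message is redelivered.

See the documentation.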

You can configure the error handler with a DeadLetterPublishingRecoverer, which can send the record to a dead-letter topic after some number of retries.
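One observation from the stack trace in the question: the exception that reaches the error handler is the outer ListenerExecutionFailedException, and the RestClientException is only its cause. A type check on the outer exception alone would therefore not match. Below is a minimal plain-Java sketch of routing on the cause chain instead. The two exception classes are hypothetical stand-ins for the Spring types named in the stack trace, and the topic names are placeholders.

```java
final class DeadLetterRouting {

    // Hypothetical stand-ins for the exception types seen in the stack trace.
    static class RestClientException extends RuntimeException {
        RestClientException(String message) {
            super(message);
        }
    }

    static class ListenerExecutionFailedException extends RuntimeException {
        ListenerExecutionFailedException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Returns true if the throwable itself, or any of its causes, is of the given type.
    static boolean hasCause(Throwable thrown, Class<? extends Throwable> type) {
        for (Throwable t = thrown; t != null; t = t.getCause()) {
            if (type.isInstance(t)) {
                return true;
            }
        }
        return false;
    }

    // Chooses a destination topic name; both names are placeholders.
    static String resolveTopic(Throwable thrown) {
        return hasCause(thrown, RestClientException.class) ? "store.DLT" : "store.error";
    }

    public static void main(String[] args) {
        Throwable wrapped = new ListenerExecutionFailedException(
                "Listener method threw exception",
                new RestClientException("Service exception"));

        // The outer type is the wrapper, so a direct instanceof check fails.
        System.out.println(wrapped instanceof RestClientException); // false
        // Walking the cause chain finds the RestClientException.
        System.out.println(resolveTopic(wrapped)); // store.DLT
    }
}
```

In the question's destination resolver, the same idea would amount to checking e.getCause() (or the whole cause chain) rather than e itself.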

You can configure which exceptions are retryable.
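To make the retry-then-recover behaviour concrete, here is a small plain-Java model of it. It is not Spring Kafka code: the class RetryThenRecover, its method names, and the in-memory "recovered" list (standing in for a dead-letter topic) are all hypothetical. It only illustrates the semantics described above, including that a retry count of 0 (as with the FixedBackOff(0L, 0L) in the question) sends a failed record to the recoverer after the first delivery.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

final class RetryThenRecover {

    private final int maxRetries; // retries allowed after the first delivery
    private final Set<Class<? extends Throwable>> notRetryable = new HashSet<>();
    final List<String> recovered = new ArrayList<>(); // stands in for a dead-letter topic

    RetryThenRecover(int maxRetries) {
        this.maxRetries = maxRetries;
    }

    // Marks an exception type as fatal: the record is recovered without retrying.
    void addNotRetryable(Class<? extends Throwable> type) {
        notRetryable.add(type);
    }

    // Redelivers the record until the listener succeeds, the retries run out,
    // or a not-retryable exception is thrown. Returns the number of deliveries.
    int deliver(String record, Consumer<String> listener) {
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                listener.accept(record);
                return attempts;
            } catch (RuntimeException e) {
                boolean fatal = notRetryable.stream().anyMatch(t -> t.isInstance(e));
                if (fatal || attempts > maxRetries) {
                    recovered.add(record); // hand the record to the "recoverer"
                    return attempts;
                }
            }
        }
    }

    public static void main(String[] args) {
        RetryThenRecover noRetries = new RetryThenRecover(0);
        int deliveries = noRetries.deliver("record-1", r -> {
            throw new IllegalStateException("service down");
        });
        System.out.println(deliveries + " delivery, recovered: " + noRetries.recovered);
    }
}
```

Note that the listener in this model never catches its own exception; the retry and recovery decisions are made entirely by the caller, which plays the role of the container.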

} catch (Exception exception) {
    LOGGER.error("Exception while calling the service ", exception);
    // Ignore the record
}

You must not "eat" the exception like this; let it propagate to the container.

When using MANUAL ack mode, you must add Acknowledgment ack as a parameter to the listener method and acknowledge the record yourself.
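The two points above (do not swallow the exception, and acknowledge explicitly in MANUAL mode) can be sketched together in plain Java. The Acknowledgment interface below is a stand-in that only mirrors the shape of Spring Kafka's org.springframework.kafka.support.Acknowledgment; StoreService and ManualAckSketch are hypothetical names used for illustration.

```java
final class ManualAckSketch {

    // Stand-in mirroring the shape of Spring Kafka's Acknowledgment interface.
    interface Acknowledgment {
        void acknowledge();
    }

    // Hypothetical downstream service that may throw.
    interface StoreService {
        void process(String record);
    }

    private final StoreService service;

    ManualAckSketch(StoreService service) {
        this.service = service;
    }

    // No catch block: a failure propagates to the caller (the container),
    // and the record is acknowledged only after processing succeeds.
    void onMessage(String record, Acknowledgment ack) {
        service.process(record);
        ack.acknowledge();
    }

    public static void main(String[] args) {
        ManualAckSketch listener = new ManualAckSketch(record -> {
            throw new IllegalStateException("service down");
        });
        try {
            listener.onMessage("record-1", () -> System.out.println("acknowledged"));
        } catch (IllegalStateException e) {
            // The exception reached the caller and acknowledge() was never invoked.
            System.out.println("propagated: " + e.getMessage());
        }
    }
}
```

With this shape, a failed record is left unacknowledged and the error handler configured on the container decides what happens to it.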

