如何设置onFailure事件(spring,kafka)的超时时间?

我正在尝试在Spring

MVC中实现将消息发送到Kafka的异步REST方法。一切正常,但是当服务器不可用时,onFailure事件将处理很长时间。如何将ListenableFuture中的响应时间限制为例如三秒。

这是我的代码:

@Autowired

KafkaTemplate<String, String> kafkaTemplate;

@Value("${spring.kafka.topic}")

String topic;

@RequestMapping("/test")

DeferredResult<ResponseEntity<?>> test(

@RequestParam(value = "message") String message

) {

DeferredResult<ResponseEntity<?>> deferredResult = new DeferredResult<>();

ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, "testKey", message);

future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

@Override

public void onSuccess(SendResult<String, String> sendResult) {

ResponseEntity<String> responseEntity = new ResponseEntity<>("SUCCESS", HttpStatus.OK);

deferredResult.setResult(responseEntity);

}

@Override

public void onFailure(Throwable ex) {

ResponseEntity<String> responseEntity = new ResponseEntity<>("FAILURE", HttpStatus.OK);

deferredResult.setResult(responseEntity);

}

});

return deferredResult;

}

我试图使用REQUEST_TIMEOUT_MS_CONFIGKafka 的属性和.get(long timeout, TimeUnit

unit)ListenableFuture的方法,但没有得到想要的结果。

回答:

这是因为生产者会阻塞60秒(默认情况下)。

参见max.block.msKafkaDocumentation中的生产者配置。

max.block.ms配置控制了KafkaProducer.send()和KafkaProducer.partitionsFor()的阻塞时间,这些方法可以由于缓冲区已满或元数据不可用而被阻塞,用户提供的序列化器或分区器中的阻塞将不计入此超时时间。

以上是 如何设置onFailure事件(spring,kafka)的超时时间? 的全部内容, 来源链接: utcz.com/qa/432953.html

回到顶部