如何设置onFailure事件(spring,kafka)的超时时间?
我正在尝试在Spring
MVC中实现将消息发送到Kafka的异步REST方法。一切正常,但是当服务器不可用时,onFailure事件将处理很长时间。如何将ListenableFuture中的响应时间限制为例如三秒。
这是我的代码:
@AutowiredKafkaTemplate<String, String> kafkaTemplate;
@Value("${spring.kafka.topic}")
String topic;
@RequestMapping("/test")
DeferredResult<ResponseEntity<?>> test(
@RequestParam(value = "message") String message
) {
DeferredResult<ResponseEntity<?>> deferredResult = new DeferredResult<>();
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, "testKey", message);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(SendResult<String, String> sendResult) {
ResponseEntity<String> responseEntity = new ResponseEntity<>("SUCCESS", HttpStatus.OK);
deferredResult.setResult(responseEntity);
}
@Override
public void onFailure(Throwable ex) {
ResponseEntity<String> responseEntity = new ResponseEntity<>("FAILURE", HttpStatus.OK);
deferredResult.setResult(responseEntity);
}
});
return deferredResult;
}
我试图使用REQUEST_TIMEOUT_MS_CONFIG
Kafka 的属性和.get(long timeout, TimeUnit
unit)ListenableFuture的方法,但没有得到想要的结果。
回答:
这是因为生产者会阻塞60秒(默认情况下)。
参见max.block.ms
KafkaDocumentation中的生产者配置。
max.block.ms
配置控制了KafkaProducer.send()和KafkaProducer.partitionsFor()的阻塞时间,这些方法可以由于缓冲区已满或元数据不可用而被阻塞,用户提供的序列化器或分区器中的阻塞将不计入此超时时间。
以上是 如何设置onFailure事件(spring,kafka)的超时时间? 的全部内容, 来源链接: utcz.com/qa/432953.html