一次机房停电引发的思考kafka相关

编程

版本信息

  • spring-boot:2.0.6.RELEASE        
  • spring-kafka:2.1.2.RELEASE        
  • kafka-clients:1.0.2

为什么阻塞了60s?

首先我们知道kafkaTemplat.send底层是调用KafkaProducer的send方法

public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {}

根据文档的说明它是一个异步的发送方法,按道理不管如何它都不应该阻塞主线程,但实际中某些情况下会出现阻塞线程,比如broker未正确运行,topic未创建等情况。具体得源码分析参考 https://www.cnblogs.com/felixzh/p/11849296.html

查询官方文档http://kafka.apache.org/10/documentation.html得知,具体阻塞多久是由max.block.ms 参数决定的,由于我们的业务场景是高容忍消息丢失,低容忍阻塞请求,所以需要进行优化,下面简单介绍一下2种优化方案。

!!!注意,以下方案只适用于高容忍消息丢失,低容忍阻塞请求业务场景

优化方案

方案1:参数调优

  • max.block.ms调整到100ms,这个参数有以下2个作用

    1. 用于配置send数据或partitionFor函数得到对应的leader时,最大的等待时间,默认值为60秒
    2. 控制生产者可用的缓存总量,如果消息发送速度比其传输到服务器的快,将会耗尽buffer.memory这个缓存空间。当缓存空间耗尽,其他发送调用将被阻塞,阻塞时间的阈值通过max.block.ms设定, 之后它将抛出一个TimeoutException。
  • 关闭自动重试 retries=0 默认就是0
  • 其他
  • acks=0,acks有4个选项[all, -1, 0, 1] 。这里不确定会不会阻塞send方法,但是高容忍消息丢失,低容忍阻塞请求的业务场景配置成0就好了

    • 0:不保证消息的到达确认,只管发送,低延迟但是会出现消息的丢失,在某个server失败的情况下,有点像TCP
    • 1:发送消息,并会等待leader 收到确认后,一定的可靠性
    • -1或all:发送消息,等待leader收到确认,并进行复制操作后,才返回,最高的可靠性
  • 其他参数参考 http://kafka.apache.org/10/documentation.html

虽然调整一些参数,但是kafka集群不可用或请求量过大时,还是对主流程有短暂的阻塞

方案2:真异步

kafkaTemplat.send方法其实是个假异步方法,所以需要自己实现真异步,这里构造一个公用的线程池来处理就可以了,下面为参考代码

package com.qiaofang.tortoise.gateway.component;

import com.qiaofang.tortoise.gateway.config.KafkaAsyncProperties;

import org.springframework.kafka.core.KafkaTemplate;

import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

/**

* kafka异步操作工具类

*

* @author chenhao

* @version 1.0

* @date 2020/7/2 3:47 下午

*/

public class KafkaAsyncUtil {

private final KafkaTemplate kafkaTemplate;

private final KafkaAsyncProperties kafkaAsyncProperties;

public KafkaAsyncUtil(KafkaTemplate kafkaTemplate, KafkaAsyncProperties kafkaAsyncProperties) {

this.kafkaTemplate = kafkaTemplate;

this.kafkaAsyncProperties = kafkaAsyncProperties;

init();

}

private ThreadPoolTaskExecutor executor;

private void init() {

executor = new ThreadPoolTaskExecutor();

executor.setCorePoolSize(kafkaAsyncProperties.getThreadPoolCoreThreads());

executor.setMaxPoolSize(kafkaAsyncProperties.getThreadPoolMaxThreads());

executor.setQueueCapacity(kafkaAsyncProperties.getThreadPoolQueueSize());

executor.setThreadNamePrefix("kafka-async-util-pool-");

//高容忍消息丢失场景,工作队列满了之后直接丢弃

executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());

executor.initialize();

}

/**

* 发送消息

*

* @param topic

* @param data

*/

public void send(String topic, Object data) {

executor.execute(() -> kafkaTemplate.send(topic, data));

}

}

/**

* kafka异步操作相关配置

* @author chenhao

* @version 1.0

* @date 2020/7/2 3:47 下午

*/

@Data

@ConfigurationProperties(prefix = "tortoise.kafka.async")

public class KafkaAsyncProperties {

/**

* core

*/

private Integer threadPoolCoreThreads = 3;

/**

* max

*/

private Integer threadPoolMaxThreads = 3;

/**

* queue大小

*/

private Integer threadPoolQueueSize = 10000;

}

有文章《关于高并发下kafka producer send异步发送耗时问题的分析》说多线程高并发下producer.send的损耗比较严重,这个还要等到后续压测之后再更新文章吧

参考文章

站在巨人的肩膀上

  • Kafka producer异步发送在某些情况会阻塞主线程,使用时候慎重
  • HAVENT原创 Spring Boot + Spring-Kafka 异步配置
  • 关于高并发下kafka producer send异步发送耗时问题的分析
  • http://kafka.apache.org/10/documentation.html

 

以上是 一次机房停电引发的思考kafka相关 的全部内容, 来源链接: utcz.com/z/517999.html

回到顶部