【Java】SpringBoot整合kafka

一、背景

此处简单记录一下 SpringBootKafka 的整合。

二、实现步骤

1、引入jar包

<dependency>

<groupId>org.springframework.kafka</groupId>

<artifactId>spring-kafka</artifactId>

</dependency>

2、编写生产者和消费者的配置

3、生产者配置

spring.application.name=kafka-springboot

# 配置 kafka 服务器的地址,多个以逗号隔开

spring.kafka.bootstrap-servers=localhost:9092,localhost:9093,localhost:9094

# 生产者配置

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer

spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

spring.kafka.producer.acks=1

spring.kafka.producer.retries=0

spring.kafka.producer.batch-size=16384

spring.kafka.producer.buffer-memory=33554432

4、消费者配置

# 消费者配置

# 关闭自动提交 ack

spring.kafka.consumer.enable-auto-commit=false

spring.kafka.consumer.auto-commit-interval=100

spring.kafka.consumer.auto-offset-reset=earliest

spring.kafka.consumer.max-poll-records=500

spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer

spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

# 配置监听手动提交 ack ,消费一条数据完后,立即提交

spring.kafka.listener.ack-mode=manual_immediate

# 经测试也是批量提交的ack , 当消费完 spring.kafka.consumer.max-poll-records 这么多的数据时候,提交

#spring.kafka.listener.ack-mode=manual

spring.kafka.listener.poll-timeout=500S

5、消费者手动提交 ack

1、spring.kafka.consumer.enable-auto-commit 修改成 false

2、spring.kafka.listener.ack-mode 修改成

|- manual: 表示手动提交,但是测试下来发现是批量提交

|- manual_immediate: 表示手动提交,当调用 Acknowledgment#acknowledge之后立马提交。

3、编写生产者代码

@Component

public class KafkaProducer implements CommandLineRunner {

@Autowired

private KafkaTemplate<String, String> kafkaTemplate;

@Override

public void run(String... args) {

Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() ->

{

kafkaTemplate.send(KafkaConstant.TOPIC, String.valueOf(System.currentTimeMillis()))

.addCallback(new SuccessCallback<SendResult<String, String>>() {

@Override

public void onSuccess(SendResult<String, String> result) {

if (null != result.getRecordMetadata()) {

System.out.println("消费发送成功 offset:" + result.getRecordMetadata().offset());

return;

}

System.out.println("消息发送成功");

}

}, new FailureCallback() {

@Override

public void onFailure(Throwable throwable) {

System.out.println("消费发送失败:" + throwable.getMessage());

}

});

},

0, 1, TimeUnit.SECONDS);

}

}

1、消费的发送使用KafkaTemplate

2、根据发送的结果知道,消息发送成功还是失败。

4、编写消费者代码

@Component

public class KafkaConsumer {

@KafkaListener(topics = KafkaConstant.TOPIC, groupId = "kafka-springboot-001")

public void consumer(ConsumerRecord<String, String> record, Acknowledgment ack) throws InterruptedException {

System.out.println(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")) + "接收到kafka消息,partition:" + record.partition() + ",offset:" + record.offset() + "value:" + record.value());

TimeUnit.SECONDS.sleep(1);

ack.acknowledge();

}

}

KafkaListener:

topic: 表示需要监听的队列名称

groupId: 表示消费者组的id

三、运行结果

【Java】SpringBoot整合kafka

四、参考文档

1、https://docs.spring.io/spring-boot/docs/2.4.2/reference/htmlsingle/#boot-features-kafka

五、代码路径

https://gitee.com/huan1993/rabbitmq/tree/master/kafka-springboot/src/main/java/com/huan/study/kafka

以上是 【Java】SpringBoot整合kafka 的全部内容, 来源链接: utcz.com/a/111199.html

回到顶部