springboot项目中使用Kafka消息队列

编程

1、Kafka消息生产者

1)创建springboot-kafka-producer-demo项目,导入jar包。

<dependency>

<groupId>org.springframework.kafka</groupId>

<artifactId>spring-kafka</artifactId>

<version>2.4.5.RELEASE</version>

</dependency>

2)在application.properties添加Kafka producer配置项

spring.kafka.bootstrap-servers=127.0.0.1:9092

spring.kafka.template.default-topic=topic-test

spring.kafka.listener.missing-topics-fatal=false

spring.kafka.listener.concurrency= 3

spring.kafka.producer.client-id=${spring.application.name}

# 写入失败时,重试次数。当leader节点失效,一个repli节点会替代成为leader节点,此时可能出现写入失败,

# 当retris为0时,produce不会重复。retirs重发,此时repli节点完全成为leader节点,不会产生消息丢失。

spring.kafka.producer.retries=0

# 每次批量发送消息的数量,produce积累到一定数据,一次发送

spring.kafka.producer.batch-size=16384

# produce积累数据一次发送,缓存大小达到buffer.memory就发送数据

spring.kafka.producer.buffer-memory=33554432

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer

spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

#request.required.acks有三个值 0 1 -1

#0:生产者不会等待broker的ack,这个延迟最低但是存储的保证最弱当server挂掉的时候就会丢数据

#1:服务端会等待ack值 leader副本确认接收到消息后发送ack但是如果leader挂掉后他不确保是否复制完成新leader也会导致数据丢失

#-1:同样在1的基础上 服务端会等所有的follower的副本受到数据后才会受到leader发出的ack,这样数据不会丢

spring.kafka.producer.acks=-1

3)Kafka消息生产者启动类中加入@EnableKafka注解

4)编写Kafka消息发送类-Kafka消息生产者

@Slf4j

@Component

public class KafkaSender {

@Autowired

private Globals globals;

@Resource

private KafkaTemplate<String,String> kafkaTemplate;

/**

* 发送消息方法

* @param msg

*/

public ResponseEntity send(String msg) {

log.info("发送消息,消息内容 : {}", msg);

try {

String uuid = UUID.randomUUID().toString();

String topic = globals.getTopic();

Message message = new Message();

message.setId(uuid);

message.setMsg(msg);

message.setSendTime(new Date());

ListenableFuture listenableFuture = kafkaTemplate.send(topic, uuid, JSON.toJSONString(message));

//发送成功后回调

SuccessCallback<SendResult<String,String>> successCallback = new SuccessCallback<SendResult<String,String>>() {

@Override

public void onSuccess(SendResult<String,String> result) {

log.info("发送消息成功");

}

};

//发送失败回调

FailureCallback failureCallback = new FailureCallback() {

@Override

public void onFailure(Throwable ex) {

log.error("发送消息失败", ex);

}

};

listenableFuture.addCallback(successCallback,failureCallback);

}catch (Exception e){

log.error("发送消息异常", e);

}

return new ResponseEntity("", HttpStatus.OK);

}

}

5)发送消息测试类

public class MsgSenderTest extends KafkaProducerApplicationTest {

@Autowired

MsgSender sender;

@Test

public void send() {

sender.send("这是Kafka发送的消息内容" + System.currentTimeMillis());

}

}

2、Kafka消息消费者

1)创建springboot-kafka-consumer-demo项目,导入jar包。

<dependency>

<groupId>org.springframework.kafka</groupId>

<artifactId>spring-kafka</artifactId>

<version>2.4.5.RELEASE</version>

</dependency>

2)在application.properties添加Kafka consumer配置项

#============== kafka ===================

# 指定kafka 代理地址,可以多个

spring.kafka.bootstrap-servers=127.0.0.1:9092

spring.kafka.template.default-topic=topic-test

spring.kafka.listener.missing-topics-fatal=false

spring.kafka.listener.concurrency=3

spring.cloud.bus.trace.enabled=true

#=============== consumer =======================

spring.kafka.consumer.client-id=${spring.application.name}

# 指定默认消费者group id --> 由于在kafka中,同一组中的consumer不会读取到同一个消息,依靠groud.id设置组名

spring.kafka.consumer.group-id=test

# smallest和largest才有效,如果smallest重新0开始读取,如果是largest从logfile的offset读取。一般情况下我们都是设置smalles

spring.kafka.consumer.auto-offset-reset=earliest

spring.kafka.consumer.enable-auto-commit=false

# 如果"enable.auto.commit"为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000。

spring.kafka.consumer.auto-commit-interval=100

spring.kafka.consumer.max-poll-records=10

# 指定消息key和消息体的编解码方式

spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer

spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

3)Kafka消息消费者启动类中加入@EnableKafka注解

4)用@KafkaListener创建Kafka消息消费者类

@Component

public class KafkaBatchConsumer {

protected final Logger logger = LoggerFactory.getLogger(this.getClass());

@KafkaListener(topics = {"${spring.kafka.template.default-topic}"},

containerFactory = "kafkaListenerContainerFactory")

public void listen(List<ConsumerRecord> records, Acknowledgment ack) {

try {

for (ConsumerRecord record : records) {

logger.info("接收消息: offset = {}, key = {}, value = {} ",

record.offset(), record.key(), record.value());

}

} catch (Exception e) {

logger.error("kafka接收消息异常",e);

} finally {

//手动提交偏移量

ack.acknowledge();

}

}

}

3、测试Kafka

先启动Kafka消息消费者springboot-kafka-consumer-demo项目

1)在Kafka消息生产者的MsgSenderTest类执行send()方法,如下图

2)切换到Kafka消息消费者查看消费结果

 

源码示例:https://gitee.com/lion123/springboot-kafka-demo

以上是 springboot项目中使用Kafka消息队列 的全部内容, 来源链接: utcz.com/z/515448.html

回到顶部