springboot项目中使用Kafka消息队列
1、Kafka消息生产者
1)创建springboot-kafka-producer-demo项目,导入jar包。
<dependency> <groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.4.5.RELEASE</version>
</dependency>
2)在application.properties添加Kafka producer配置项
spring.kafka.bootstrap-servers=127.0.0.1:9092spring.kafka.template.default-topic=topic-test
spring.kafka.listener.missing-topics-fatal=false
spring.kafka.listener.concurrency= 3
spring.kafka.producer.client-id=${spring.application.name}
# 写入失败时,重试次数。当leader节点失效,一个repli节点会替代成为leader节点,此时可能出现写入失败,
# 当retris为0时,produce不会重复。retirs重发,此时repli节点完全成为leader节点,不会产生消息丢失。
spring.kafka.producer.retries=0
# 每次批量发送消息的数量,produce积累到一定数据,一次发送
spring.kafka.producer.batch-size=16384
# produce积累数据一次发送,缓存大小达到buffer.memory就发送数据
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#request.required.acks有三个值 0 1 -1
#0:生产者不会等待broker的ack,这个延迟最低但是存储的保证最弱当server挂掉的时候就会丢数据
#1:服务端会等待ack值 leader副本确认接收到消息后发送ack但是如果leader挂掉后他不确保是否复制完成新leader也会导致数据丢失
#-1:同样在1的基础上 服务端会等所有的follower的副本受到数据后才会受到leader发出的ack,这样数据不会丢
spring.kafka.producer.acks=-1
3)Kafka消息生产者启动类中加入@EnableKafka注解
4)编写Kafka消息发送类-Kafka消息生产者
@Slf4j@Component
public class KafkaSender {
@Autowired
private Globals globals;
@Resource
private KafkaTemplate<String,String> kafkaTemplate;
/**
* 发送消息方法
* @param msg
*/
public ResponseEntity send(String msg) {
log.info("发送消息,消息内容 : {}", msg);
try {
String uuid = UUID.randomUUID().toString();
String topic = globals.getTopic();
Message message = new Message();
message.setId(uuid);
message.setMsg(msg);
message.setSendTime(new Date());
ListenableFuture listenableFuture = kafkaTemplate.send(topic, uuid, JSON.toJSONString(message));
//发送成功后回调
SuccessCallback<SendResult<String,String>> successCallback = new SuccessCallback<SendResult<String,String>>() {
@Override
public void onSuccess(SendResult<String,String> result) {
log.info("发送消息成功");
}
};
//发送失败回调
FailureCallback failureCallback = new FailureCallback() {
@Override
public void onFailure(Throwable ex) {
log.error("发送消息失败", ex);
}
};
listenableFuture.addCallback(successCallback,failureCallback);
}catch (Exception e){
log.error("发送消息异常", e);
}
return new ResponseEntity("", HttpStatus.OK);
}
}
5)发送消息测试类
public class MsgSenderTest extends KafkaProducerApplicationTest { @Autowired
MsgSender sender;
@Test
public void send() {
sender.send("这是Kafka发送的消息内容" + System.currentTimeMillis());
}
}
2、Kafka消息消费者
1)创建springboot-kafka-consumer-demo项目,导入jar包。
<dependency> <groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.4.5.RELEASE</version>
</dependency>
2)在application.properties添加Kafka consumer配置项
#============== kafka ===================# 指定kafka 代理地址,可以多个
spring.kafka.bootstrap-servers=127.0.0.1:9092
spring.kafka.template.default-topic=topic-test
spring.kafka.listener.missing-topics-fatal=false
spring.kafka.listener.concurrency=3
spring.cloud.bus.trace.enabled=true
#=============== consumer =======================
spring.kafka.consumer.client-id=${spring.application.name}
# 指定默认消费者group id --> 由于在kafka中,同一组中的consumer不会读取到同一个消息,依靠groud.id设置组名
spring.kafka.consumer.group-id=test
# smallest和largest才有效,如果smallest重新0开始读取,如果是largest从logfile的offset读取。一般情况下我们都是设置smalles
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
# 如果"enable.auto.commit"为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000。
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.max-poll-records=10
# 指定消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
3)Kafka消息消费者启动类中加入@EnableKafka注解
4)用@KafkaListener创建Kafka消息消费者类
@Componentpublic class KafkaBatchConsumer {
protected final Logger logger = LoggerFactory.getLogger(this.getClass());
@KafkaListener(topics = {"${spring.kafka.template.default-topic}"},
containerFactory = "kafkaListenerContainerFactory")
public void listen(List<ConsumerRecord> records, Acknowledgment ack) {
try {
for (ConsumerRecord record : records) {
logger.info("接收消息: offset = {}, key = {}, value = {} ",
record.offset(), record.key(), record.value());
}
} catch (Exception e) {
logger.error("kafka接收消息异常",e);
} finally {
//手动提交偏移量
ack.acknowledge();
}
}
}
3、测试Kafka
先启动Kafka消息消费者springboot-kafka-consumer-demo项目
1)在Kafka消息生产者的MsgSenderTest类执行send()方法,如下图
2)切换到Kafka消息消费者查看消费结果
源码示例:https://gitee.com/lion123/springboot-kafka-demo
以上是 springboot项目中使用Kafka消息队列 的全部内容, 来源链接: utcz.com/z/515448.html