spring集成kafka

编程

1、引入依赖jar包

<dependency>

<groupId>org.springframework.kafka</groupId>

<artifactId>spring-kafka</artifactId>

</dependency>

2、配置kafka信息

spring:

kafka:

bootstrap-servers: localhost:9092

consumer:

group-id: group1

listener:

missing-topics-fatal: false

启动报错,需要配置missing-topics-fatal为false

org.springframework.context.ApplicationContextException: Failed to start bean "org.springframework.kafka.config.internalKafkaListenerEndpointRegistry"; nested exception is java.lang.IllegalStateException: Topic(s) [topic1] is/are not present and missingTopicsFatal is true

at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:185)

at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53)

at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360)

at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158)

at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122)

at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:894)

at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.finishRefresh(ServletWebServerApplicationContext.java:162)

at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:553)

at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:141)

at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:747)

at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397)

at org.springframework.boot.SpringApplication.run(SpringApplication.java:315)

at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226)

at org.springframework.boot.SpringApplication.run(SpringApplication.java:1215)

at com.zhoulp.SyncMessageWebApplication.main(SyncMessageWebApplication.java:24)

Caused by: java.lang.IllegalStateException: Topic(s) [topic1] is/are not present and missingTopicsFatal is true

at org.springframework.kafka.listener.AbstractMessageListenerContainer.checkTopics(AbstractMessageListenerContainer.java:383)

at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:144)

at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:340)

at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:312)

at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:257)

at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182)

... 14 common frames omitted

3、实现生产者

package com.zhoulp.producer;

import javax.inject.Inject;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.kafka.core.KafkaTemplate;

import org.springframework.stereotype.Component;

/**

*

* @author zhoulp

* @date 2020-08-03

*

*/

@Component("kafkaProducer")

public class KafkaProducer {

private static Logger log = LoggerFactory.getLogger(KafkaProducer.class);

@Inject

private KafkaTemplate<String, String> template;

public void sendMessage(String topic, String data) {

log.info("send: topic = {}, data = {}", topic, data);

template.send(topic, data);

}

}

4、实现消费者

package com.zhoulp.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.kafka.annotation.KafkaListener;

import org.springframework.stereotype.Component;

/**

*

* @author zhoulp

* @date 2020-08-03

*

*/

@Component("kafkaConsumer")

public class KafkaConsumer {

private static Logger log = LoggerFactory.getLogger(KafkaConsumer.class);

@KafkaListener(topics = "topic1")

public void listenTopic1(ConsumerRecord<String, String> consumerRecord) {

log.info("listenTopic1");

log.info(consumerRecord.toString());

log.info(consumerRecord.topic());

log.info(consumerRecord.value());

}

}

以上是 spring集成kafka 的全部内容, 来源链接: utcz.com/z/519060.html

回到顶部