javawebkafka的简单使用
一.依赖类库
kafka和springboot的版本需要对应,都使用最新版本即可.
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.1.RELEASE</version></parent>
<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka --><dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.3.3.RELEASE</version></dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka --><dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>2.3.1</version></dependency>
二.生产者
public static final String TOPIC="smartdm_log";private void sendMessageToKafka(RequestLogEntity requestLogEntity){Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers","192.168.9.11:9092");
kafkaProps.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer") ;
KafkaProducer producer = new KafkaProducer<String,String>(kafkaProps);
String data = sglobalGson.toJson(requestLogEntity);
producer.send(new ProducerRecord<String, String>(TOPIC,requestLogEntity.getModel(),data));
}
三.消费者
在yml文件中添加配置
spring:kafka:
consumer:
enable-auto-commit: true
group-id: smartdm_log
auto-offset-reset: latest
bootstrap-servers: 192.168.9.11:9092
import org.springframework.beans.factory.annotation.Autowired;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.stereotype.Component;@Component
public class KafkaLogConsumer {
@KafkaListener(topics = {"smartdm_log"})
public void receive(String message) {
System.out.println("app_log--消费消息:" + message);
}
}
以上是 javawebkafka的简单使用 的全部内容, 来源链接: utcz.com/z/511526.html