Java Web 中 Kafka 的简单使用

编程

一.依赖类库

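Kafka(spring-kafka)与 Spring Boot 的版本需要相互兼容(可参考 Spring for Apache Kafka 官方的版本兼容表),一般都使用最新版本即可。本文使用 Spring Boot 2.2.1.RELEASE 搭配 spring-kafka 2.3.3.RELEASE。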

<!-- Parent POM: Spring Boot starter parent, version 2.2.1.RELEASE. -->
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.2.1.RELEASE</version>
</parent>

<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<!-- Spring for Apache Kafka; the consumer example uses its @KafkaListener annotation. -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.3.3.RELEASE</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
<!-- Apache Kafka artifact built for Scala 2.11, version 2.3.1. -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>2.3.1</version>
</dependency>

二.生产者

/** Kafka topic that request-log entries are published to. */
public static final String TOPIC = "smartdm_log";

/**
 * Serializes the given request log entry to JSON and publishes it to {@link #TOPIC},
 * using the entry's model as the record key.
 *
 * <p>The producer is opened in a try-with-resources block so it is always closed.
 * Closing waits for the pending send to finish and releases the producer's I/O thread,
 * buffers and broker connections, which were previously leaked on every call.
 *
 * <p>NOTE(review): creating a producer per message is expensive. If this method is
 * called frequently, consider holding one long-lived producer for the whole
 * application and closing it on shutdown instead.
 *
 * @param requestLogEntity the log entry to publish; must not be {@code null}
 */
private void sendMessageToKafka(RequestLogEntity requestLogEntity) {
    Properties kafkaProps = new Properties();
    kafkaProps.put("bootstrap.servers", "192.168.9.11:9092");
    kafkaProps.put("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
    kafkaProps.put("value.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");

    // Build the payload first so a serialization failure never opens a producer.
    String data = sglobalGson.toJson(requestLogEntity);

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps)) {
        producer.send(new ProducerRecord<>(TOPIC, requestLogEntity.getModel(), data));
    }
}

 

三.消费者

在yml文件中添加配置

# Kafka consumer settings for Spring Boot. YAML nesting is significant:
# these keys must sit under spring.kafka.consumer to be picked up.
# NOTE(review): bootstrap-servers may alternatively be placed directly under
# spring.kafka if it should be shared with producers as well.
spring:
  kafka:
    consumer:
      enable-auto-commit: true
      group-id: smartdm_log
      auto-offset-reset: latest
      bootstrap-servers: 192.168.9.11:9092

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.kafka.annotation.KafkaListener;

import org.springframework.stereotype.Component;

/**
 * Spring component that listens on the {@code smartdm_log} topic and prints
 * each received message to standard output.
 */
@Component
public class KafkaLogConsumer {

    /**
     * Handles a single message received from the {@code smartdm_log} topic.
     *
     * @param message the received message text
     */
    @KafkaListener(topics = "smartdm_log")
    public void receive(String message) {
        final String line = "app_log--消费消息:" + message;
        System.out.println(line);
    }
}

以上是 javawebkafka的简单使用 的全部内容, 来源链接: utcz.com/z/511526.html

回到顶部