Kafka--JAVA代码样例

java

一、原生API

(一)生产者

  生产者的发送可以分为异步发送、异步回调发送和同步发送。除了三种发送方式外,还可以进行批量发送,也可以在发送时对发送者进行拦截进行特殊处理。

  1、异步发送

    异步发送就是生产者将消息发送到分区器后,就不再管后续的流程(分区器是否发送到broker成功)

      

    代码样例:

private static final long EXPIRE_INTERVAL = 10 * 1000;

// 第一个泛型:当前生产者所生产消息的key

// 第二个泛型:当前生产者所生产的消息本身

private KafkaProducer<Integer, String> producer;

public OneProducer() {

Properties properties = new Properties();

// 指定kafka集群

properties.put("bootstrap.servers", "192.168.124.15:9092,192.168.124.15:9093,192.168.124.15:9094");

// 指定key与value的序列化器

properties.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");

properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

properties.put("acks", "all");// 记录完整提交,最慢的但是最大可能的持久化

// properties.put("retries", 3);// 请求失败重试的次数

properties.put(ProducerConfig.RETRIES_CONFIG, 3);

// properties.put("retry.backoff.ms", 100);// 两次重试之间的时间间隔 默认为100ms

properties.put("batch.size", 16384);// batch的大小 producer将试图批处理消息记录,以减少请求次数。这将改善client与server之间的性能。默认是16384Bytes,即16kB,也就是一个batch满了16kB就发送出去

//如果batch太小,会导致频繁网络请求,吞吐量下降;如果batch太大,会导致一条消息需要等待很久才能被发送出去,而且会让内存缓冲区有很大压力,过多数据缓冲在内存里。

properties.put("linger.ms", 1);// 默认情况即使缓冲区有剩余的空间,也会立即发送请求,设置一段时间用来等待从而将缓冲区填的更多,单位为毫秒,producer发送数据会延迟1ms,可以减少发送到kafka服务器的请求数据

properties.put("buffer.memory", 33554432);// 提供给生产者缓冲内存总量,设置发送消息的缓冲区,默认值是33554432,就是32MB.

// 如果发送消息出去的速度小于写入消息进去的速度,就会导致缓冲区写满,此时生产消息就会阻塞住,所以说这里就应该多做一些压测,尽可能保证说这块缓冲区不会被写满导致生产行为被阻塞住

properties.put("partitioner.class","com.lcl.galaxy.kafka.nativeapi.producer.MyPartitioner");

/**

* 请求超时

* * ==max.request.size==

* * 这个参数用来控制发送出去的消息的大小,默认是1048576字节,也就1mb

* * 这个一般太小了,很多消息可能都会超过1mb的大小,所以需要自己优化调整,把他设置更大一些(企业一般设置成10M)

* * ==request.timeout.ms==

* * 这个就是说发送一个请求出去之后,他有一个超时的时间限制,默认是30秒

* * 如果30秒都收不到响应,那么就会认为异常,会抛出一个TimeoutException来让我们进行处理

*

*重试乱序

* max.in.flight.requests.per.connection

*

* * 每个网络连接已经发送但还没有收到服务端响应的请求个数最大值

* 消息重试是可能导致消息的乱序的,因为可能排在你后面的消息都发送出去了,你现在收到回调失败了才在重试,此时消息就会乱序,

* 所以可以使用“max.in.flight.requests.per.connection”参数设置为1,这样可以保证producer必须把一个请求发送的数据发送成功了再发送后面的请求。避免数据出现乱序

*/

this.producer = new KafkaProducer<Integer, String>(properties);

// ProducerRecord<Integer, String> record3 = new ProducerRecord<>("cities", "tianjin");

}

public void sendMsg() throws ExecutionException, InterruptedException {

// 创建消息记录(包含主题、消息本身) (String topic, V value)

// ProducerRecord<Integer, String> record = new ProducerRecord<>("cities", "tianjin");

// 创建消息记录(包含主题、key、消息本身) (String topic, K key, V value)

// ProducerRecord<Integer, String> record = new ProducerRecord<>("cities", 1, "tianjin");

// 创建消息记录(包含主题、partition、key、消息本身) (String topic, Integer partition, K key, V value)

ProducerRecord<Integer, String> record = new ProducerRecord<>("cities", 0, 1, "hangzhou");

producer.send(record);

}

  2、异步回调发送

    异步回调发送与异步发送的区别就很明显了,当发布完成后,会接受后续结果(分区器到broker的结果)的回调。

    代码样例:

public void sendMsg() {

// 创建消息记录(包含主题、消息本身) (String topic, V value)

// ProducerRecord<Integer, String> record = new ProducerRecord<>("cities", "tianjin");

// 创建消息记录(包含主题、key、消息本身) (String topic, K key, V value)

// ProducerRecord<Integer, String> record = new ProducerRecord<>("cities", 1, "tianjin");

// 创建消息记录(包含主题、partition、key、消息本身) (String topic, Integer partition, K key, V value)

ProducerRecord<Integer, String> record = new ProducerRecord<>("cities", 1, 1, "tianjin");

producer.send(record, (metadata, ex) -> {

System.out.println("ok");

System.out.println("topic = " + metadata.topic());

System.out.println("partition = " + metadata.partition());

System.out.println("offset = " + metadata.offset());

});

// producer.send(record, new Callback() {

// @Override

// public void onCompletion(RecordMetadata metadata, Exception exception) {

// System.out.println("ok");

// System.out.println("topic = " + metadata.topic());

// System.out.println("partition = " + metadata.partition());

// System.out.println("offset = " + metadata.offset());

// }

// });

}

  3、同步发送

    同步发送就是要等到分区器与broker的处理结果才会返回发送结果。

public void sendMsg() throws ExecutionException, InterruptedException {

for(int i=0;i<10;i++){

ProducerRecord<Integer, String> record = new ProducerRecord<>("cities", "beijing-"+i);

/**

* 同步发送或异步阻塞发送

* */

Future<RecordMetadata> send = producer.send(record);

RecordMetadata result = send.get(); //阻塞

System.out.println("partion:"+result.partition()+",offset:"+result.offset());

}

producer.close();

}

  4、批量发送

    Kafka使⽤异步批量的⽅式发送消息。当Producer⽣产⼀条消息时,并不会⽴刻发送到Broker,⽽是先放⼊到消息缓冲区,等到缓冲区满或者消息个数达到限制后,再批量发送到Broker。

    Producer端需要注意以下参数:

      buffer.memory参数:表示消息缓存区的⼤⼩,单位是字节。

      batch.size参数:batch的阈值。当kafka采⽤异步⽅式发送消息时,默认是按照batch模式发送。其中同⼀主题同⼀分区的消息会默认合并到⼀个batch内,当达到阈值后就会发送。

      linger.ms参数:表示消息的最⼤的延时时间,默认是0,表示不做停留直接发送。

private KafkaProducer<Integer, String> producer;

public SomeProducerBatch() {

Properties properties = new Properties();

// 指定kafka集群

properties.put("bootstrap.servers", "192.168.124.15:9092,192.168.124.15:9093,192.168.124.15:9094");

// 指定key与value的序列化器

properties.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");

properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// 指定生产者每1024条消息向broker发送一次

properties.put("batch.size", 1024);

// 指定生产者每50ms向broker发送一次

properties.put("linger.ms", 50);

this.producer = new KafkaProducer<Integer, String>(properties);

}

public void sendMsg() {

for(int i=0; i<10; i++) {

String str = "beijing-"+i;

ProducerRecord<Integer, String> record = new ProducerRecord<>("cities", 0,str);

int k = i;

producer.send(record, (metadata, ex) -> {

System.out.println("i = " + k);

System.out.println("topic = " + metadata.topic());

System.out.println("partition = " + metadata.partition());

System.out.println("offset = " + metadata.offset());

});

}

}

  5、生产者拦截器

    ⽣产者拦截器既可以⽤来在消息发送前做以下准备⼯作,⽐如按照某个规则过滤不符合要求的消息、修改消息的内容等,也可⽤⽤来在发送回调逻辑前做⼀些定制化的需求。

    (1)创建一个拦截器,拦截生产者的发送、关闭等场景,例如在发送时修改发送的topic等信息

public class ProducerInterceptorPrefix implements ProducerInterceptor<String,String> {

private volatile long sendSuccess = 0;

private volatile long sendFailure = 0;

/**

* KafkaProducer在将消息序列化和计算分区之前会调用生产者拦截器的onSend()方法来对消息进行

* 相应的定制化操作。一般来说最好不要修改ProducerRecord的topic、key和partition等消息。

* 如果要修改,需确保对其有准确性的判断,否则会预想的效果出现偏差。

* @param record

* @return

*/

@Override

public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {

String modefiedValue = "prefix1-" + record.value();

return new ProducerRecord<>(record.topic(),

record.partition(),record.timestamp(),record.key(),

modefiedValue,record.headers());

}

/**

* Kafka会在消息被应答(Acknowledgement)之前或消息发送失败时调用生产者onAcknowledgement方法,

* 优先于用户设定的Callback之前执行。这个方法运行在Producer的I/O线程中,所以这个方法中实现的代码逻辑越简单越好

* 否则会影响消息的发送速度。

* @param metadata

* @param exception

*/

@Override

public void onAcknowledgement(RecordMetadata metadata, Exception exception) {

if(exception==null){

sendSuccess++;

}else {

sendFailure++;

}

}

/**

* close()方法主要用于关闭拦截器时执行一些资源的清理工作。

* 在这3个方法中抛出的异常都会被记录到日志中,但不会再向上传递

*/

@Override

public void close() {

double successRatio = (double) sendSuccess /(sendFailure + sendSuccess);

System.out.println("[INFO] 发送成功率="+String.format("%f",successRatio * 100)+"%");

}

@Override

public void configure(Map<String, ?> configs) {

}

}

    (2)设置消息发送的拦截器,并发送消息(红色注释的代码)

private KafkaProducer<Integer, String> producer;

public FiveProducer() {

Properties properties = new Properties();

// 指定kafka集群

properties.put("bootstrap.servers", "192.168.124.15:9092,192.168.124.15:9093,192.168.124.15:9094");

// 指定key与value的序列化器

properties.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");

properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

properties.put("acks", "all");// 记录完整提交,最慢的但是最大可能的持久化

// properties.put("retries", 3);// 请求失败重试的次数

properties.put(ProducerConfig.RETRIES_CONFIG, 3);

// properties.put("retry.backoff.ms", 100);// 两次重试之间的时间间隔 默认为100ms

properties.put("batch.size", 16384);// batch的大小 producer将试图批处理消息记录,以减少请求次数。这将改善client与server之间的性能。默认是16384Bytes,即16kB,也就是一个batch满了16kB就发送出去

//如果batch太小,会导致频繁网络请求,吞吐量下降;如果batch太大,会导致一条消息需要等待很久才能被发送出去,而且会让内存缓冲区有很大压力,过多数据缓冲在内存里。

properties.put("linger.ms", 1);// 默认情况即使缓冲区有剩余的空间,也会立即发送请求,设置一段时间用来等待从而将缓冲区填的更多,单位为毫秒,producer发送数据会延迟1ms,可以减少发送到kafka服务器的请求数据

properties.put("buffer.memory", 33554432);// 提供给生产者缓冲内存总量,设置发送消息的缓冲区,默认值是33554432,就是32MB.

// 如果发送消息出去的速度小于写入消息进去的速度,就会导致缓冲区写满,此时生产消息就会阻塞住,所以说这里就应该多做一些压测,尽可能保证说这块缓冲区不会被写满导致生产行为被阻塞住

properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptorPrefix.class.getName());

this.producer = new KafkaProducer<Integer, String>(properties);

}

public void sendMsg() throws ExecutionException, InterruptedException {

ProducerRecord<Integer, String> record = new ProducerRecord<>("cities", "shanghai");

producer.send(record);

} 

(二)消费者

  消费者进行消费也可以分为自动提交()、同步手动提交、异步手动提交、同异步手动提交三种模式,除了这三种模式外,消费者也可以使用拦截器进行消费拦截。

  1、自动提交

    ⾃动提交 可能会出现消息重复消费的情况,使用时只需要设置enable.auto.commit为true即可。

private KafkaConsumer<Integer, String> consumer;

public SomeConsumer() {

// 两个参数:

// 1)指定当前消费者名称

// 2)指定消费过程是否会被中断

Properties properties = new Properties();

properties.put(ConsumerConfig.GROUP_ID_CONFIG,"KafkaConsumerTest");

String brokers = "192.168.124.15:9092,192.168.124.15:9093,192.168.124.15:9094";

// 指定kafka集群

properties.put("bootstrap.servers", brokers);

// 指定消费者组ID

properties.put("group.id", "cityGroup1");

// 开启自动提交,默认为true

properties.put("enable.auto.commit", "true");

// 设置一次poll()从broker读取多少条消息

properties.put("max.poll.records", "500");

// 指定自动提交的超时时限,默认5s

properties.put("auto.commit.interval.ms", "1000");

// 指定消费者被broker认定为挂掉的时限。若broker在此时间内未收到当前消费者发送的心跳,则broker

// 认为消费者已经挂掉。默认为10s

properties.put("session.timeout.ms", "30000");

// 指定两次心跳的时间间隔,默认为3s,一般不要超过session.timeout.ms的 1/3

properties.put("heartbeat.interval.ms", "10000");

// 当kafka中没有指定offset初值时,或指定的offset不存在时,从这里读取offset的值。其取值的意义为:

// earliest:指定offset为第一条offset

// latest: 指定offset为最后一条offset

properties.put("auto.offset.reset", "earliest");

// 指定key与value的反序列化器

properties.put("key.deserializer","org.apache.kafka.common.serialization.IntegerDeserializer");

properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

this.consumer = new KafkaConsumer<Integer, String>(properties);

}

public void doWork() {

// 订阅消费主题

// consumer.subscribe(Collections.singletonList("cities"));

consumer.subscribe(Arrays.asList("cities", "test"));

// 从broker获取消息。参数表示,若buffer中没有消息,消费者等待消费的时间。

// 0,表示没有消息什么也不返回

// >0,表示当时间到后仍没有消息,则返回空

while (true) {

ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(100));

for(ConsumerRecord record : records) {

System.out.println("topic = " + record.topic());

System.out.println("partition = " + record.partition());

System.out.println("key = " + record.key());

System.out.println("value = " + record.value());

}

}

}

  2、同步手动提交

    同步提交⽅式是,消费者向 broker 提交 offset 后等待 broker 成功响应。若没有收到响应,则会重新提交,直到获取到响应。⽽在这个等待过程中,消费者是阻塞的。其严重影响了消费者的吞吐量。

    构造器与前面的自动提交一样,只需要设置enable.auto.commit为false即可(不设置也可以,默认为false)。然后在消费消息时,手动提交。 手动提交的代码样例如下所示:

public void doWork() {

// 订阅消费主题

consumer.subscribe(Collections.singletonList("cities"));

// 从broker摘取消费。参数表示,若buffer中没有消费,消费者等待消费的时间。

// 0,表示没有消息什么也不返回

// >0,表示当时间到后仍没有消息,则返回空

while(true){

ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(1000));

for(ConsumerRecord record : records) {

System.out.println("topic2 = " + record.topic());

System.out.println("partition2 = " + record.partition());

System.out.println("key2 = " + record.key());

System.out.println("value2 = " + record.value());

// 如果业务执行成功,手动同步提交。反之,不执行提交,达到失败的场景,可以进行重试的效果

consumer.commitSync();

}

}

}

  3、异步手动提交

    ⼿动同步提交⽅式需要等待 broker 的成功响应,效率太低,影响消费者的吞吐量。异步提交⽅式是,消费者向 broker 提交 offset 后不⽤等待成功响应,所以其增加了消费者的吞吐量。

    构造器与同步手动提交一致,只是消费时不一样,消费代码如下所示:

   public void doWork() {

while(true){

// 订阅消费主题

consumer.subscribe(Collections.singletonList("cities"));

// 从broker摘取消费。参数表示,若buffer中没有消费,消费者等待消费的时间。

// 0,表示没有消息什么也不返回

// >0,表示当时间到后仍没有消息,则返回空

ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(1000));

for(ConsumerRecord record : records) {

System.out.println("topic = " + record.topic());

System.out.println("partition = " + record.partition());

System.out.println("key = " + record.key());

System.out.println("value = " + record.value());

}

consumer.commitAsync();

// 手动异步提交

// consumer.commitAsync();

consumer.commitAsync((offsets, ex) -> {

if(ex != null) {

System.out.print("提交失败,offsets = " + offsets);

System.out.println(", exception = " + ex);

}

});

}

}

  4、同异步手动提交

    同异步提交,即同步提交与异步提交组合使⽤。⼀般情况下,若偶尔出现提交失败,其也不会影响消费者的消费。因为后续提交最终会将这次提交失败的 offset 给提交了。但异步提交会产⽣重复消费,为了防⽌重复消费,可以将同步提交与异常提交联合使⽤。

    public void doWork() {

// 订阅消费主题

consumer.subscribe(Collections.singletonList("cities"));

// 从broker摘取消费。参数表示,若buffer中没有消费,消费者等待消费的时间。

// 0,表示没有消息什么也不返回

// >0,表示当时间到后仍没有消息,则返回空

while(true){

ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(1000));

for(ConsumerRecord record : records) {

System.out.println("topic = " + record.topic());

System.out.println("partition = " + record.partition());

System.out.println("key = " + record.key());

System.out.println("value = " + record.value());

}

// 手动异步提交

consumer.commitAsync((offsets, ex) -> {

if(ex != null) {

System.out.print("提交失败,offsets = " + offsets);

System.out.println(", exception = " + ex);

// 同步提交

consumer.commitSync();

}

});

}

}

  5、消费者拦截器

    消费者拦截器需要⾃定义实现org.apache.kafka.clients.consumer.ConsumerInterceptor接⼝。此接⼝包含3个⽅法:

      KafkaConsumer会在poll()⽅法返回之前调⽤拦截器的onConsumer()⽅法来对消息进⾏相应的定制化操作,⽐如修改返回的消息内容、按照某种规则过滤消息(可能会减少poll()⽅法返回的消息个数)。如果onConsumer()⽅法中抛出异常,那么被捕获并记录到⽇志中,但是异常不会再向上传递。

      KafkaConsumer会在提交完消费位移之后调⽤拦截器的onCommit()⽅法,可以使⽤这个⽅法来记录跟踪所提交的位移信息,⽐如消费者使⽤commitSync的⽆参数⽅法是,我们不知道提交的消费位移的具体细节,⽽使⽤拦截器的conCommit()⽅法可以做到这⼀点。

    (1)创建拦截器

public class ConsumerInterceptorTTL implements ConsumerInterceptor<String,String> {

private static final long EXPIRE_INTERVAL = 10 * 1000;

@Override

public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {

long now = System.currentTimeMillis();

Map<TopicPartition, List<ConsumerRecord<String,String>>> newRecords =

new HashMap<>();

for(TopicPartition tp:records.partitions()){

List<ConsumerRecord<String,String>> tpRecords = records.records(tp);

List<ConsumerRecord<String,String>> newTpRecords = new ArrayList<>();

for (ConsumerRecord<String,String> record:tpRecords){

if(now - record.timestamp()<EXPIRE_INTERVAL){

newTpRecords.add(record);

}

}

if(!newTpRecords.isEmpty()){

newRecords.put(tp,newTpRecords);

}

}

return new ConsumerRecords<>(newRecords);

}

@Override

public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {

/*

offsets.forEach((tp,offset)->

System.out.println(tp+":"+offset.offset()));

*/

}

@Override

public void close() {}

@Override

public void configure(Map<String, ?> configs) {}

}

    (2)使用拦截器(红色标注的代码)

   private KafkaConsumer<Integer, String> consumer;

public SomeConsumer() {

// 两个参数:

// 1)指定当前消费者名称

// 2)指定消费过程是否会被中断

Properties properties = new Properties();

properties.put(ConsumerConfig.GROUP_ID_CONFIG,"KafkaConsumerTest");

String brokers = "192.168.124.15:9092,192.168.124.15:9093,192.168.124.15:9094";

// 指定kafka集群

properties.put("bootstrap.servers", brokers);

// 指定消费者组ID

properties.put("group.id", "cityGroup1");

// 开启自动提交,默认为true

properties.put("enable.auto.commit", "true");

// 设置一次poll()从broker读取多少条消息

properties.put("max.poll.records", "500");

// 指定自动提交的超时时限,默认5s

properties.put("auto.commit.interval.ms", "1000");

// 指定消费者被broker认定为挂掉的时限。若broker在此时间内未收到当前消费者发送的心跳,则broker

// 认为消费者已经挂掉。默认为10s

properties.put("session.timeout.ms", "30000");

// 指定两次心跳的时间间隔,默认为3s,一般不要超过session.timeout.ms的 1/3

properties.put("heartbeat.interval.ms", "10000");

// 当kafka中没有指定offset初值时,或指定的offset不存在时,从这里读取offset的值。其取值的意义为:

// earliest:指定offset为第一条offset

// latest: 指定offset为最后一条offset

properties.put("auto.offset.reset", "earliest");

// 指定key与value的反序列化器

properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");

properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

properties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ConsumerInterceptorTTL.class.getName());

this.consumer = new KafkaConsumer<Integer, String>(properties);

}

public void doWork() {

// 订阅消费主题

// consumer.subscribe(Collections.singletonList("cities"));

List<String> topics = new ArrayList<>();

topics.add("cities");

topics.add("test");

consumer.subscribe(topics);

/**

* 一共发了3条消息:first-expire-data、normal-data、last-expire-data。

* 第一条、第三条被修改成超时了,那么此时消费者通过poll()方法只能拉取normal-data这一条消息,另外两条被过滤了

*/

while (true){

ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(1000));

for(ConsumerRecord record : records) {

System.out.println("topic = " + record.topic());

System.out.println("partition = " + record.partition());

System.out.println("key = " + record.key());

System.out.println("value = " + record.value());

}

}

}

(三)多线程消费

    private final static String TOPIC_NAME = "cities";

public static void main(String[] args) throws InterruptedException {

String brokers = "192.168.124.15:9092,192.168.124.15:9093,192.168.124.15:9094";

String groupId = "test";

int workerNum = 5;

CunsumerExecutor consumers = new CunsumerExecutor(brokers, groupId, TOPIC_NAME);

consumers.execute(workerNum);

Thread.sleep(1000000);

consumers.shutdown();

}

// Consumer处理

public static class CunsumerExecutor{

private final KafkaConsumer<String, String> consumer;

private ExecutorService executors;

public CunsumerExecutor(String brokerList, String groupId, String topic) {

Properties props = new Properties();

props.put("bootstrap.servers", brokerList);

props.put("group.id", groupId);

props.put("enable.auto.commit", "true");

props.put("auto.commit.interval.ms", "1000");

props.put("session.timeout.ms", "30000");

props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

consumer = new KafkaConsumer<>(props);

consumer.subscribe(Arrays.asList(topic));

}

public void execute(int workerNum) {

executors = new ThreadPoolExecutor(workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,

new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());

while (true) {

ConsumerRecords<String, String> records = consumer.poll(200);

for (final ConsumerRecord record : records) {

executors.submit(new ConsumerRecordWorker(record));

}

}

}

public void shutdown() {

if (consumer != null) {

consumer.close();

}

if (executors != null) {

executors.shutdown();

}

try {

if (!executors.awaitTermination(10, TimeUnit.SECONDS)) {

System.out.println("Timeout.... Ignore for this case");

}

} catch (InterruptedException ignored) {

System.out.println("Other thread interrupted this shutdown, ignore for this case.");

Thread.currentThread().interrupt();

}

}

}

// 记录处理

public static class ConsumerRecordWorker implements Runnable {

private ConsumerRecord<String, String> record;

public ConsumerRecordWorker(ConsumerRecord record) {

this.record = record;

}

@Override

public void run() {

// 假如说数据入库操作

System.out.println("Thread - "+ Thread.currentThread().getName());

System.err.printf("patition = %d , offset = %d, key = %s, value = %s%n",

record.partition(), record.offset(), record.key(), record.value());

}

}

二、Spring Boot Kafka

  使用SpringBoot与原生的API主要调整三点,分别是:producer和consumer等配置项直接放入配置文件、发送消息使用KafkaTemplate、消费者使用KafkaListener注解即可。

  1、配置项

kafka:

topic: cities

spring:

kafka:

bootstrap-servers: 192.168.206.131:9092,192.168.206.132:9092,192.168.206.133:9092

producer:

key-serializer: org.apache.kafka.common.serialization.IntegerSerializer

value-serializer: org.apache.kafka.common.serialization.StringSerializer

batch-size: 16384

consumer:

group-id: mygroup1

enable-auto-commit: false

key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializer

value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

max-poll-records: 10

  2、发送消息

@RestController

@RequestMapping("/boot")

@Slf4j

public class BootProducerApi {

@Autowired

private KafkaTemplate kafkaTemplate;

@Value("${kafka.topic}")

private String topic;

@GetMapping("/send")

public void sendMsg() throws Exception {

String cityName = "LY";

for(int i=0; i<50; i++) {

ProducerRecord<Integer,String> record = new ProducerRecord<>(topic,cityName + i*1000);

ListenableFuture<SendResult> future = kafkaTemplate.send(record);

RecordMetadata recordMetadata = future.get().getRecordMetadata();

log.info("producer=======【{}】=======【{}】", i+1, recordMetadata.offset());

log.info("producer=======【{}】=======【{}】", i+1, recordMetadata.partition());

log.info("producer=======【{}】=======【{}】", i+1, recordMetadata.timestamp());

log.info("producer=======【{}】=======【{}】", i+1, recordMetadata.topic());

}

}

}

  3、消费者

@Component

@Slf4j

public class BootConsumer {

@KafkaListener(topics = "${kafka.topic}")

public void onMsg(String msg){

log.info("consumer============msg=【{}】",msg);

}

}

以上是 Kafka--JAVA代码样例 的全部内容, 来源链接: utcz.com/z/392470.html

回到顶部