javakafka生产者消费者demo

coding

一、修改kafka   server.porperties的ip是你本机的ip

listeners=PLAINTEXT://192.168.111.130:9092

二、生产者的例子

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

publicclass KafkaProducerDemo {

privatefinal Producer<String, String> kafkaProdcer;

publicfinalstatic String TOPIC = "JAVA_TOPIC";

private KafkaProducerDemo() {

kafkaProdcer = createKafkaProducer();

}

private Producer<String, String> createKafkaProducer() {

Properties props = new Properties();

props.put("bootstrap.servers", "192.168.111.130:9092");

props.put("acks", "all");

props.put("retries", 0);

props.put("batch.size", 16384);

props.put("linger.ms", 1);

props.put("buffer.memory", 33554432);

props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);

return kafkaProducer;

}

void produce() {

for (int i = 0; i < 10; i++) {

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

e.printStackTrace();

}

final String key = "key" + i;

String data = "hello kafka message:" + key;

kafkaProdcer.send(new ProducerRecord<String, String>(TOPIC, key, data), new Callback() {

publicvoid onCompletion(RecordMetadata recordMetadata, Exception e) {

System.out.println("发送key" + key + "成功");

}

});

}

}

publicstaticvoid main(String[] args) {

KafkaProducerDemo kafkaProducerDemo = new KafkaProducerDemo();

kafkaProducerDemo.produce();

}

}

用properties构造一个Producer的实例,然后调用send方法,传入数据,还有一个回调函数。

可以看到数据已经进来了。

三、消费者例子

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.apache.kafka.clients.consumer.ConsumerRecords;

import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;

import java.util.Properties;

publicclass KafkaConsumerDemo {

privatefinal KafkaConsumer<String, String> consumer;

private KafkaConsumerDemo(){

Properties props = new Properties();

props.put("bootstrap.servers", "192.168.111.130:9092");

props.put("group.id", "test");

props.put("enable.auto.commit", "false");

props.put("auto.commit.interval.ms", "1000");

props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

consumer = new KafkaConsumer<String, String>(props);

}

void consume(){

consumer.subscribe(Arrays.asList(KafkaProducerDemo.TOPIC));

while (true) {

ConsumerRecords<String, String> records = consumer.poll(100);

for (ConsumerRecord<String, String> record : records){

System.out.println("I'm coming");

System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());

}

}

}

publicstaticvoid main(String[] args) {

KafkaConsumerDemo kafkaConsumerDemo = new KafkaConsumerDemo();

kafkaConsumerDemo.consume();

}

}

正常启动是看不到东西的, 两个同时启动才有。消费者只看接下来有哪些生产者发来新的消息。

props.put("enable.auto.commit", "true");

这个的意思是,消费后自动改变偏移量。如果不添加这个,就会在服务器存的offset开始消费,并且不会改变offset的值。

如果为false, 可以看到不管消费几次,服务端存储的始终是offset的值为2。

如果想让consumer从头开始消费,可以设置:

props.put("auto.offset.reset", "earliest");

这个只对新建的组有效,如果一个组已经消费过,offset的值已经存在服务端了,这样设置不起作用的,只会从服务端存储的offset开始消费。不设置默认是latest,就是从最新的开始消费。

如果突然断电会不会有数据丢失?可以参考

https://stackoverflow.com/questions/39620911/can-we-lose-kafka-message-in-case-of-poweroff

以上是 javakafka生产者消费者demo 的全部内容, 来源链接: utcz.com/z/510043.html

回到顶部