javakafka生产者消费者demo
一、修改kafka server.porperties的ip是你本机的ip
listeners=PLAINTEXT://192.168.111.130:9092
二、生产者的例子
import org.apache.kafka.clients.producer.*;import java.util.Properties;publicclass KafkaProducerDemo {privatefinal Producer<String, String> kafkaProdcer;publicfinalstatic String TOPIC = "JAVA_TOPIC";private KafkaProducerDemo() { kafkaProdcer
= createKafkaProducer(); }
private Producer<String, String> createKafkaProducer() { Properties props
= new Properties(); props.put(
"bootstrap.servers", "192.168.111.130:9092"); props.put(
"acks", "all"); props.put(
"retries", 0); props.put(
"batch.size", 16384); props.put(
"linger.ms", 1); props.put(
"buffer.memory", 33554432); props.put(
"key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put(
"value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer
<String, String> kafkaProducer = new KafkaProducer<String, String>(props);return kafkaProducer; }
void produce() {for (int i = 0; i < 10; i++) {try { Thread.sleep(
1000); }
catch (InterruptedException e) { e.printStackTrace();
}
final String key = "key" + i; String data
= "hello kafka message:" + key; kafkaProdcer.send(
new ProducerRecord<String, String>(TOPIC, key, data), new Callback() {publicvoid onCompletion(RecordMetadata recordMetadata, Exception e) { System.out.println(
"发送key" + key + "成功"); }
});
}
}
publicstaticvoid main(String[] args) { KafkaProducerDemo kafkaProducerDemo
= new KafkaProducerDemo(); kafkaProducerDemo.produce();
}
}
用properties构造一个Producer的实例,然后调用send方法,传入数据,还有一个回调函数。
可以看到数据已经进来了。
三、消费者例子
import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Arrays;import java.util.Properties;publicclass KafkaConsumerDemo {privatefinal KafkaConsumer<String, String> consumer;private KafkaConsumerDemo(){ Properties props
= new Properties(); props.put(
"bootstrap.servers", "192.168.111.130:9092"); props.put(
"group.id", "test"); props.put(
"enable.auto.commit", "false"); props.put(
"auto.commit.interval.ms", "1000"); props.put(
"key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put(
"value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); consumer
= new KafkaConsumer<String, String>(props); }
void consume(){ consumer.subscribe(Arrays.asList(KafkaProducerDemo.TOPIC));
while (true) { ConsumerRecords
<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records){ System.out.println(
"I'm coming"); System.out.printf(
"offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); }
}
}
publicstaticvoid main(String[] args) { KafkaConsumerDemo kafkaConsumerDemo
= new KafkaConsumerDemo(); kafkaConsumerDemo.consume();
}
}
正常启动是看不到东西的, 两个同时启动才有。消费者只看接下来有哪些生产者发来新的消息。
props.put("enable.auto.commit", "true");
这个的意思是,消费后自动改变偏移量。如果不添加这个,就会在服务器存的offset开始消费,并且不会改变offset的值。
如果为false, 可以看到不管消费几次,服务端存储的始终是offset的值为2。
如果想让consumer从头开始消费,可以设置:
props.put("auto.offset.reset", "earliest");
这个只对新建的组有效,如果一个组已经消费过,offset的值已经存在服务端了,这样设置不起作用的,只会从服务端存储的offset开始消费。不设置默认是latest,就是从最新的开始消费。
如果突然断电会不会有数据丢失?可以参考
https://stackoverflow.com/questions/39620911/can-we-lose-kafka-message-in-case-of-poweroff
以上是 javakafka生产者消费者demo 的全部内容, 来源链接: utcz.com/z/510043.html