Kafka 生产消费 API 的 Java 实现

coding

Maven依赖:

<!-- Kafka Java client library; the org.apache.kafka.clients.* imports used by the
     producer and new-consumer examples come from this artifact. -->
<dependency>

<groupId>org.apache.kafka</groupId>

<artifactId>kafka-clients</artifactId>

<version>0.10.0.0</version>

</dependency>

<!-- Kafka core built for Scala 2.11; the kafka.consumer.* and kafka.javaapi.* imports
     used by the old (ZooKeeper-based) consumer example come from this artifact. -->
<dependency>

<groupId>org.apache.kafka</groupId>

<artifactId>kafka_2.11</artifactId>

<version>0.10.0.0</version>

</dependency>

Kafka生产者简单接口JAVA实现:

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class KafkaProducer {

public static void main(String[] args) throws Exception{

String topic = "";

String brokerList = "";

String message = "";

Properties props = new Properties();

props.put("bootstrap.servers", brokerList);

props.put("acks", "1");

props.put("retries", 0);

props.put("batch.size", 16384);

props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new org.apache.kafka.clients.producer.KafkaProducer<String, String>(props);

ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic,message);

producer.send(record, new Callback() {

public void onCompletion(RecordMetadata metadata, Exception e) {

if (e != null) {

e.printStackTrace();

}

}

});

}

}

Kafka 旧版消费者(基于 ZooKeeper 的高级消费者)简单接口 Java 实现:

import kafka.consumer.Consumer;

import kafka.consumer.ConsumerConfig;

import kafka.consumer.ConsumerIterator;

import kafka.consumer.KafkaStream;

import kafka.javaapi.consumer.ConsumerConnector;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import java.util.Properties;

/**
 * Minimal example of the old ZooKeeper-based high-level consumer: it opens one
 * stream for a topic and prints every message it receives.
 */
public class KafkaConsumer {

    /**
     * Connects through ZooKeeper, consumes the topic with a single stream and
     * prints each message to standard output until the stream ends or fails.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Placeholders: fill in the topic and the ZooKeeper connect string before running.
        String topic = "";
        String zkConnect = "";

        Properties prop = new Properties();
        prop.put("zookeeper.connect", zkConnect);
        prop.put("group.id", "group003");
        // "largest" = start from the newest offset when the group has no committed offset.
        prop.put("auto.offset.reset", "largest");
        // key.deserializer / value.deserializer are settings of the new consumer API.
        // The old ConsumerConfig does not read them (it only logs them as invalid
        // properties), so they are not set here; this API hands back raw byte[] payloads.

        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(prop));
        try {
            // Request exactly one stream (consumer thread) for the topic.
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put(topic, 1);

            Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams =
                    consumer.createMessageStreams(topicCountMap);
            final KafkaStream<byte[], byte[]> kafkaStream = messageStreams.get(topic).get(0);

            ConsumerIterator<byte[], byte[]> iterator = kafkaStream.iterator();
            while (iterator.hasNext()) {
                byte[] payload = iterator.next().message();
                // Decode explicitly as UTF-8 instead of the platform default charset,
                // and tolerate messages that carry no payload.
                String msg = payload == null
                        ? null
                        : new String(payload, java.nio.charset.StandardCharsets.UTF_8);
                System.out.println("--------" + msg);
            }
        } finally {
            // Release the ZooKeeper session and fetcher threads even if consumption fails.
            consumer.shutdown();
        }
    }
}

Kafka 新消费者接口 Java 实现:

import org.apache.kafka.clients.consumer.*;

import java.util.Arrays;

import java.util.Properties;

public class KafkaNewConsumer {

public static void main(String[] args) {

String topic = "";

String brokerList = "";

String group="";

Properties props = new Properties();

props.put("bootstrap.servers", brokerList);

props.put("group.id", group);

props.put("auto.offset.reset", "earliest");

props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

org.apache.kafka.clients.consumer.KafkaConsumer<String, String> consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<String, String>(props);

consumer.subscribe(Arrays.asList(topic));

while(true) {

ConsumerRecords<String, String> records = consumer.poll(10);

for(ConsumerRecord<String, String> record : records) {

System.out.println("fetched from partition " + record.partition() + ", offset: " + record.offset() + ", message: " + record.value());

}

}

}

}

以上是 Kafka生产消费APIJAVA实现 的全部内容, 来源链接: utcz.com/z/510042.html

回到顶部