java实现Kafka生产者示例

使用java实现Kafka的生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
packagecom.lisg.kafkatest;
importjava.util.Properties;
importkafka.javaapi.producer.Producer;
importkafka.producer.KeyedMessage;
importkafka.producer.Partitioner;
importkafka.producer.ProducerConfig;
importkafka.serializer.StringEncoder;
/**
* Kafka生产者
* @author lisg
*
*/
publicclassKafkaProducer {
publicstaticvoidmain(String[] args) {
Properties props = newProperties();
//根据这个配置获取metadata,不必是kafka集群上的所有broker,但最好至少有两个
props.put("metadata.broker.list", "vm1:9092,vm2:9092");
//消息传递到broker时的序列化方式
props.put("serializer.class", StringEncoder.class.getName());
//zk集群
props.put("zookeeper.connect", "vm1:2181");
//是否获取反馈
//0是不获取反馈(消息有可能传输失败)
//1是获取消息传递给leader后反馈(其他副本有可能接受消息失败)
//-1是所有in-sync replicas接受到消息时的反馈
props.put("request.required.acks", "1");
// props.put("partitioner.class", MyPartition.class.getName());
//创建Kafka的生产者, key是消息的key的类型, value是消息的类型
Producer<Integer, String> producer = newProducer<Integer, String>(
newProducerConfig(props));
intcount = 0;
while(true) {
String message = "message-"+ ++count;
//消息主题是test
KeyedMessage<Integer, String> keyedMessage = newKeyedMessage<Integer, String>("test", message);
//message可以带key, 根据key来将消息分配到指定区, 如果没有key则随机分配到某个区
// KeyedMessage<Integer, String> keyedMessage = new KeyedMessage<Integer, String>("test", 1, message);
producer.send(keyedMessage);
System.out.println("send: "+ message);
try{
Thread.sleep(1000);
} catch(InterruptedException e) {
e.printStackTrace();
}
}
// producer.close();
}
}
/**
* 自定义分区类
*
*/
classMyPartition implementsPartitioner {
publicintpartition(Object key, intnumPartitions) {
returnkey.hashCode()%numPartitions;
}
}
来自为知笔记(Wiz)
附件列表
以上是 java实现Kafka生产者示例 的全部内容, 来源链接: utcz.com/z/391784.html

