java实现Kafka生产者示例
使用java实现Kafka的生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package
com.lisg.kafkatest;
import
java.util.Properties;
import
kafka.javaapi.producer.Producer;
import
kafka.producer.KeyedMessage;
import
kafka.producer.Partitioner;
import
kafka.producer.ProducerConfig;
import
kafka.serializer.StringEncoder;
/**
* Kafka生产者
* @author lisg
*
*/
public
class
KafkaProducer {
public
static
void
main(String[] args) {
Properties props =
new
Properties();
//根据这个配置获取metadata,不必是kafka集群上的所有broker,但最好至少有两个
props.put(
"metadata.broker.list"
,
"vm1:9092,vm2:9092"
);
//消息传递到broker时的序列化方式
props.put(
"serializer.class"
, StringEncoder.
class
.getName());
//zk集群
props.put(
"zookeeper.connect"
,
"vm1:2181"
);
//是否获取反馈
//0是不获取反馈(消息有可能传输失败)
//1是获取消息传递给leader后反馈(其他副本有可能接受消息失败)
//-1是所有in-sync replicas接受到消息时的反馈
props.put(
"request.required.acks"
,
"1"
);
// props.put("partitioner.class", MyPartition.class.getName());
//创建Kafka的生产者, key是消息的key的类型, value是消息的类型
Producer<Integer, String> producer =
new
Producer<Integer, String>(
new
ProducerConfig(props));
int
count =
0
;
while
(
true
) {
String message =
"message-"
+ ++count;
//消息主题是test
KeyedMessage<Integer, String> keyedMessage =
new
KeyedMessage<Integer, String>(
"test"
, message);
//message可以带key, 根据key来将消息分配到指定区, 如果没有key则随机分配到某个区
// KeyedMessage<Integer, String> keyedMessage = new KeyedMessage<Integer, String>("test", 1, message);
producer.send(keyedMessage);
System.out.println(
"send: "
+ message);
try
{
Thread.sleep(
1000
);
}
catch
(InterruptedException e) {
e.printStackTrace();
}
}
// producer.close();
}
}
/**
* 自定义分区类
*
*/
class
MyPartition
implements
Partitioner {
public
int
partition(Object key,
int
numPartitions) {
return
key.hashCode()%numPartitions;
}
}
来自为知笔记(Wiz)
附件列表
以上是 java实现Kafka生产者示例 的全部内容, 来源链接: utcz.com/z/391784.html