java实现Kafka生产者示例

java

使用java实现Kafka的生产者

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

packagecom.lisg.kafkatest;

 

importjava.util.Properties;

 

importkafka.javaapi.producer.Producer;

importkafka.producer.KeyedMessage;

importkafka.producer.Partitioner;

importkafka.producer.ProducerConfig;

importkafka.serializer.StringEncoder;

 

/**

 * Kafka生产者

 * @author lisg

 *

 */

publicclassKafkaProducer {

 

    publicstaticvoidmain(String[] args) {

         

        Properties props = newProperties();

        //根据这个配置获取metadata,不必是kafka集群上的所有broker,但最好至少有两个

        props.put("metadata.broker.list", "vm1:9092,vm2:9092");

        //消息传递到broker时的序列化方式

        props.put("serializer.class", StringEncoder.class.getName());

        //zk集群

        props.put("zookeeper.connect", "vm1:2181");

        //是否获取反馈

        //0是不获取反馈(消息有可能传输失败)

        //1是获取消息传递给leader后反馈(其他副本有可能接受消息失败)

        //-1是所有in-sync replicas接受到消息时的反馈

        props.put("request.required.acks", "1");

//      props.put("partitioner.class", MyPartition.class.getName());

         

        //创建Kafka的生产者, key是消息的key的类型, value是消息的类型

        Producer<Integer, String> producer = newProducer<Integer, String>(

                newProducerConfig(props));

         

        intcount = 0;

        while(true) {

            String message = "message-"+ ++count;

            //消息主题是test

            KeyedMessage<Integer, String> keyedMessage = newKeyedMessage<Integer, String>("test", message);

            //message可以带key, 根据key来将消息分配到指定区, 如果没有key则随机分配到某个区

//          KeyedMessage<Integer, String> keyedMessage = new KeyedMessage<Integer, String>("test", 1, message);

            producer.send(keyedMessage);

            System.out.println("send: "+ message);

            try{

                Thread.sleep(1000);

            } catch(InterruptedException e) {

                e.printStackTrace();

            }

        }

         

//      producer.close();

    }

 

}

 

/**

 * 自定义分区类

 *

 */

classMyPartition implementsPartitioner {

 

    publicintpartition(Object key, intnumPartitions) {

        returnkey.hashCode()%numPartitions;

    }

     

}



来自为知笔记(Wiz)



附件列表

    以上是 java实现Kafka生产者示例 的全部内容, 来源链接: utcz.com/z/391784.html

    回到顶部