Kafka Java 客户端编程

java


kafka_2.10-0.8.1.1


maven

<dependencies>
  <!-- Kafka 0.8.1.1 (kafka_2.10 artifact); provides the kafka.javaapi.producer classes used by the example. -->
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.1.1</version>
    <!-- NOTE(review): jmxtools/jmxri/jms are presumably excluded because these transitive
         artifacts cannot be resolved from the default repositories; confirm before removing. -->
    <exclusions>
      <exclusion>
        <artifactId>jmxtools</artifactId>
        <groupId>com.sun.jdmk</groupId>
      </exclusion>
      <exclusion>
        <artifactId>jmxri</artifactId>
        <groupId>com.sun.jmx</groupId>
      </exclusion>
      <exclusion>
        <artifactId>jms</artifactId>
        <groupId>javax.jms</groupId>
      </exclusion>
    </exclusions>
  </dependency>
  <!-- log4j 1.2.15, declared explicitly with the same exclusions as above plus javax.mail. -->
  <dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.15</version>
    <exclusions>
      <exclusion>
        <artifactId>jmxtools</artifactId>
        <groupId>com.sun.jdmk</groupId>
      </exclusion>
      <exclusion>
        <artifactId>jmxri</artifactId>
        <groupId>com.sun.jmx</groupId>
      </exclusion>
      <exclusion>
        <artifactId>jms</artifactId>
        <groupId>javax.jms</groupId>
      </exclusion>
      <exclusion>
        <artifactId>mail</artifactId>
        <groupId>javax.mail</groupId>
      </exclusion>
     </exclusions>
  </dependency>
  <!-- JUnit 4.11, available on the test classpath only. -->
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.11</version>
    <scope>test</scope>
  </dependency>
</dependencies>


producer

 1 package org.admln.kafka.test;

2

3 import java.util.Properties;

4

5 import kafka.javaapi.producer.Producer;

6 import kafka.producer.KeyedMessage;

7 import kafka.producer.ProducerConfig;

8

9 public class Producertest {

10

11 public static void main(String[] args) {

12 Properties props = new Properties();

13 //props.put("zk.connect", "192.168.1.110:2181");

14 // serializer.class为消息的序列化类

15 props.put("serializer.class", "kafka.serializer.StringEncoder");

16 // 配置metadata.broker.list, 为了高可用, 最好配两个broker实例

17 props.put("metadata.broker.list", "192.168.1.113:9092");

18 // 设置Partition类, 对队列进行合理的划分

19 //props.put("partitioner.class", "idoall.testkafka.Partitionertest");

20 // ACK机制, 消息发送需要kafka服务端确认

21 props.put("request.required.acks", "1");

22

23 props.put("num.partitions", "2");

24 ProducerConfig config = new ProducerConfig(props);

25 Producer<String, String> producer = new Producer<String, String>(config);

26 for (int i = 0; i < 10; i++)

27 {

28 String msg = "hello" + i;

29 producer.send(new KeyedMessage<String, String>("test",msg));

30 System.out.println("i:"+i+" msg:"+msg);

31 }

32 }

33 }


 consumer

 运行 consumer 后一直接收不到消息，原因尚未找到。


以上是 kafka java客户端编程 的全部内容, 来源链接: utcz.com/z/392972.html

回到顶部