java 访问 kerberos 认证的 kafka

pom.xml(Maven 依赖配置)

 1 <?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Coordinates of this sample project. -->
    <groupId>com.ht</groupId>
    <artifactId>kafkatest</artifactId>
    <version>1.0-SNAPSHOT</version>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <!-- Compile at Java 7 level; the sample code uses the diamond operator. -->
                    <source>1.7</source>
                    <target>1.7</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <!-- Kafka client library providing KafkaConsumer and the client config constants. -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.0.0</version>
        </dependency>
    </dependencies>
</project>

java 代码

 1 import org.apache.kafka.clients.CommonClientConfigs;

2 import org.apache.kafka.clients.consumer.ConsumerRecord;

3 import org.apache.kafka.clients.consumer.ConsumerRecords;

4 import org.apache.kafka.clients.consumer.KafkaConsumer;

5

6 import java.util.Collections;

7 import java.util.Properties;

8

9 import static org.apache.kafka.clients.consumer.ConsumerConfig.*;

10

11 /**

12 * @author sunzq

13 * @since 2017/8/29

14 */

15 public class Application {

16 public static void main(String[] args) {

17

18 Properties props = new Properties();

19 props.put(BOOTSTRAP_SERVERS_CONFIG, "node1:6667,node2:6667,node3:6667,node4:6667");

20 props.put(ENABLE_AUTO_COMMIT_CONFIG, "true");

21 props.put(GROUP_ID_CONFIG, "test08291103");

22 // props.put(ConsumerConfig.CLIENT_ID_CONFIG, "test0829");

23 props.put(AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);

24 props.put(AUTO_OFFSET_RESET_CONFIG, "earliest");

25 props.put(KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

26 props.put(VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

27 props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");

28

29 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

30 // topic name: test9

31 consumer.subscribe(Collections.singleton("test9"));

32 while (true) {

33 ConsumerRecords<String, String> records = consumer.poll(100);

34 for (ConsumerRecord<String, String> record : records)

35 System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());

36 }

37 }

38 }

启动参数

 -Djava.security.krb5.conf=c:\\app\\conf\\krb5.conf -Djava.security.auth.login.config=c:\\app\\conf\\kafka_jaas.conf  

注意:Windows 下路径中的反斜杠记得写成 \\(如上面的启动参数所示)。

以上是 java 访问 kerberos 认证的 kafka 的全部内容, 来源链接: utcz.com/z/389949.html

回到顶部