java 访问 kerberos 认证的 kafka
1 <?xml version="1.0" encoding="UTF-8"?>2 <project xmlns="http://maven.apache.org/POM/4.0.0"
3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5 <modelVersion>4.0.0</modelVersion>
6
7 <groupId>com.ht</groupId>
8 <artifactId>kafkatest</artifactId>
9 <version>1.0-SNAPSHOT</version>
10 <build>
11 <plugins>
12 <plugin>
13 <groupId>org.apache.maven.plugins</groupId>
14 <artifactId>maven-compiler-plugin</artifactId>
15 <configuration>
16 <source>1.7</source>
17 <target>1.7</target>
18 </configuration>
19 </plugin>
20 </plugins>
21 </build>
22
23
24 <dependencies>
25 <dependency>
26 <groupId>org.apache.kafka</groupId>
27 <artifactId>kafka-clients</artifactId>
28 <version>0.10.0.0</version>
29 </dependency>
30 </dependencies>
31 </project>
java 代码
1 import org.apache.kafka.clients.CommonClientConfigs;2 import org.apache.kafka.clients.consumer.ConsumerRecord;
3 import org.apache.kafka.clients.consumer.ConsumerRecords;
4 import org.apache.kafka.clients.consumer.KafkaConsumer;
5
6 import java.util.Collections;
7 import java.util.Properties;
8
9 import static org.apache.kafka.clients.consumer.ConsumerConfig.*;
10
11 /**
12 * @author sunzq
13 * @since 2017/8/29
14 */
15 public class Application {
16 public static void main(String[] args) {
17
18 Properties props = new Properties();
19 props.put(BOOTSTRAP_SERVERS_CONFIG, "node1:6667,node2:6667,node3:6667,node4:6667");
20 props.put(ENABLE_AUTO_COMMIT_CONFIG, "true");
21 props.put(GROUP_ID_CONFIG, "test08291103");
22 // props.put(ConsumerConfig.CLIENT_ID_CONFIG, "test0829");
23 props.put(AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
24 props.put(AUTO_OFFSET_RESET_CONFIG, "earliest");
25 props.put(KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
26 props.put(VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
27 props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
28
29 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
30 // topic name: test9
31 consumer.subscribe(Collections.singleton("test9"));
32 while (true) {
33 ConsumerRecords<String, String> records = consumer.poll(100);
34 for (ConsumerRecord<String, String> record : records)
35 System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
36 }
37 }
38 }
启动参数
-Djava.security.krb5.conf=c:\\app\\conf\\krb5.conf -Djava.security.auth.login.config=c:\\app\\conf\\kafka_jaas.conf
windows 下记得用 \\
以上是 java 访问 kerberos 认证的 kafka 的全部内容, 来源链接: utcz.com/z/389949.html