kafka安装
1、下载解压
https://mirror.bit.edu.cn/apache/kafka/2.5.0/kafka_2.12-2.5.0.tgz
解压:
> tar -xzf kafka_2.12-2.5.0.tgz> cd kafka_2.12-2.5.0
2、 启动ZooKeeper服务器。因为Kafka 使用了 ZooKeeper,所以需要先启动一个ZooKeeper服务器。 可以单独下载zookeeper安装启动,也可以通过与kafka打包在一起的便捷脚本来快速简单地创建一个单节点ZooKeeper实例。
通过与kafka打包在一起的便捷脚本来快速简单地创建一个单节点ZooKeeper实例
2.1 ) 配置ZooKeeper服务器# 修改kafka_2.12-2.5.0/config/zookeeper.properties文件
dataDir=/Users/zhoulp/develop/kafka/zookeeperData # 数据目录
【window版】
dataDir=D:kafkazookeeperData # 数据目录
2.2 ) 启动ZooKeeper服务器
> ./bin/zookeeper-server-start.sh config/zookeeper.properties&
【window版】
D:kafka_2.12-2.5.0inwindows>zookeeper-server-start.bat D:kafka_2.12-2.5.0configzookeeper.properties
3、启动kafka服务器
2.1 ) 配置kafka服务器# 修改kafka_2.12-2.5.0/config/server.properties文件
log.dirs=/Users/zhoulp/develop/kafka/logs # 日志目录
【window版】
log.dirs=D:kafkalogs # 日志目录
2.2 ) 启动kafka服务器
> ./bin/kafka-server-start.sh config/server.properties
【window版】
D:kafka_2.12-2.5.0inwindows>kafka-server-start.bat D:kafka_2.12-2.5.0configserver.properties
windows版启动kafka时,会报以下错误:
[2020-07-30 17:59:41,316] INFO Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)[2020-07-30 17:59:41,320] INFO Socket connection established, initiating session, client: /0:0:0:0:0:0:0:1:58085, server: localhost/0:0:0:0:0:0:0:1:2181 (org.apache.zookeeper.ClientCnxn)
[2020-07-30 17:59:59,316] INFO [ZooKeeperClient Kafka server] Closing. (kafka.zookeeper.ZooKeeperClient)
[2020-07-30 17:59:59,322] WARN Client session timed out, have not heard from server in 18001ms for sessionid 0x0 (org.apache.zookeeper.ClientCnxn)
[2020-07-30 17:59:59,436] INFO Session: 0x0 closed (org.apache.zookeeper.ZooKeeper)
[2020-07-30 17:59:59,436] INFO EventThread shut down for session: 0x0 (org.apache.zookeeper.ClientCnxn)
[2020-07-30 17:59:59,438] INFO [ZooKeeperClient Kafka server] Closed. (kafka.zookeeper.ZooKeeperClient)
[2020-07-30 17:59:59,446] ERROR Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
kafka.zookeeper.ZooKeeperClientTimeoutException: Timed out waiting for connection while in state: CONNECTING
at kafka.zookeeper.ZooKeeperClient.$anonfun$waitUntilConnected$3(ZooKeeperClient.scala:262)
at kafka.zookeeper.ZooKeeperClient.waitUntilConnected(ZooKeeperClient.scala:258)
at kafka.zookeeper.ZooKeeperClient.<init>(ZooKeeperClient.scala:119)
at kafka.zk.KafkaZkClient$.apply(KafkaZkClient.scala:1863)
at kafka.server.KafkaServer.createZkClient$1(KafkaServer.scala:378)
at kafka.server.KafkaServer.initZkClient(KafkaServer.scala:403)
at kafka.server.KafkaServer.startup(KafkaServer.scala:210)
at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
at kafka.Kafka$.main(Kafka.scala:82)
at kafka.Kafka.main(Kafka.scala)
[2020-07-30 17:59:59,450] INFO shutting down (kafka.server.KafkaServer)
[2020-07-30 17:59:59,457] INFO shut down completed (kafka.server.KafkaServer)
[2020-07-30 17:59:59,462] ERROR Exiting Kafka. (kafka.server.KafkaServerStartable)
[2020-07-30 17:59:59,463] INFO shutting down (kafka.server.KafkaServer)
4、创建一个 topic
1) 创建一个名为"demo"的topic,它只有一个分区和一个副本> ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic demo
2) 使用list命令查看topic列表
> ./bin/kafka-topics.sh --list --zookeeper localhost:2181
注:也可以配置在发布topic不存在的时,自动创建topic,而不是手动创建
5、发送消息
Kafka自带一个命令行客户端,它从文件或标准输入中获取输入,并将其作为message(消息)发送到Kafka集群。默认情况下,每行将作为单独的message发送。运行 producer,然后在控制台输入一些消息以发送到服务器。
> ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic demo
>what"s your nanme?
>what"s her name?
6、启动一个 consumer
Kafka 还有一个命令行consumer(消费者),将消息转储到标准输出。> ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demo --from-beginning
what"s your nanme?
what"s her name?
如果您将上述命令在不同的终端中运行,那么现在就可以将消息输入到生产者终端中,并将它们在消费终端中显示出来。
所有的命令行工具都有其他选项;运行不带任何参数的命令将显示更加详细的使用信息。
以上是 kafka安装 的全部内容, 来源链接: utcz.com/z/518926.html