kafka安装

编程

1、下载解压

https://mirror.bit.edu.cn/apache/kafka/2.5.0/kafka_2.12-2.5.0.tgz

解压:

> tar -xzf kafka_2.12-2.5.0.tgz

> cd kafka_2.12-2.5.0

2、 启动ZooKeeper服务器。因为Kafka 使用了 ZooKeeper,所以需要先启动一个ZooKeeper服务器。 可以单独下载zookeeper安装启动,也可以通过与kafka打包在一起的便捷脚本来快速简单地创建一个单节点ZooKeeper实例。

通过与kafka打包在一起的便捷脚本来快速简单地创建一个单节点ZooKeeper实例

2.1 ) 配置ZooKeeper服务器

# 修改kafka_2.12-2.5.0/config/zookeeper.properties文件

dataDir=/Users/zhoulp/develop/kafka/zookeeperData # 数据目录

【window版】

dataDir=D:kafkazookeeperData # 数据目录

2.2 ) 启动ZooKeeper服务器

> ./bin/zookeeper-server-start.sh config/zookeeper.properties&

【window版】

D:kafka_2.12-2.5.0inwindows>zookeeper-server-start.bat D:kafka_2.12-2.5.0configzookeeper.properties

3、启动kafka服务器

2.1 ) 配置kafka服务器

# 修改kafka_2.12-2.5.0/config/server.properties文件

log.dirs=/Users/zhoulp/develop/kafka/logs # 日志目录

【window版】

log.dirs=D:kafkalogs # 日志目录

2.2 ) 启动kafka服务器

> ./bin/kafka-server-start.sh config/server.properties

【window版】

D:kafka_2.12-2.5.0inwindows>kafka-server-start.bat D:kafka_2.12-2.5.0configserver.properties

windows版启动kafka时,会报以下错误:

[2020-07-30 17:59:41,316] INFO Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)

[2020-07-30 17:59:41,320] INFO Socket connection established, initiating session, client: /0:0:0:0:0:0:0:1:58085, server: localhost/0:0:0:0:0:0:0:1:2181 (org.apache.zookeeper.ClientCnxn)

[2020-07-30 17:59:59,316] INFO [ZooKeeperClient Kafka server] Closing. (kafka.zookeeper.ZooKeeperClient)

[2020-07-30 17:59:59,322] WARN Client session timed out, have not heard from server in 18001ms for sessionid 0x0 (org.apache.zookeeper.ClientCnxn)

[2020-07-30 17:59:59,436] INFO Session: 0x0 closed (org.apache.zookeeper.ZooKeeper)

[2020-07-30 17:59:59,436] INFO EventThread shut down for session: 0x0 (org.apache.zookeeper.ClientCnxn)

[2020-07-30 17:59:59,438] INFO [ZooKeeperClient Kafka server] Closed. (kafka.zookeeper.ZooKeeperClient)

[2020-07-30 17:59:59,446] ERROR Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)

kafka.zookeeper.ZooKeeperClientTimeoutException: Timed out waiting for connection while in state: CONNECTING

at kafka.zookeeper.ZooKeeperClient.$anonfun$waitUntilConnected$3(ZooKeeperClient.scala:262)

at kafka.zookeeper.ZooKeeperClient.waitUntilConnected(ZooKeeperClient.scala:258)

at kafka.zookeeper.ZooKeeperClient.<init>(ZooKeeperClient.scala:119)

at kafka.zk.KafkaZkClient$.apply(KafkaZkClient.scala:1863)

at kafka.server.KafkaServer.createZkClient$1(KafkaServer.scala:378)

at kafka.server.KafkaServer.initZkClient(KafkaServer.scala:403)

at kafka.server.KafkaServer.startup(KafkaServer.scala:210)

at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)

at kafka.Kafka$.main(Kafka.scala:82)

at kafka.Kafka.main(Kafka.scala)

[2020-07-30 17:59:59,450] INFO shutting down (kafka.server.KafkaServer)

[2020-07-30 17:59:59,457] INFO shut down completed (kafka.server.KafkaServer)

[2020-07-30 17:59:59,462] ERROR Exiting Kafka. (kafka.server.KafkaServerStartable)

[2020-07-30 17:59:59,463] INFO shutting down (kafka.server.KafkaServer)

4、创建一个 topic

1) 创建一个名为"demo"的topic,它只有一个分区和一个副本

> ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic demo

2) 使用list命令查看topic列表

> ./bin/kafka-topics.sh --list --zookeeper localhost:2181

注:也可以配置在发布topic不存在的时,自动创建topic,而不是手动创建

5、发送消息

Kafka自带一个命令行客户端,它从文件或标准输入中获取输入,并将其作为message(消息)发送到Kafka集群。默认情况下,每行将作为单独的message发送。

运行 producer,然后在控制台输入一些消息以发送到服务器。

> ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic demo

>what"s your nanme?

>what"s her name?

6、启动一个 consumer

Kafka 还有一个命令行consumer(消费者),将消息转储到标准输出。

> ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demo --from-beginning

what"s your nanme?

what"s her name?

如果您将上述命令在不同的终端中运行,那么现在就可以将消息输入到生产者终端中,并将它们在消费终端中显示出来。

所有的命令行工具都有其他选项;运行不带任何参数的命令将显示更加详细的使用信息。

以上是 kafka安装 的全部内容, 来源链接: utcz.com/z/518926.html

回到顶部