Python测试Kafka集群(pykafka)实例

生产者代码:

# -* coding:utf8 *-

from pykafka import KafkaClient

host = 'IP:9092, IP:9092, IP:9092'

client = KafkaClient(hosts = host)

print client.topics

# 生产者

topicdocu = client.topics['my-topic']

producer = topicdocu.get_producer()

for i in range(100):

print i

producer.produce('test message ' + str(i ** 2))

producer.stop()

消费者代码:

# -* coding:utf8 *-

from pykafka import KafkaClient

host = 'IP:9092, IP:9092, IP:9092'

client = KafkaClient(hosts = host)

print client.topics

# 消费者

topic = client.topics['my-topic']

consumer = topic.get_simple_consumer(consumer_group='test', auto_commit_enable=True, auto_commit_interval_ms=1,

consumer_id='test')

for message in consumer:

if message is not None:

print message.offset, message.value

以上这篇Python测试Kafka集群(pykafka)实例就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持。

以上是 Python测试Kafka集群(pykafka)实例 的全部内容, 来源链接: utcz.com/z/328225.html

回到顶部