PHP处理kafka消息队列

编程

安装PHP—kafka扩展后,就可以开始编写 php 消费消息的脚本了,php-rdkafka 扩展提供了几种消息处理的方式

低级方式(Low level)

这种方式没有消费组的概念

<?php

$rk=newRdKafkaConsumer();

$rk->setLogLevel(LOG_DEBUG);

// 指定 broker 地址,多个地址用"," 分割

$rk->addBrokers("192.168.33.1:9092");

$topic=$rk->newTopic("test");

$topic->consumeStart(0,RD_KAFKA_OFFSET_BEGINNING);

while(true){

// 第一个参数是分区号

// 第二个参数是超时时间

$msg=$topic->consume(0,1000);

if($msg->err){

echo$msg->errstr(),"

";

break;

}else{

echo$msg->payload,"

";

}

}

高级方式 (High level)

这种方式可以指定消费组,一个消费组内,一个consumer 进程只能读取一个分区,

<?php

$conf=newRdKafkaConf();

// Set a rebalance callback to log partition assignments (optional)

// 当有新的消费进程加入或者退出消费组时,kafka 会自动重新分配分区给消费者进程,这里注册了一个回调函数,当分区被重新分配时触发

$conf->setRebalanceCb(function(RdKafkaKafkaConsumer$kafka,$err,array$partitions=null){

switch($err){

caseRD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:

echo"Assign: ";

var_dump($partitions);

$kafka->assign($partitions);

break;

caseRD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:

echo"Revoke: ";

var_dump($partitions);

$kafka->assign(NULL);

break;

default:

thrownewException($err);

}

});

// 配置groud.id 具有相同 group.id 的consumer 将会处理不同分区的消息,所以同一个组内的消费者数量如果订阅了一个topic, 那么消费者进程的数量多于 多于这个topic 分区的数量是没有意义的。

$conf->set("group.id","myConsumerGroup1");

//添加 kafka集群服务器地址

$conf->set("metadata.broker.list","192.168.33.1:9092");

$topicConf=newRdKafkaTopicConf();

// Set where to start consuming messages when there is no initial offset in

// offset store or the desired offset is out of range.

// "smallest": start from the beginning

//当没有初始偏移量时,从哪里开始读取

$topicConf->set("auto.offset.reset","smallest");

// Set the configuration to use for subscribed/assigned topics

$conf->setDefaultTopicConf($topicConf);

$consumer=newRdKafkaKafkaConsumer($conf);

// 让消费者订阅log 主题

$consumer->subscribe(["log"]);

while(true){

$message=$consumer->consume(120*1000);

switch($message->err){

caseRD_KAFKA_RESP_ERR_NO_ERROR:

var_dump($message);

break;

caseRD_KAFKA_RESP_ERR__PARTITION_EOF:

echo"No more messages; will wait for more

";

break;

caseRD_KAFKA_RESP_ERR__TIMED_OUT:

echo"Timed out

";

break;

default:

thrownewException($message->errstr(),$message->err);

break;

}

}

?>

 

以上是 PHP处理kafka消息队列 的全部内容, 来源链接: utcz.com/z/516573.html

回到顶部