PHP操作使用消息中间件Kafka

编程

简单测试

环境:CentOS 6.4,PHP 7;Kafka 服务器 IP:192.168.9.154,PHP 服务器 IP:192.168.9.157

在192.168.9.157创建目录和文件。

//生产者

<?php

// Producer demo: load the Kafka wrapper class and publish a single test message.
require "./modules/kafka.php";

// "new" and the class name must be separate tokens; the fused "newkafka()"
// would be parsed as a call to an undefined function.
$rk = new kafka();

$rk->send(["hello my kafka"]);

echo "OK~";

//消费者(consumer.php)

<?php

// Consumer demo: load the Kafka wrapper class and start the blocking consume loop.
require "./modules/kafka.php";

// "new" and the class name must be separate tokens; the fused "newkafka()"
// would be parsed as a call to an undefined function.
$rk = new kafka();

// consumer() loops forever, so this script never reaches past this call.
$rk->consumer();

//Kafka 封装类(modules/kafka.php)

<?php

/**
 * Minimal wrapper around the php-rdkafka extension that publishes to and
 * consumes from one preconfigured topic.
 *
 * The constructor always builds a producer; consumer() builds its own
 * high-level consumer on demand.
 */
class kafka
{
    public $broker_list = "192.168.9.154:9092"; // a single broker for now; several may be listed, separated by commas

    public $topic = "mytopic"; // topic name

    public $partition = 0; // physical partition of the topic; 0 here

    public $logFile = "./kafkalog/info.log"; // log file intended for use while consuming

    protected $producer = null; // holds the producer object

    protected $consumer = null; // holds the consumer object

    /**
     * Creates the RdKafka producer and registers the configured brokers.
     *
     * Configuration problems are only echoed; construction continues regardless.
     */
    public function __construct()
    {
        if (empty($this->broker_list)) {
            echo "broker not config";
        }

        $rk = new RdKafka\Producer(); // instantiate the producer

        if (empty($rk)) {
            echo "producer error1";
        }

        $rk->setLogLevel(LOG_DEBUG); // set the log level

        if (!$rk->addBrokers($this->broker_list)) { // register the broker addresses
            echo "producer error2";
        }

        $this->producer = $rk;
    }

    /**
     * Producer side: sends a message to the configured topic.
     *
     * The message is wrapped in an extra array and JSON-encoded before sending.
     *
     * @param array $message payload to publish
     */
    public function send($message = [])
    {
        $topic = $this->producer->newTopic($this->topic); // create the topic handle

        // NOTE(review): the second argument of produce() is the message-flags
        // slot, so $this->partition is not used as a partition here; it only
        // works because the default value is 0. Confirm the intended call.
        $topic->produce(RD_KAFKA_PARTITION_UA, $this->partition, json_encode([$message])); // publish
    }

    /**
     * Consumer side: subscribes to the configured topic and polls it forever.
     *
     * Each iteration polls once with a zero timeout and then sleeps one second.
     * Only messages without an error code are handled; every other result is
     * ignored. This method never returns.
     */
    public function consumer()
    {
        $conf = new RdKafka\Conf();

        $conf->set("group.id", 0);

        $conf->set("metadata.broker.list", $this->broker_list);

        $topicConf = new RdKafka\TopicConf();

        $topicConf->set("auto.offset.reset", "smallest");

        $conf->setDefaultTopicConf($topicConf);

        $consumer = new RdKafka\KafkaConsumer($conf);

        $consumer->subscribe([$this->topic]); // subscribe

        echo "wating for message....\n";

        while (true) {
            $message = $consumer->consume(0 * 1000);

            switch ($message->err) {
                case RD_KAFKA_RESP_ERR_NO_ERROR:
                    echo "要处理消息了~~~";

                    $messageInfo = $message->payload;

                    // echo $messageInfo."\n";

                    break;
            }

            sleep(1);
        }
    }
}

记住:消费者脚本内部是一个 while(true) 循环,会一直监听消息队列,所以要在终端(命令行)下运行:php consumer.php,而不是通过浏览器访问。

这里就不测试了。

工作代码:

 

 

/**
 * Client-side entry point for publishing customer login information.
 */
class CustomerLoginInfoServiceClient
{
    /**
     * Hands a message for the given topic to EdjKafka.
     *
     * The message goes to Kafka directly instead of calling the Java service.
     *
     * @param mixed $topic topic value forwarded under the "topic" key
     * @param mixed $msg   message forwarded under the "payload" key
     */
    public function add($topic, $msg)
    {
        $envelope = [
            "topic" => $topic,
            "payload" => $msg,
        ];

        $kafka = EdjKafka::getInstance();
        $kafka->putin($envelope);
    }
}

 

/**
 * Singleton that defers Kafka publishing: putin() enqueues a task on the
 * "phptokafka_0000" queue, and putin_job() is the method named in that task,
 * which forwards the payload to KafkaProducer.
 */
class EdjKafka
{
    private static $instance;

    /**
     * Returns the shared instance, creating it on first use.
     *
     * @param string $className class to instantiate when no instance exists yet
     * @return object the shared instance
     */
    public static function getInstance($className = __CLASS__)
    {
        if (!empty(self::$instance)) {
            return self::$instance;
        }

        self::$instance = new $className();

        return self::$instance;
    }

    /**
     * Wraps $params in a task description and pushes it onto the queue.
     *
     * @param array $params array with "topic" and "payload" keys, as read by putin_job()
     */
    public function putin($params)
    {
        $job = [
            "class" => __CLASS__,
            "method" => "putin_job",
            "params" => $params,
        ];

        $queue = Queue::model();
        $queue->putin($job, "phptokafka_0000");
    }

    /**
     * Sends one previously enqueued message to Kafka.
     *
     * @param array $params array with "topic" and "payload" keys
     */
    public function putin_job($params)
    {
        $topic = $params["topic"];
        $payload = $params["payload"];

        KafkaProducer::getInstance()->putin($topic, $payload);
    }
}

 

<?php

require_once(Yii::app()->basePath."/vendors/kafka/autoload.php");//kafka包在最下面

/**
 * Singleton producer that publishes payloads to Kafka and caches the
 * partition count of each topic for three minutes.
 */
class KafkaProducer
{
    private static $instance;

    private $producer;

    // Per-topic cache: topic => array("count" => partition count, "expire" => unix timestamp).
    private $partitionCountMap = array();

    /**
     * Returns the shared instance, creating it on first use.
     *
     * @param string $className class to instantiate when no instance exists yet
     * @return object the shared instance
     */
    public static function getInstance($className = __CLASS__)
    {
        if (empty(self::$instance)) {
            self::$instance = new $className();
        }

        return self::$instance;
    }

    /**
     * Builds the underlying producer from the "kafka" => "brokers" application parameter.
     */
    public function __construct()
    {
        $brokers = Yii::app()->params["kafka"]["brokers"];

        // NOTE(review): the published text read "KafkaProduce" / "KafkaException";
        // the namespace separators appear to have been stripped, so they are
        // restored here as \Kafka\Produce and \Kafka\Exception. Confirm against
        // the kafka package bundled under vendors/kafka.
        $newProducer = \Kafka\Produce::getInstance($brokers, 3000, $brokers);

        $newProducer->setRequireAck(-1);

        $this->producer = $newProducer;
    }

    /**
     * Returns the number of available partitions for a topic.
     *
     * A cached value is reused until it expires (180 seconds) unless $force is set.
     *
     * @param string $topic topic name
     * @param bool   $force when true, bypass the cache and query again
     * @return int partition count; 0 is logged as an error and still cached
     */
    private function getPartitionCount($topic, $force = false)
    {
        $now = time();

        // Query the partitions at most once every 3 minutes.
        if (!$force && array_key_exists($topic, $this->partitionCountMap) && $this->partitionCountMap[$topic]["expire"] > $now) {
            return $this->partitionCountMap[$topic]["count"];
        }

        // Fetch the partitions currently available for the topic.
        $this->producer->getClient()->refreshMetadata();

        $partitions = $this->producer->getAvailablePartitions($topic);

        EdjLog::info(__METHOD__."|".$topic."|get partition|".json_encode($partitions));

        $partitionCount = count($partitions);

        if ($partitionCount == 0) {
            EdjLog::error(__METHOD__."|".$topic."|topic partitions count 0");
        }

        $this->partitionCountMap[$topic] = array("count" => $partitionCount, "expire" => $now + 180);

        return $partitionCount;
    }

    /**
     * Publishes one payload to a topic.
     *
     * Does nothing when the topic is empty or has no available partitions.
     * The partition is chosen as the current unix time modulo the partition
     * count. On a Kafka exception the error is logged and the partition cache
     * for the topic is refreshed; the message is not retried.
     *
     * @param string $topic   topic name
     * @param mixed  $payload message to publish
     */
    public function putin($topic, $payload)
    {
        if (empty($topic)) {
            return;
        }

        $partitionCount = $this->getPartitionCount($topic);

        if ($partitionCount != 0) {
            try {
                $pid = time() % $partitionCount;

                $this->producer->setMessages($topic, $pid, array($payload));

                $this->producer->send();

                EdjLog::debugLog(__METHOD__."|".$topic."|".$pid);
            } catch (\Kafka\Exception $e) {
                EdjLog::error(__METHOD__."|".$e->getMessage());

                // The cached partition layout may be stale; force a refresh.
                $this->getPartitionCount($topic, true);
            }
        }
    }
}

 

<?php

// Kafka configuration returned to the including code.
// "return" and "array" must be separate tokens; the fused "returnarray(" would
// be parsed as a call to an undefined function.
return array(

    "brokers" => "123.123.123.123:9092,123.123.123.123:9093,123.123.123.123:9094", // same IP, different ports

    // Mapping of topic names; using the class name as the key is recommended.

    // Test and production environments use different configuration files.

    "topicmap" => array(

        "RDriverPositionToKafka" => "driver_location_test",

        "ROrderToKafka" => "order_test",

        "SubmitOrderAutoService_saveOrderInfoJob" => "finished_order_picture",

        "vip_customer_change" => "vip_customer_change",

    ),

);

 

<?php

/**
 * Redis-backed queue.
 *
 * Public entry points select a Redis list name (stored in $queue_name) and
 * then either pop one JSON-decoded item from it or push a JSON-encoded item
 * onto it. The Redis connection ("zone") for a queue is looked up through
 * QueuePool.
 */
class Queue
{
    private static $_models;

    public $queue_max_length = array(
    );

    // Name of the Redis list the next get()/add() operates on. Declared
    // explicitly because the methods below assign it; it was previously
    // created as a dynamic property.
    public $queue_name;

    /**
     * Returns the shared instance for the given class, creating it on first use.
     *
     * @param string $className class to instantiate
     * @return object the shared instance for $className
     */
    public static function model($className = __CLASS__)
    {
        $model = null;

        if (isset(self::$_models[$className])) {
            $model = self::$_models[$className];
        } else {
            $model = self::$_models[$className] = new $className(null);
        }

        return $model;
    }

    /**
     * Resolves the Redis zone for a queue type.
     *
     * @param string $type queue type
     * @return mixed zone description from QueuePool; falsy when none is found
     */
    private function select_redis($type)
    {
        return QueuePool::model()->get_zone($type);
    }

    /**
     * Trims a queue to its configured maximum length, if one is set in
     * $queue_max_length for the queue's type.
     *
     * @param string $queue_name full Redis list name
     */
    private function trim($queue_name)
    {
        $type = str_replace("queue_", "", $queue_name);

        $max = 0;

        if (isset($this->queue_max_length[$type])) {
            $max = intval($this->queue_max_length[$type]);
        }

        if ($max > 0) {
            $zone = $this->select_redis($type);

            if ($zone) {
                $zone["redis"]->lTrim($queue_name, 0, $max - 1);
            } else {
                EdjLog::error("can not find zone, queue name: ".$type);

                return;
            }
        }
    }

    /**
     * Shared tail of the public queue methods: pops one item when $params is
     * null, otherwise pushes $params onto the currently selected queue.
     *
     * @param mixed $params item to push, or null to pop
     * @return mixed result of get() or add()
     */
    private function getOrAdd($params)
    {
        if ($params === null) {
            return $this->get();
        }

        return $this->add($params);
    }

    /**
     * Pops one item from the currently selected queue using the zone resolved
     * for $type. A blocking pop is used when the zone's "brpop" entry is truthy,
     * with that entry passed as the second argument to brPop.
     *
     * @param string $type queue type used to look up the zone
     * @return mixed JSON-decoded item; an empty array when no zone is found
     */
    private function pop($type)
    {
        $zone = $this->select_redis($type);

        if (!$zone) {
            EdjLog::error("can not find zone, queue name: ".$type);

            return array();
        }

        if ($zone["brpop"]) {
            $json = "";

            $result = $zone["redis"]->brPop($this->queue_name, $zone["brpop"]);

            if (!empty($result) && isset($result[1])) {
                $json = $result[1];
            }
        } else {
            $json = $zone["redis"]->rPop($this->queue_name);
        }

        return json_decode($json, true);
    }

    /**
     * Unified public entry point for enqueueing.
     *
     * An empty $type is treated as "error". The type is mapped to a base queue
     * name; when no mapping exists the item goes to "queue_error". When $params
     * is null, one item is popped from the selected queue instead.
     *
     * $type now defaults to null so that the optional $params is no longer
     * followed by a required parameter; a null type behaves like any other
     * empty type.
     *
     * @author sunhongjing 2013-07-04
     * @param mixed  $params item to enqueue, or null to pop
     * @param string $type   queue type
     * @return mixed
     */
    public function putin($params = null, $type = null)
    {
        $type = empty($type) ? "error" : strtolower($type);

        $base_qname = QNameManagerService::model()->get_base_qname($type);

        if (!empty($base_qname)) {
            $this->queue_name = "queue_".$base_qname;
        } else {
            $this->queue_name = "queue_error";
        }

        return $this->getOrAdd($params);
    }

    /**
     * Pops one item from the queue mapped to $type.
     *
     * @author sunhongjing 2013-07-09
     * @param string $type queue type
     * @return mixed JSON-decoded item; an empty array when the type has no
     *               base queue name or no zone is found
     */
    public function getit($type = "default")
    {
        $base_qname = QNameManagerService::model()->get_base_qname($type);

        if (empty($base_qname)) {
            return array();
        }

        $this->queue_name = "queue_".$base_qname;

        return $this->pop($type);
    }

    /**
     * Returns the list of queue types loaded through QNameManager.
     *
     * @author sunhongjing 2013-07-04
     * @return mixed the list, or an empty array (after logging) when it is empty
     */
    public function getQueueTypeList()
    {
        $list = QNameManager::model()->findall();

        if ($list) {
            return $list;
        }

        EdjLog::error("Error: get queue list from database");

        return array();
    }

    /**
     * Pushes to or pops from the position queue.
     *
     * @param array $params item to push, or null to pop
     * @return mixed
     */
    public function position($params = null)
    {
        $this->queue_name = "queue_position";

        return $this->getOrAdd($params);
    }

    /**
     * Pushes to or pops from the heartbeat queue.
     *
     * @param string $params item to push, or null to pop
     * @return mixed
     */
    public function heartbeat($params = null)
    {
        $this->queue_name = "queue_heartbeat";

        return $this->getOrAdd($params);
    }

    /**
     * Pushes to or pops from the highest-priority (task) queue.
     *
     * @param string $params item to push, or null to pop
     * @return mixed
     */
    public function task($params = null)
    {
        $this->queue_name = "queue_task";

        return $this->getOrAdd($params);
    }

    /**
     * Pushes to or pops from the queue used for saving logs to the database.
     *
     * @param string $params item to push, or null to pop
     * @return mixed
     */
    public function dumplog($params = null)
    {
        $this->queue_name = "queue_dumplog";

        return $this->getOrAdd($params);
    }

    /**
     * Returns the number of pending items in every known queue.
     *
     * Keys have the form "queue_<name>@<zone name>", with "_P<level>" appended
     * when the zone name does not already match the priority pattern, and a
     * leading "!" when the length exceeds the item's max.
     *
     * @return array map of display key => list length
     */
    public function length()
    {
        $queue = $this->getQueueTypeList();

        $queue_length = array();

        $reg = "/P[0-9]+$/";

        foreach ($queue as $item) {
            $base_qname = $item->base_qname;

            $zone = $this->select_redis($base_qname);

            $key = "queue_".$base_qname;

            if (!$zone) {
                EdjLog::error("can not find zone, queue name: ".$key);

                continue;
            }

            $len = $zone["redis"]->lLen($key);

            if (isset($item->max) && $len > $item->max) {
                $key = "!".$key;
            }

            if (preg_match($reg, $zone["name"])) {
                $pkey = $key."@".$zone["name"];
            } else {
                $pkey = $key."@".$zone["name"]."_P".$item->level;
            }

            $queue_length[$pkey] = $len;
        }

        return $queue_length;
    }

    /**
     * Pops one item from the currently selected queue; the queue type is
     * derived from $queue_name by removing "queue_".
     *
     * @return mixed JSON-decoded item; an empty array when no zone is found
     */
    private function get()
    {
        $type = str_replace("queue_", "", $this->queue_name);

        return $this->pop($type);
    }

    /**
     * JSON-encodes $params and pushes it onto the currently selected queue.
     *
     * Exceptions raised by the push are logged and swallowed.
     *
     * @param mixed $params item to enqueue
     * @return mixed the lPush result, or 0 when no zone is found or the push throws
     */
    private function add($params)
    {
        $json = json_encode($params);

        $type = str_replace("queue_", "", $this->queue_name);

        $zone = $this->select_redis($type);

        $return = 0;

        if ($zone) {
            try {
                $return = $zone["redis"]->lPush($this->queue_name, $json);
            } catch (Exception $e) {
                EdjLog::error("write redis error,msg:".$e->getMessage());
            }
        } else {
            EdjLog::error("can not find zone, queue name: ".$type);
        }

        return $return;
    }

    /**
     * Executes one dequeued task.
     *
     * The task must contain "method" and "params"; "class" defaults to
     * "QueueProcess". The class is instantiated without arguments and the
     * method is called with $params as its single argument. Exceptions are
     * logged and swallowed.
     *
     * @param array $task task description
     */
    public function processTask($task)
    {
        if (!isset($task["method"], $task["params"])) {
            $task_content = json_encode($task);

            // Inner quotes are single quotes: unescaped double quotes here
            // terminated the string early and broke parsing.
            EdjLog::error("can not run task due to no 'method' or 'params' specified, task is $task_content");

            return;
        }

        $method = $task["method"];

        $params = $task["params"];

        $class = isset($task["class"]) ? $task["class"] : "QueueProcess";

        EdjLog::info("REDIS_QUEUE_OUT CLASS:$class METHOD:$method PARAMS:".json_encode($params));

        try {
            $queue_process = new $class();

            // Constructed only for its side effect: throws ReflectionException
            // when the method does not exist.
            new ReflectionMethod($queue_process, $method);

            call_user_func_array(array($queue_process, $method), array($params));
        } catch (Exception $e) {
            $errmsg = $e->getMessage();

            EdjLog::error("execption queue_run method:$method err: $errmsg");
        }
    }

    /**
     * Returns the number of pending items in the queue mapped to $type.
     *
     * @param string $type queue type; empty values are treated as "error"
     * @return mixed list length, or 0 when no zone is found
     */
    public function getLengthByType($type)
    {
        $type = empty($type) ? "error" : strtolower($type);

        $base_qname = QNameManagerService::model()->get_base_qname($type);

        $zone = $this->select_redis($base_qname);

        $key = "queue_".$base_qname;

        $len = 0;

        if ($zone) {
            $len = $zone["redis"]->lLen($key);
        } else {
            EdjLog::error("can not find zone, queue name: ".$base_qname);
        }

        return $len;
    }
}

 

以上是 PHP操作使用消息中间件Kafka 的全部内容, 来源链接: utcz.com/z/516793.html

回到顶部