PHP操作使用消息中间件Kafka
简单测试
环境:CentOS 6.4,PHP 7;Kafka 服务器 IP:192.168.9.154,PHP 服务器 IP:192.168.9.157
在192.168.9.157创建目录和文件。
<?php
// Producer script: loads the kafka wrapper class and sends a single test
// message, then prints a confirmation.
require "./modules/kafka.php";

// "new" and the class name must be separate tokens; "newkafka()" would be
// parsed as a call to an undefined function.
$rk = new kafka();
$rk->send(["hello my kafka"]);
echo "OK~";
<?php
// Consumer script: loads the kafka wrapper class and starts its consume loop.
// consumer() loops forever, so this call does not return.
require "./modules/kafka.php";

// "new" and the class name must be separate tokens; "newkafka()" would be
// parsed as a call to an undefined function.
$rk = new kafka();
$rk->consumer();
<?php
/**
 * Demo wrapper around the php-rdkafka extension.
 *
 * The constructor builds a producer for $broker_list; send() produces one
 * JSON-encoded message to $topic and consumer() subscribes to $topic and
 * polls it in an endless loop.
 */
class kafka
{
    /** @var string Broker address; a single one for now, several may be given comma-separated. */
    public $broker_list = "192.168.9.154:9092";

    /** @var string Topic used by both send() and consumer(). */
    public $topic = "mytopic";

    /** @var int Physical partition of the topic (0 here). Not used by send(), see the note there. */
    public $partition = 0;

    /** @var string Log file intended for consumer-side logging; this class does not write to it. */
    public $logFile = "./kafkalog/info.log";

    /** @var \RdKafka\Producer|null Producer object created by the constructor. */
    protected $producer = null;

    /** @var mixed Slot for a consumer object; never assigned inside this class. */
    protected $consumer = null;

    /**
     * Creates the producer and registers the configured brokers.
     *
     * Problems are only reported with echo; construction continues regardless.
     */
    public function __construct()
    {
        if (empty($this->broker_list)) {
            echo "broker not config";
        }

        $rk = new \RdKafka\Producer();
        if (empty($rk)) {
            echo "producer error1";
        }

        $rk->setLogLevel(LOG_DEBUG);
        if (!$rk->addBrokers($this->broker_list)) {
            echo "producer error2";
        }

        $this->producer = $rk;
    }

    /**
     * Producer side: publishes one message to the configured topic.
     *
     * @param mixed $message Data to send; it is wrapped in an array and JSON-encoded.
     * @return void
     */
    public function send($message = [])
    {
        $topic = $this->producer->newTopic($this->topic);

        // produce() takes (partition, msgflags, payload). The second argument
        // is the message flags, not a partition number, so a literal 0 is
        // passed instead of $this->partition (which only worked because it
        // happened to be 0). RD_KAFKA_PARTITION_UA leaves the partition
        // unassigned so the library picks one.
        $topic->produce(RD_KAFKA_PARTITION_UA, 0, json_encode([$message]));
    }

    /**
     * Consumer side: subscribes to the configured topic and polls forever.
     *
     * Only messages without an error code are handled; every other result
     * of consume() is ignored. The loop sleeps one second between polls and
     * never returns.
     *
     * @return void
     */
    public function consumer()
    {
        $conf = new \RdKafka\Conf();
        $conf->set("group.id", 0);
        $conf->set("metadata.broker.list", $this->broker_list);

        $topicConf = new \RdKafka\TopicConf();
        $topicConf->set("auto.offset.reset", "smallest");
        $conf->setDefaultTopicConf($topicConf);

        $consumer = new \RdKafka\KafkaConsumer($conf);
        $consumer->subscribe([$this->topic]); // subscribe

        echo "wating for message....\n";
        while (true) {
            // Timeout is 0 ms: the call returns immediately and the pacing
            // comes from the sleep(1) below.
            $message = $consumer->consume(0 * 1000);
            switch ($message->err) {
                case RD_KAFKA_RESP_ERR_NO_ERROR:
                    echo "要处理消息了~~~";
                    // Payload of the received message; processing would go here.
                    $messageInfo = $message->payload;
                    break;
            }
            sleep(1);
        }
    }
}
记住:消费者 PHP 文件要在终端(命令行)中运行:php consumer.php。因为 consumer() 方法内部是 while(true) 死循环,会一直运行并监听消息,不会返回。
这里就不测试了。
工作代码:
/**
 * Customer login information: client side for calling the service interface.
 */
class CustomerLoginInfoServiceClient {
    /**
     * Passes a message for the given topic on to EdjKafka::putin().
     *
     * @param mixed $topic Value stored under the "topic" key.
     * @param mixed $msg   Value stored under the "payload" key.
     */
    public function add($topic, $msg) {
        // Goes straight to Kafka; the Java service is no longer called.
        $envelope = array();
        $envelope["topic"] = $topic;
        $envelope["payload"] = $msg;

        $kafka = EdjKafka::getInstance();
        $kafka->putin($envelope);
    }
}
/**
 * Bridge that routes Kafka messages through the Queue: putin() enqueues a
 * task naming this class and putin_job(), and putin_job() forwards the
 * payload to KafkaProducer.
 */
class EdjKafka {
    /** Shared instance created on the first getInstance() call. */
    private static $instance;

    /**
     * Returns the shared instance, creating it from $className on first use.
     *
     * @param string $className Class to instantiate when no instance exists yet.
     * @return object The shared instance.
     */
    public static function getInstance($className=__CLASS__) {
        if (!empty(self::$instance)) {
            return self::$instance;
        }
        self::$instance = new $className();
        return self::$instance;
    }

    /**
     * Enqueues a task describing a putin_job() call with the given params.
     *
     * @param array $params Stored unchanged under the task's "params" key.
     */
    public function putin($params) {
        $job = array();
        $job["class"] = __CLASS__;
        $job["method"] = "putin_job";
        $job["params"] = $params;

        $queue = Queue::model();
        $queue->putin($job, "phptokafka_0000");
    }

    /**
     * Sends $params["payload"] to the topic $params["topic"] via KafkaProducer.
     *
     * @param array $params Must contain the keys "topic" and "payload".
     */
    public function putin_job($params) {
        $producer = KafkaProducer::getInstance();
        $producer->putin($params["topic"], $params["payload"]);
    }
}
<?php
// The kafka package is provided at the very bottom.
require_once(Yii::app()->basePath . "/vendors/kafka/autoload.php");

/**
 * Singleton producer that sends payloads to Kafka topics and caches the
 * partition count of each topic for three minutes.
 *
 * NOTE(review): the class names \Kafka\Produce and \Kafka\Exception were
 * restored from "KafkaProduce" / "KafkaException", whose namespace separator
 * appears to have been stripped; confirm against the bundled kafka package.
 */
class KafkaProducer
{
    /** Shared instance created on the first getInstance() call. */
    private static $instance;

    /** Underlying producer object obtained in the constructor. */
    private $producer;

    /** Per-topic cache: topic => array("count" => int, "expire" => unix time). */
    private $partitionCountMap = array();

    /**
     * Returns the shared instance, creating it from $className on first use.
     *
     * @param string $className Class to instantiate when no instance exists yet.
     * @return object The shared instance.
     */
    public static function getInstance($className = __CLASS__)
    {
        if (empty(self::$instance)) {
            self::$instance = new $className();
        }
        return self::$instance;
    }

    /**
     * Builds the underlying producer from the "kafka" => "brokers" application
     * parameter and sets its required-ack value to -1.
     */
    public function __construct()
    {
        $brokers = Yii::app()->params["kafka"]["brokers"];
        $newProducer = \Kafka\Produce::getInstance($brokers, 3000, $brokers);
        $newProducer->setRequireAck(-1);
        $this->producer = $newProducer;
    }

    /**
     * Returns the number of available partitions for a topic.
     *
     * A cached value is used while it has not expired; otherwise the metadata
     * is refreshed and the result is cached for 180 seconds (a count of 0 is
     * cached as well).
     *
     * @param string $topic Topic name.
     * @param bool   $force When true, bypass the cache and query again.
     * @return int Number of available partitions.
     */
    private function getPartitionCount($topic, $force = false)
    {
        $now = time();

        // Partitions are queried at most once every 3 minutes.
        if (!$force
            && array_key_exists($topic, $this->partitionCountMap)
            && $this->partitionCountMap[$topic]["expire"] > $now
        ) {
            return $this->partitionCountMap[$topic]["count"];
        }

        // Fetch the partitions currently available for the topic.
        $this->producer->getClient()->refreshMetadata();
        $partitions = $this->producer->getAvailablePartitions($topic);
        EdjLog::info(__METHOD__ . "|" . $topic . "|get partition|" . json_encode($partitions));

        $partitionCount = count($partitions);
        if ($partitionCount == 0) {
            EdjLog::error(__METHOD__ . "|" . $topic . "|topic partitions count 0");
        }

        $this->partitionCountMap[$topic] = array("count" => $partitionCount, "expire" => $now + 180);
        return $partitionCount;
    }

    /**
     * Sends one payload to a topic.
     *
     * Does nothing when the topic is empty or has no available partitions.
     * The partition id is the current unix time modulo the partition count.
     * On a Kafka exception the error is logged and the partition count is
     * re-queried; the message is not retried.
     *
     * @param string $topic   Topic name.
     * @param mixed  $payload Message to send.
     * @return void
     */
    public function putin($topic, $payload)
    {
        if (empty($topic)) {
            return;
        }

        $partitionCount = $this->getPartitionCount($topic);
        if ($partitionCount != 0) {
            try {
                $pid = time() % $partitionCount;
                $this->producer->setMessages($topic, $pid, array($payload));
                $this->producer->send();
                EdjLog::debugLog(__METHOD__ . "|" . $topic . "|" . $pid);
            } catch (\Kafka\Exception $e) {
                EdjLog::error(__METHOD__ . "|" . $e->getMessage());
                // Force a refresh so the next call sees the current partitions.
                $this->getPartitionCount($topic, true);
            }
        }
    }
}
<?php
// Kafka configuration array; KafkaProducer reads its "brokers" entry through
// the application's "kafka" parameter.
return array(
    // Same IP, different ports.
    "brokers" => "123.123.123.123:9092,123.123.123.123:9093,123.123.123.123:9094",
    // Topic-name mapping; using the class name as the key is recommended.
    // The test environment and production use different config files.
    "topicmap" => array(
        "RDriverPositionToKafka" => "driver_location_test",
        "ROrderToKafka" => "order_test",
        "SubmitOrderAutoService_saveOrderInfoJob" => "finished_order_picture",
        "vip_customer_change" => "vip_customer_change",
    ),
);
<?php
/**
 * Redis-based queue.
 *
 * Items are JSON-encoded and pushed with lPush; they are read back with
 * rPop, or brPop when the selected zone has a "brpop" value. The Redis
 * connection for a queue is looked up through QueuePool::model()->get_zone().
 */
class Queue
{
    /** Instances created by model(), keyed by class name. */
    private static $_models = array();

    /** Optional maximum length per queue type, used by trim(). */
    public $queue_max_length = array(
    );

    /**
     * Name of the Redis list the next get()/add() works on ("queue_<name>").
     * Declared explicitly; the methods below assign it before every use.
     */
    public $queue_name = null;

    /**
     * Returns the shared instance for a class name, creating it on first use.
     *
     * @param string $className Class to instantiate.
     * @return object The shared instance for that class name.
     */
    public static function model($className = __CLASS__)
    {
        $model = null;
        if (isset(self::$_models[$className])) {
            $model = self::$_models[$className];
        } else {
            $model = self::$_models[$className] = new $className(null);
        }
        return $model;
    }

    /**
     * Picks the Redis zone for a queue type.
     *
     * @param string $type Queue type.
     * @return mixed Whatever QueuePool::get_zone() returns; callers treat a
     *               falsy value as "no zone" and otherwise read the keys
     *               "redis", "brpop" and "name".
     */
    private function select_redis($type)
    {
        return QueuePool::model()->get_zone($type);
    }

    /**
     * Trims a queue to its configured maximum length, if one is configured.
     *
     * @param string $queue_name Full list name including the "queue_" prefix.
     * @return void
     */
    private function trim($queue_name)
    {
        $type = str_replace("queue_", "", $queue_name);
        $max = 0;
        if (isset($this->queue_max_length[$type])) {
            $max = intval($this->queue_max_length[$type]);
        }
        if ($max > 0) {
            $zone = $this->select_redis($type);
            if ($zone) {
                $zone["redis"]->lTrim($queue_name, 0, $max - 1);
            } else {
                EdjLog::error("can not find zone, queue name: " . $type);
                return;
            }
        }
    }

    /**
     * Unified public entry point for putting an item into a queue.
     *
     * The type is lower-cased and resolved to a base queue name; an empty
     * type is treated as "error" and an unresolvable type falls back to
     * "queue_error". When $params is null an item is read from that queue
     * instead of being added.
     *
     * @author sunhongjing 2013-07-04
     * @param mixed       $params Item to enqueue, or null to read one item.
     * @param string|null $type   Queue type.
     * @return mixed Result of add() when enqueuing, or of get() when reading.
     */
    public function putin($params = null, $type = null)
    {
        $type = empty($type) ? "error" : strtolower($type);
        $base_qname = QNameManagerService::model()->get_base_qname($type);
        if (!empty($base_qname)) {
            $this->queue_name = "queue_" . $base_qname;
        } else {
            $this->queue_name = "queue_error";
        }
        return $this->dispatch($params);
    }

    /**
     * Reads one item from the queue of the given type; unified read method
     * across the different queues.
     *
     * @author sunhongjing 2013-07-09
     * @param string $type Queue type.
     * @return mixed Decoded item; an empty array when the type cannot be
     *               resolved or no zone is found; null when nothing decodable
     *               was read.
     */
    public function getit($type = "default")
    {
        $base_qname = QNameManagerService::model()->get_base_qname($type);
        if (empty($base_qname)) {
            return array();
        }
        $this->queue_name = "queue_" . $base_qname;

        // The zone is selected with the caller's type, not the base queue name.
        return $this->pop($type);
    }

    /**
     * Returns the list of queue types from QNameManager.
     *
     * @author sunhongjing 2013-07-04
     * @return mixed The list, or an empty array (after logging) when it is empty.
     */
    public function getQueueTypeList()
    {
        $list = QNameManager::model()->findall();
        if ($list) {
            return $list;
        }
        EdjLog::error("Error: get queue list from database");
        return array();
    }

    /**
     * Writes to or reads from the position queue.
     *
     * @param mixed $params Item to enqueue, or null to read one item.
     * @return mixed
     */
    public function position($params = null)
    {
        $this->queue_name = "queue_position";
        return $this->dispatch($params);
    }

    /**
     * Writes to or reads from the heartbeat queue.
     *
     * @param mixed $params Item to enqueue, or null to read one item.
     * @return mixed
     */
    public function heartbeat($params = null)
    {
        $this->queue_name = "queue_heartbeat";
        return $this->dispatch($params);
    }

    /**
     * Writes to or reads from the highest-priority (task) queue.
     *
     * @param mixed $params Item to enqueue, or null to read one item.
     * @return mixed
     */
    public function task($params = null)
    {
        $this->queue_name = "queue_task";
        return $this->dispatch($params);
    }

    /**
     * Writes to or reads from the queue used for saving logs to the database.
     *
     * @param mixed $params Item to enqueue, or null to read one item.
     * @return mixed
     */
    public function dumplog($params = null)
    {
        $this->queue_name = "queue_dumplog";
        return $this->dispatch($params);
    }

    /**
     * Returns the number of items in every known queue.
     *
     * Result keys have the form "queue_<name>@<zone>"; "_P<level>" is
     * appended when the zone name does not already end in "P<digits>", and
     * the key is prefixed with "!" when the length exceeds the item's max.
     *
     * @return array Map of display key => list length.
     */
    public function length()
    {
        $queue = $this->getQueueTypeList();
        $queue_length = array();
        $reg = "/P[0-9]+$/";
        foreach ($queue as $item) {
            $base_qname = $item->base_qname;
            $zone = $this->select_redis($base_qname);
            $key = "queue_" . $base_qname;
            if ($zone) {
                $len = $zone["redis"]->lLen($key);
                if (isset($item->max) && $len > $item->max) {
                    $key = "!" . $key;
                }
                $pkey = "";
                if (preg_match($reg, $zone["name"])) {
                    $pkey = $key . "@" . $zone["name"];
                } else {
                    $pkey = $key . "@" . $zone["name"] . "_P" . $item->level;
                }
                $queue_length[$pkey] = $len;
            } else {
                EdjLog::error("can not find zone, queue name: " . $key);
            }
        }
        return $queue_length;
    }

    /**
     * Reads one item from the current queue ($this->queue_name).
     *
     * @return mixed See pop().
     */
    private function get()
    {
        return $this->pop(str_replace("queue_", "", $this->queue_name));
    }

    /**
     * JSON-encodes an item and pushes it onto the current queue.
     *
     * @param mixed $params Item to enqueue.
     * @return mixed Result of lPush, or 0 when no zone is found or the push throws.
     */
    private function add($params)
    {
        $json = json_encode($params);
        $type = str_replace("queue_", "", $this->queue_name);
        $zone = $this->select_redis($type);
        $return = 0;
        if ($zone) {
            try {
                $return = $zone["redis"]->lPush($this->queue_name, $json);
            } catch (Exception $e) {
                EdjLog::error("write redis error,msg:" . $e->getMessage());
            }
        } else {
            EdjLog::error("can not find zone, queue name: " . $type);
        }
        return $return;
    }

    /**
     * Runs a task taken from a queue.
     *
     * The task must contain "method" and "params"; "class" defaults to
     * "QueueProcess". The class is instantiated without arguments and the
     * method is called with $params as its single argument. Exceptions,
     * including the ReflectionException for a missing method, are logged.
     *
     * @param array $task Task description.
     * @return void
     */
    public function processTask($task)
    {
        if (!isset($task["method"], $task["params"])) {
            $task_content = json_encode($task);
            EdjLog::error("can not run task due to no \"method\" or \"params\" specified, task is $task_content");
            return;
        }

        $method = $task["method"];
        $params = $task["params"];
        $class = isset($task["class"]) ? $task["class"] : "QueueProcess";
        EdjLog::info("REDIS_QUEUE_OUT CLASS:$class METHOD:$method PARAMS:" . json_encode($params));

        try {
            $queue_process = new $class();
            // Check that the method exists; throws ReflectionException if not.
            new ReflectionMethod($queue_process, $method);
            call_user_func_array(array($queue_process, $method), array($params));
        } catch (Exception $e) {
            $errmsg = $e->getMessage();
            EdjLog::error("execption queue_run method:$method err: $errmsg");
        }
    }

    /**
     * Returns the length of the queue for a given type.
     *
     * @param string $type Queue type; an empty value is treated as "error".
     * @return mixed Result of lLen, or 0 when no zone is found.
     */
    public function getLengthByType($type)
    {
        $type = empty($type) ? "error" : strtolower($type);
        $base_qname = QNameManagerService::model()->get_base_qname($type);
        $zone = $this->select_redis($base_qname);
        $key = "queue_" . $base_qname;
        $len = 0;
        if ($zone) {
            $len = $zone["redis"]->lLen($key);
        } else {
            EdjLog::error("can not find zone, queue name: " . $base_qname);
        }
        return $len;
    }

    /**
     * Reads when $params is null, otherwise enqueues $params, on the current queue.
     *
     * @param mixed $params Item to enqueue, or null to read one item.
     * @return mixed Result of get() or add().
     */
    private function dispatch($params)
    {
        if ($params === null) {
            return $this->get();
        }
        return $this->add($params);
    }

    /**
     * Pops one item from the current queue using the zone selected for $type.
     *
     * Uses brPop with the zone's "brpop" value when that value is truthy,
     * otherwise rPop.
     *
     * @param string $type Queue type used to select the zone.
     * @return mixed Decoded item; an empty array when no zone is found; null
     *               when nothing decodable was read.
     */
    private function pop($type)
    {
        $zone = $this->select_redis($type);
        if (!$zone) {
            EdjLog::error("can not find zone, queue name: " . $type);
            return array();
        }

        if ($zone["brpop"]) {
            $json = "";
            $result = $zone["redis"]->brPop($this->queue_name, $zone["brpop"]);
            if (!empty($result) && isset($result[1])) {
                $json = $result[1];
            }
        } else {
            $json = $zone["redis"]->rPop($this->queue_name);
        }
        return json_decode($json, true);
    }
}
以上是 PHP操作使用消息中间件Kafka 的全部内容, 来源链接: utcz.com/z/516793.html