php中RabbitMQ的使用

编程

什么是队列

消息队列(Message Queue)是一种应用间的通信方式,消息发送后可以立即返回。消息使用者再从MQ中取消息进行逻辑处理。对于消耗较大的请求,可以立马返回处理结果。减少服务器压力。为各个子系统之间解耦和异步处理。

rabbitmq的整体结构

rmq简单来说就是一个(生产/消费)的模型结构。消息生产者把数据丢到队列中,消息消费者从队列中取出数据进行逻辑处理。

那么如何确保,生产者添加的数据,能够到达指定的队列中呢?

rmq(消息队列)主要提供了三个概念(中间件?)来确保消息的分发,Exchange(交换机)、RoutingKey(路由)、Queue(队列)。

从上面的图也可以看出来, 处理消息的接收、分发,主要在Broker模块中。

Exchange 所有生产消息的入口都是到交换机这里。

exchange通过进来的路由(RoutingKey),去和已binding的规则进行匹配,找到指定的队列。

RoutingKey 我的理解,这里相当于一把钥匙。而binding的操作相当于一把锁头。

Queue 消息的存放区域,等到消费者来取。

Binding Exchange和Queue之间的一个绑定。

从这些概念来看,影响规则的主要是依赖Exchange。

 

那rmq提供了哪些类型,都有什么特点呢?

exchange类型

RabbitMQ提供了四种Exchange类型

  1. direct
  2. fanout
  3. topic
  4. header(header类型在实际使用中较少,所以在这里就不进行说明。)

Direct Exchange

direct 的规则比较简单。

在发布消息前,需要把exchange和queue做一个绑定。

如果发布消息的时候,RoutingKey 和绑定的值(key)一致。则将消息投递到该队列中。

如果不存在对应的队列,则消息会被丢弃。 (这时候访问rmq管理web时。可以看到消息进来,但是队列中没有值)

 

Fanout Exchange

fanout 类型则更简单一些。 只要exchange和队列做了绑定。发布的消息都会到队列中去。

 

Topic Exchange

相对来说 topic类型要复杂一些。 和direct类型相比。topic相当于模糊匹配,而direct为全等。类似mysql中 ‘like’关键词。

针对direct 类型写一个实例

实例分两部分 生产者、消费者(回调函数)

因为我的代码,对mq的部分做了封装,懒得拆分出来。 所以我只贴业务代码和封装的核心方法。

生产者代码

$mqModel = new Rabbitmq();   // 初始化(rmq连接操作)

$newResult = ["tom","bill","jack"];

if ($mqModel) {

$mqRoute = "push_data_to_crm_routing"; // 路由

$mqExchange = "push_data_to_crm_exchange"; // 交换机

$mqQuery = "push_data_to_crm_queue"; // 队列

// 建立连接,设置交换机,设置队列

$mqModel->setChannel()->setExchange($mqExchange,AMQP_EX_TYPE_DIRECT,AMQP_DURABLE)->setQueue($mqQuery,AMQP_DURABLE,$mqExchange,$mqRoute);

foreach ($newResult as $k => $v){

$push_data = $v;

$mqModel->publishMessage($push_data,$mqRoute); // 消息推送

}

}

消费者代码

$mqModel = new Rabbitmq();

// $mqRoute = "push_data_to_crm_routing"; 消费者用不上路由,因为不需要指定。 只要想取队列,消费即可。

$mqExchange = "push_data_to_crm_exchange";

$mqQuery = "push_data_to_crm_queue";

$mqModel->setChannel()->setExchange($mqExchange,"", AMQP_PASSIVE)->setQueue($mqQuery, AMQP_PASSIVE);

$zmq->consume(function($msg){

var_dump($msg);

return true;

});

封装类中的核心方法

//设置交换机

public function setExchange($changeName = "", $changeType = "", $flags = false) {

$errorMsg = "";

try{

if(!$this->channel){

throw new AMQPQueueException("Error channel on method setExchange", 1);

}

$this->exchange = new AMQPExchange($this->channel);

if($changeName){

$this->changeName = $changeName; // 交换机名称

$this->exchange->setName($changeName); // 设置名称

$changeType = $changeType ? $changeType : AMQP_EX_TYPE_DIRECT; // 交换机类型

}else{

$this->changeName = "";

}

if($changeType){

$this->changeType = $changeType;

$this->exchange->settype($changeType); // 设置交换机类型

}else{

$this->changeType = "";

}

if($flags){

$this->exchange->setFlags($flags); //交换机标志

}

if($changeType || $flags){

$this->exchange->declareExchange(); // 创建

}

} catch(AMQPQueueException $ex) {

$errorMsg = "AMQPQueueException error exchange: {$ex->getMessage()},

line: {$ex->getLine()}

";

} catch(Exception $ex) {

$errorMsg = "Exception error exchange: {$ex->getMessage()},

line: {$ex->getLine()}

";

}

if($errorMsg){

throw new Exception($errorMsg, 1);

}

return $this;

}

// 设置队列

public function setQueue($queueName = "", $flags = "", $exchange_name = "", $routing_key = "", $arguments=[] ){

$errorMsg = "";

try{

if(!$this->channel){

throw new AMQPQueueException("Error channel on method setQueue", 1);

}

$this->queue = new AMQPQueue($this->channel);

if(!$queueName){

return false;

}

$this->queueName = $queueName; // 队列名称

$this->queue->setName($queueName);

if($flags){

$this->queue->setFlags($flags); // 队列标志。与消息持久化有关。 这篇文字不涉及这一块的说明

}

if(is_array($arguments) && !empty($arguments)){

$this->queue->setArguments($arguments); // 参数配置

}

$this->queue->declareQueue(); // 创建一个队列

$exchange_name = $exchange_name === false ?

"" :

($exchange_name === true || !$exchange_name ? $this->changeName : $exchange_name);

$routing_key = $routing_key ? $routing_key : $this->queueName;

if($exchange_name && $routing_key ){

$this->queue->bind($exchange_name, $routing_key); // 交换机和队列的绑定操作

}

} catch(AMQPQueueException $ex) {

$errorMsg = "AMQPQueueException error queue: {$ex->getMessage()},

line: {$ex->getLine()}

";

} catch(Exception $ex) {

$errorMsg = "Exception error queue: {$ex->getMessage()},

line: {$ex->getLine()}

";

}

if($errorMsg){

throw new Exception($errorMsg, 1);

}

return $this;

}

// 发布消息

public function publishMessage($message = "", $routing_key = "", $flags = AMQP_NOPARAM, $attributes = []){

if(!$message){

return false;

}

$routing_key = $routing_key ? $routing_key : $this->queueName;

// 发布消息,带有路由key。如果需要,则会用于关联。

$this->exchange->publish($message, $routing_key, $flags, $attributes);

return true;

}

// 消费

public function consume($callback = null, $qos = 0, $isAct = true){

if($qos){

$this->channel->qos(0, $qos);

}

$errorMsg = "";

try{

if(!$this->queue){

throw new AMQPQueueException("Error queue on method consume", 1);

}

$this->callBackFnc = $callback;

$this->isAct = $isAct;

$callback = function($envelope, $queue){

if(is_callable($this->callBackFnc)){

call_user_func($this->callBackFnc, $envelope->getBody());

if($this->isAct){

$queue->ack($envelope->getDeliveryTag());

}else{

$queue->nack($envelope->getDeliveryTag());

}

}

};

$this->queue->consume($callback);

} catch(AMQPQueueException $ex) {

$errorMsg = "AMQPQueueException error queue: {$ex->getMessage()},

line: {$ex->getLine()}

";

} catch(Exception $ex) {

$errorMsg = "Exception error queue: {$ex->getMessage()},

line: {$ex->getLine()}

";

}

if($errorMsg){

throw new Exception($errorMsg, 1);

}

}

因为封装代码里写了很多 try catch 所以看起来特别乱。 还有部分兼容的逻辑。 看起来不舒服,就先删掉再看吧。

执行结果

先跑一遍生产者代码,这里可以用浏览器直接访问。 执行完了之后。 到rabbitmq 的web管理页面中查看。 发现消息已经正常添加到队列中。(web管理页面可查询别的文章开启)

这时候再执行消费者代码。 消费者代码需要在cli下执行。因为消费者为轮询等待,是死循环,无法在浏览器下执行。

终端输出结果,消息已经被消费掉了。 再次返回web管理页面,队列中的三条记录也没有了。

结尾

实际上,这样的一个流程,是可以直接在实际项目中使用的。

这就是我上一个项目上使用的rmq(仅限应用架构)。

只不过这是一个简略版的。为了确保子系统的独立。

消费者应该单独成一个小系统。

在回调函数上,使用curl请求业务应用。

这样两个系统分来。

业务逻辑还是在原应用中。

还没结束,到这里似乎缺少点人性化的东西。

因为,没建一个队列,都需要单独去运行一个消费者代码。

这样很繁琐,而且很多人都不会有权限直接操作服务器。

所以,这里还需要搭建一个平台,用来自动部署消费端代码运行,停止,更改。

如何去部署,那就需要思考了。

 

以上是 php中RabbitMQ的使用 的全部内容, 来源链接: utcz.com/z/515989.html

回到顶部