RabbitMQ入门教程(PHP)实现延迟功能

编程

php 使用rabbitmq-delayed-message-exchange插件实现延迟功能

1.安装

  • 3.6.x下载地址

  • 3.7.x下载地址

下载后解压,并将其拷贝至(使用Linux Debian/RPM部署)rabbitmq服务器目录:/usr/local/rabbitmq/plugins中( windows安装目录abbitmq_server-versionplugins )。

2.启用插件

使用命令rabbitmq-plugins enable rabbitmq_delayed_message_exchang启用插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchang

输出如下:

The following plugins have been enabled:

rabbitmq_delayed_message_exchange

通过rabbitmq-plugins list查看已安装列表,如下:

[ ] rabbitmq_delayed_message_exchange 20171215-3.6.x

3.机制解释

安装插件后会生成新的Exchange类型x-delayed-message,该类型消息支持延迟投递机制,接收到消息后并未立即将消息投递至目标队列中,而是存储在mnesia(一个分布式数据系统)表中,检测消息延迟时间,如达到可投递时间时并将其通过x-delayed-type类型标记的交换机类型投递至目标队列。

 

4.php实现过程

消费者 delay_consumer2.php:

<?php

//header("Content-Type:text/html;charset=utf8;");

$params = array(

"exchangeName" => "delayed_exchange_test",

"queueName" => "delayed_queue_test",

"routeKey" => "delayed_route_test",

);

$connectConfig = array(

"host" => "localhost",

"port" => 5672,

"login" => "guest",

"password" => "guest",

"vhost" => "/"

);

//var_dump(extension_loaded("amqp"));

//exit();

try {

$conn = new AMQPConnection($connectConfig);

$conn->connect();

if (!$conn->isConnected()) {

//die("Conexiune esuata");

//TODO 记录日志

echo"rabbit-mq 连接错误:", json_encode($connectConfig);

exit();

}

$channel = new AMQPChannel($conn);

if (!$channel->isConnected()) {

// die("Connection through channel failed");

//TODO 记录日志

echo"rabbit-mq Connection through channel failed:", json_encode($connectConfig);

exit();

}

$exchange = new AMQPExchange($channel);

//$exchange->setFlags(AMQP_DURABLE);//声明一个已存在的交换器的,如果不存在将抛出异常,这个一般用在consume端

$exchange->setName($params["exchangeName"]);

$exchange->setType("x-delayed-message"); //x-delayed-message类型

/*RabbitMQ常用的Exchange Type有三种:fanout、direct、topic。

fanout:把所有发送到该Exchange的消息投递到所有与它绑定的队列中。

direct:把消息投递到那些binding key与routing key完全匹配的队列中。

topic:将消息路由到binding key与routing key模式匹配的队列中。*/

$exchange->setArgument("x-delayed-type","direct");

$exchange->declareExchange();

//$channel->startTransaction();

$queue = new AMQPQueue($channel);

$queue->setName($params["queueName"]);

$queue->setFlags(AMQP_DURABLE);

$queue->declareQueue();

//绑定

$queue->bind($params["exchangeName"], $params["routeKey"]);

} catch(Exception $e) {

echo$e->getMessage();

exit();

}

function callback(AMQPEnvelope $message) {

global $queue;

if ($message) {

$body = $message->getBody();

echo"接收时间:".date("Y-m-d H:i:s", time()). PHP_EOL;

echo"接收内容:".$body . PHP_EOL;

//为了防止接收端在处理消息时down掉,只有在消息处理完成后才发送ack消息

$queue->ack($message->getDeliveryTag());

} else {

echo"no message" . PHP_EOL;

}

}

//$queue->consume("callback"); 第一种消费方式,但是会阻塞,程序一直会卡在此处

//第二种消费方式,非阻塞

/*$start = time();

while(true)

{

$message = $queue->get();

if(!empty($message))

{

echo$message->getBody();

$queue->ack($message->getDeliveryTag()); //应答,代表该消息已经消费

$end = time();

echo"<br>" . ($end - $start);

exit();

}

else

{

//echo"message not found" . PHP_EOL;

}

}*/

//注意:这里需要注意的是这个方法:$queue->consume,queue对象有两个方法可用于取消息:consume和get。前者是阻塞的,无消息时会被挂起,适合循环中使用;后者则是非阻塞的,取消息时有则取,无则返回false

//就是说用了consume之后,会同步阻塞,该程序常驻内存,不能用nginx,apache调用。

$action = "2";

if($action == "1"){

$queue->consume("callback"); //第一种消费方式,但是会阻塞,程序一直会卡在此处

}else{

//第二种消费方式,非阻塞

$start = time();

while(true)

{

$message = $queue->get();

if(!empty($message))

{

echo"接收时间:".date("Y-m-d H:i:s", time()). PHP_EOL;

echo"接收内容:".$message->getBody().PHP_EOL;

$queue->ack($message->getDeliveryTag()); //应答,代表该消息已经消费

$end = time();

echo"运行时间:".($end - $start)."秒".PHP_EOL;

//exit();

}

else

{

//echo"message not found" . PHP_EOL;

}

}

}

生产者delay_publisher2.php:

<?php

//header("Content-Type:text/html;charset=utf-8;");

$params = array(

"exchangeName" => "delayed_exchange_test",

"queueName" => "delayed_queue_test",

"routeKey" => "delayed_route_test",

);

$connectConfig = array(

"host" => "localhost",

"port" => 5672,

"login" => "guest",

"password" => "guest",

"vhost" => "/"

);

//var_dump(extension_loaded("amqp")); 判断是否加载amqp扩展

//exit();

try {

$conn = new AMQPConnection($connectConfig);

$conn->connect();

if (!$conn->isConnected()) {

//die("Conexiune esuata");

//TODO 记录日志

echo"rabbit-mq 连接错误:", json_encode($connectConfig);

exit();

}

$channel = new AMQPChannel($conn);

if (!$channel->isConnected()) {

// die("Connection through channel failed");

//TODO 记录日志

echo"rabbit-mq Connection through channel failed:", json_encode($connectConfig);

exit();

}

$exchange = new AMQPExchange($channel);

$exchange->setName($params["exchangeName"]);

$exchange->setType("x-delayed-message"); //x-delayed-message类型

/*RabbitMQ常用的Exchange Type有三种:fanout、direct、topic。

fanout:把所有发送到该Exchange的消息投递到所有与它绑定的队列中。

direct:把消息投递到那些binding key与routing key完全匹配的队列中。

topic:将消息路由到binding key与routing key模式匹配的队列中。*/

$exchange->setArgument("x-delayed-type","direct");

$exchange->declareExchange();

//$channel->startTransaction();

//RabbitMQ不容许声明2个相同名称、配置不同的Queue,否则报错

$queue = new AMQPQueue($channel);

$queue->setName($params["queueName"]);

$queue->setFlags(AMQP_DURABLE);

$queue->declareQueue();

//绑定队列和交换机

$queue->bind($params["exchangeName"], $params["routeKey"]);

//$channel->commitTransaction();

} catch(Exception $e) {

}

for($i=5;$i>0;$i--){

//生成消息

echo"发送时间:".date("Y-m-d H:i:s", time()).PHP_EOL;

echo"i=".$i.",延迟".$i."秒".PHP_EOL;

$message = json_encode(["order_id"=>time(),"i"=>$i]);

$exchange->publish($message, $params["routeKey"], AMQP_NOPARAM, ["headers"=>["x-delay"=> 1000*$i]]);

sleep(2);

}

$conn->disconnect();

对于代码来讲,首先对于消费者核心代码

$exchange->setType("x-delayed-message"); //x-delayed-message类型

$exchange->setArgument("x-delayed-type","direct");

生产者核心代码

$exchange = new AMQPExchange($channel);

$exchange->setName($params["exchangeName"]);

$exchange->setType("x-delayed-message"); //x-delayed-message类型

$exchange->setArgument("x-delayed-type","direct");

$exchange->declareExchange();

使用方法:先运行delay_consumer1.php,再运行delay_publisher1.php

运行效果:

 

 

以上是 RabbitMQ入门教程(PHP)实现延迟功能 的全部内容, 来源链接: utcz.com/z/515516.html

回到顶部