RabbitMQ入门教程(PHP)实现延迟功能
php 使用rabbitmq-delayed-message-exchange插件实现延迟功能
1.安装
3.6.x下载地址
3.7.x下载地址
下载后解压,并将其拷贝至(使用Linux Debian/RPM部署)rabbitmq服务器目录:/usr/local/rabbitmq/plugins中( windows安装目录abbitmq_server-versionplugins )。
2.启用插件
使用命令rabbitmq-plugins enable rabbitmq_delayed_message_exchang启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchang
输出如下:
The following plugins have been enabled: rabbitmq_delayed_message_exchange
通过rabbitmq-plugins list查看已安装列表,如下:
[ ] rabbitmq_delayed_message_exchange 20171215-3.6.x
3.机制解释
安装插件后会生成新的Exchange类型x-delayed-message,该类型消息支持延迟投递机制,接收到消息后并未立即将消息投递至目标队列中,而是存储在mnesia(一个分布式数据系统)表中,检测消息延迟时间,如达到可投递时间时并将其通过x-delayed-type类型标记的交换机类型投递至目标队列。
4.php实现过程
消费者 delay_consumer2.php:
<?php//header("Content-Type:text/html;charset=utf8;");
$params = array(
"exchangeName" => "delayed_exchange_test",
"queueName" => "delayed_queue_test",
"routeKey" => "delayed_route_test",
);
$connectConfig = array(
"host" => "localhost",
"port" => 5672,
"login" => "guest",
"password" => "guest",
"vhost" => "/"
);
//var_dump(extension_loaded("amqp"));
//exit();
try {
$conn = new AMQPConnection($connectConfig);
$conn->connect();
if (!$conn->isConnected()) {
//die("Conexiune esuata");
//TODO 记录日志
echo"rabbit-mq 连接错误:", json_encode($connectConfig);
exit();
}
$channel = new AMQPChannel($conn);
if (!$channel->isConnected()) {
// die("Connection through channel failed");
//TODO 记录日志
echo"rabbit-mq Connection through channel failed:", json_encode($connectConfig);
exit();
}
$exchange = new AMQPExchange($channel);
//$exchange->setFlags(AMQP_DURABLE);//声明一个已存在的交换器的,如果不存在将抛出异常,这个一般用在consume端
$exchange->setName($params["exchangeName"]);
$exchange->setType("x-delayed-message"); //x-delayed-message类型
/*RabbitMQ常用的Exchange Type有三种:fanout、direct、topic。
fanout:把所有发送到该Exchange的消息投递到所有与它绑定的队列中。
direct:把消息投递到那些binding key与routing key完全匹配的队列中。
topic:将消息路由到binding key与routing key模式匹配的队列中。*/
$exchange->setArgument("x-delayed-type","direct");
$exchange->declareExchange();
//$channel->startTransaction();
$queue = new AMQPQueue($channel);
$queue->setName($params["queueName"]);
$queue->setFlags(AMQP_DURABLE);
$queue->declareQueue();
//绑定
$queue->bind($params["exchangeName"], $params["routeKey"]);
} catch(Exception $e) {
echo$e->getMessage();
exit();
}
function callback(AMQPEnvelope $message) {
global $queue;
if ($message) {
$body = $message->getBody();
echo"接收时间:".date("Y-m-d H:i:s", time()). PHP_EOL;
echo"接收内容:".$body . PHP_EOL;
//为了防止接收端在处理消息时down掉,只有在消息处理完成后才发送ack消息
$queue->ack($message->getDeliveryTag());
} else {
echo"no message" . PHP_EOL;
}
}
//$queue->consume("callback"); 第一种消费方式,但是会阻塞,程序一直会卡在此处
//第二种消费方式,非阻塞
/*$start = time();
while(true)
{
$message = $queue->get();
if(!empty($message))
{
echo$message->getBody();
$queue->ack($message->getDeliveryTag()); //应答,代表该消息已经消费
$end = time();
echo"<br>" . ($end - $start);
exit();
}
else
{
//echo"message not found" . PHP_EOL;
}
}*/
//注意:这里需要注意的是这个方法:$queue->consume,queue对象有两个方法可用于取消息:consume和get。前者是阻塞的,无消息时会被挂起,适合循环中使用;后者则是非阻塞的,取消息时有则取,无则返回false。
//就是说用了consume之后,会同步阻塞,该程序常驻内存,不能用nginx,apache调用。
$action = "2";
if($action == "1"){
$queue->consume("callback"); //第一种消费方式,但是会阻塞,程序一直会卡在此处
}else{
//第二种消费方式,非阻塞
$start = time();
while(true)
{
$message = $queue->get();
if(!empty($message))
{
echo"接收时间:".date("Y-m-d H:i:s", time()). PHP_EOL;
echo"接收内容:".$message->getBody().PHP_EOL;
$queue->ack($message->getDeliveryTag()); //应答,代表该消息已经消费
$end = time();
echo"运行时间:".($end - $start)."秒".PHP_EOL;
//exit();
}
else
{
//echo"message not found" . PHP_EOL;
}
}
}
生产者delay_publisher2.php:
<?php//header("Content-Type:text/html;charset=utf-8;");
$params = array(
"exchangeName" => "delayed_exchange_test",
"queueName" => "delayed_queue_test",
"routeKey" => "delayed_route_test",
);
$connectConfig = array(
"host" => "localhost",
"port" => 5672,
"login" => "guest",
"password" => "guest",
"vhost" => "/"
);
//var_dump(extension_loaded("amqp")); 判断是否加载amqp扩展
//exit();
try {
$conn = new AMQPConnection($connectConfig);
$conn->connect();
if (!$conn->isConnected()) {
//die("Conexiune esuata");
//TODO 记录日志
echo"rabbit-mq 连接错误:", json_encode($connectConfig);
exit();
}
$channel = new AMQPChannel($conn);
if (!$channel->isConnected()) {
// die("Connection through channel failed");
//TODO 记录日志
echo"rabbit-mq Connection through channel failed:", json_encode($connectConfig);
exit();
}
$exchange = new AMQPExchange($channel);
$exchange->setName($params["exchangeName"]);
$exchange->setType("x-delayed-message"); //x-delayed-message类型
/*RabbitMQ常用的Exchange Type有三种:fanout、direct、topic。
fanout:把所有发送到该Exchange的消息投递到所有与它绑定的队列中。
direct:把消息投递到那些binding key与routing key完全匹配的队列中。
topic:将消息路由到binding key与routing key模式匹配的队列中。*/
$exchange->setArgument("x-delayed-type","direct");
$exchange->declareExchange();
//$channel->startTransaction();
//RabbitMQ不容许声明2个相同名称、配置不同的Queue,否则报错
$queue = new AMQPQueue($channel);
$queue->setName($params["queueName"]);
$queue->setFlags(AMQP_DURABLE);
$queue->declareQueue();
//绑定队列和交换机
$queue->bind($params["exchangeName"], $params["routeKey"]);
//$channel->commitTransaction();
} catch(Exception $e) {
}
for($i=5;$i>0;$i--){
//生成消息
echo"发送时间:".date("Y-m-d H:i:s", time()).PHP_EOL;
echo"i=".$i.",延迟".$i."秒".PHP_EOL;
$message = json_encode(["order_id"=>time(),"i"=>$i]);
$exchange->publish($message, $params["routeKey"], AMQP_NOPARAM, ["headers"=>["x-delay"=> 1000*$i]]);
sleep(2);
}
$conn->disconnect();
对于代码来讲,首先对于消费者核心代码
$exchange->setType("x-delayed-message"); //x-delayed-message类型$exchange->setArgument("x-delayed-type","direct");
生产者核心代码
$exchange = new AMQPExchange($channel);$exchange->setName($params["exchangeName"]);
$exchange->setType("x-delayed-message"); //x-delayed-message类型
$exchange->setArgument("x-delayed-type","direct");
$exchange->declareExchange();
使用方法:先运行delay_consumer1.php,再运行delay_publisher1.php
运行效果:
以上是 RabbitMQ入门教程(PHP)实现延迟功能 的全部内容, 来源链接: utcz.com/z/515516.html