PHP实现RabbitMQ消息队列

编程

先安装PHP对应的RabbitMQ,这里用的是 php_amqp 不同的扩展实现方式会有细微的差异.
php扩展地址: http://pecl.php.net/package/amqp
具体以官网为准 http://www.rabbitmq.com/getstarted.html

介绍

config.php 配置信息
BaseMQ.php MQ基类
ProductMQ.php 生产者类
ConsumerMQ.php 消费者类
Consumer2MQ.php 消费者2(可有多个)

config.php

<?php

return [

//配置

"host" => [

"host" => "127.0.0.1",

"port" => "5672",

"login" => "guest",

"password" => "guest",

"vhost"=>"/",

],

//交换机

"exchange"=>"word",

//路由

"routes" => [],

];

 BaseMQ.php

<?php

/**

* Created by PhpStorm.

* User: pc

* Date: 2018/12/13

* Time: 14:11

*/

namespace MyObjSummaryabbitMQ;

/** Member

* AMQPChannel

* AMQPConnection

* AMQPEnvelope

* AMQPExchange

* AMQPQueue

* Class BaseMQ

* @package MyObjSummaryabbitMQ

*/

class BaseMQ

{

/** MQ Channel

* @var AMQPChannel

*/

public $AMQPChannel ;

/** MQ Link

* @var AMQPConnection

*/

public $AMQPConnection ;

/** MQ Envelope

* @var AMQPEnvelope

*/

public $AMQPEnvelope ;

/** MQ Exchange

* @var AMQPExchange

*/

public $AMQPExchange ;

/** MQ Queue

* @var AMQPQueue

*/

public $AMQPQueue ;

/** conf

* @var

*/

public $conf ;

/** exchange

* @var

*/

public $exchange ;

/** link

* BaseMQ constructor.

* @throws AMQPConnectionException

*/

public function __construct()

{

$conf = require "config.php" ;

if(!$conf)

throw new AMQPConnectionException("config error!");

$this->conf = $conf["host"] ;

$this->exchange = $conf["exchange"] ;

$this->AMQPConnection = new AMQPConnection($this->conf);

if (!$this->AMQPConnection->connect())

throw new AMQPConnectionException("Cannot connect to the broker!

");

}

/**

* close link

*/

public function close()

{

$this->AMQPConnection->disconnect();

}

/** Channel

* @return AMQPChannel

* @throws AMQPConnectionException

*/

public function channel()

{

if(!$this->AMQPChannel) {

$this->AMQPChannel = new AMQPChannel($this->AMQPConnection);

}

return $this->AMQPChannel;

}

/** Exchange

* @return AMQPExchange

* @throws AMQPConnectionException

* @throws AMQPExchangeException

*/

public function exchange()

{

if(!$this->AMQPExchange) {

$this->AMQPExchange = new AMQPExchange($this->channel());

$this->AMQPExchange->setName($this->exchange);

}

return $this->AMQPExchange ;

}

/** queue

* @return AMQPQueue

* @throws AMQPConnectionException

* @throws AMQPQueueException

*/

public function queue()

{

if(!$this->AMQPQueue) {

$this->AMQPQueue = new AMQPQueue($this->channel());

}

return $this->AMQPQueue ;

}

/** Envelope

* @return AMQPEnvelope

*/

public function envelope()

{

if(!$this->AMQPEnvelope) {

$this->AMQPEnvelope = new AMQPEnvelope();

}

return $this->AMQPEnvelope;

}

}

 ProductMQ.php

<?php

//生产者 P

namespace MyObjSummaryabbitMQ;

require "BaseMQ.php";

class ProductMQ extends BaseMQ

{

private $routes = ["hello","word"]; //路由key

/**

* ProductMQ constructor.

* @throws AMQPConnectionException

*/

public function __construct()

{

parent::__construct();

}

/** 只控制发送成功 不接受消费者是否收到

* @throws AMQPChannelException

* @throws AMQPConnectionException

* @throws AMQPExchangeException

*/

public function run()

{

//频道

$channel = $this->channel();

//创建交换机对象

$ex = $this->exchange();

//消息内容

$message = "product message ".rand(1,99999);

//开始事务

$channel->startTransaction();

$sendEd = true ;

foreach ($this->routes as $route) {

$sendEd = $ex->publish($message, $route) ;

echo "Send Message:".$sendEd."

";

}

if(!$sendEd) {

$channel->rollbackTransaction();

}

$channel->commitTransaction(); //提交事务

$this->close();

die ;

}

}

try{

(new ProductMQ())->run();

}catch (Exception $exception){

var_dump($exception->getMessage()) ;

}

ConsumerMQ.php

<?php

//消费者 C

namespace MyObjSummaryabbitMQ;

require "BaseMQ.php";

class ConsumerMQ extends BaseMQ

{

private $q_name = "hello"; //队列名

private $route = "hello"; //路由key

/**

* ConsumerMQ constructor.

* @throws AMQPConnectionException

*/

public function __construct()

{

parent::__construct();

}

/** 接受消息 如果终止 重连时会有消息

* @throws AMQPChannelException

* @throws AMQPConnectionException

* @throws AMQPExchangeException

* @throws AMQPQueueException

*/

public function run()

{

//创建交换机

$ex = $this->exchange();

$ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型

$ex->setFlags(AMQP_DURABLE); //持久化

//echo "Exchange Status:".$ex->declare()."

";

//创建队列

$q = $this->queue();

//var_dump($q->declare());exit();

$q->setName($this->q_name);

$q->setFlags(AMQP_DURABLE); //持久化

//echo "Message Total:".$q->declareQueue()."

";

//绑定交换机与队列,并指定路由键

echo "Queue Bind: ".$q->bind($this->exchange, $this->route)."

";

//阻塞模式接收消息

echo "Message:

";

while(True){

$q->consume(function ($envelope,$queue){

$msg = $envelope->getBody();

echo $msg."

"; //处理消息

$queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答

});

//$q->consume("processMessage", AMQP_AUTOACK); //自动ACK应答

}

$this->close();

}

}

try{

(new ConsumerMQ)->run();

}catch (Exception $exception){

var_dump($exception->getMessage()) ;

}

以上内容希望帮助到大家,很多PHPer在进阶的时候总会遇到一些问题和瓶颈,业务代码写多了没有方向感,不知道该从那里入手去提升,对此我整理了一些资料,包括但不限于:分布式架构、高可扩展、高性能、高并发、服务器性能调优、TP6,laravel,YII2,Redis,Swoole、Swoft、Kafka、Mysql优化、shell脚本、Docker、微服务、Nginx等多个知识点高级进阶干货需要的可以免费分享给大家,需要戳这里 PHP进阶架构师>>>视频、面试文档免费获取

以上是 PHP实现RabbitMQ消息队列 的全部内容, 来源链接: utcz.com/z/514465.html

回到顶部