封装php的RabbitMq

编程

简单封装了一个rabbitmq类(业务代码随便写的)

首先是账号密码配置

config.php

<?php

return $arr = [

"RabbitMq" => [

// Rabbitmq 服务地址

"host" => "127.0.0.1",

// Rabbitmq 服务端口

"port" => "5672",

// Rabbitmq 帐号

"login" => "guest",

// Rabbitmq 密码

"password" => "guest",

"vhost"=>"/"

]

];

基类 base.php

<?php

include dirname(__FILE__)."/object.php";

include dirname(__FILE__)."/config.php";

class RabbitMq implements object

{

//保存类实例的静态成员变量

static private $_instance;

static private $_conn;

static private $amp ;

static private $route = "key_1";

static private $q ;

static private $ex ;

static private $queue;

public static function getInstance(){

global $arr;

if (!(self::$_instance instanceof self)) {

self::$_instance = new self($arr["RabbitMq"]);

return self::$_instance;

}

return self::$_instance;

}

private function __construct($conn)

{

//创建连接和channel

$conn = new AMQPConnection($conn);

if(!$conn->connect()) {

die("Cannot connect to the broker!

");

}

self::$_conn = new AMQPChannel($conn);

self::$amp = $conn;

}

/* *

*

* parm 交换机名

* parm 队列名

*

* */

public function listen($exchangeName,$queuename){

self::$queue = $queuename;

return $this->setExchange($exchangeName,$queuename);

}

//连接交换机

public function setExchange($exchangeName,$queueName){

//创建交换机

$ex = new AMQPExchange(self::$_conn);

self::$ex = $ex;

$ex->setName($exchangeName);

$ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型

$ex->setFlags(AMQP_DURABLE); //持久化

$ex->declare();

return self::setQueue($queueName,$exchangeName);

}

//创建队列

private static function setQueue($queueName,$exchangeName){

// 创建队列

$q = new AMQPQueue(self::$_conn);

$q->setName($queueName);

$q->setFlags(AMQP_DURABLE);

$q->declareQueue();

// 用于绑定队列和交换机

$routingKey = self::$route;

$q->bind($exchangeName, $routingKey);

self::$q = $q;

return(self::$_instance);

}

/*

* 消费者

* $fun_name = array($classobj,$function) or function name string

* $autoack 是否自动应答

*

* function processMessage($envelope, $queue) {

$msg = $envelope->getBody();

echo $msg."

"; //处理消息

$queue->ack($envelope->getDeliveryTag());//手动应答

}

*/

public function run($func, $autoack = True){

if (!$func || !self::$q) return False;

while(True){

if ($autoack) {

if(!self::$q->consume($func, AMQP_AUTOACK)){

// self::$q->ack($envelope->getDeliveryTag());

//失败之后会默认进入 noack 队列。下次重新开启会再次调用,目前还不清楚 回调配置应该这里做一个失败反馈

             //todu

}

}

self::$q->consume($func);

}

}

private static function closeConn(){

self::$amp->disconnect();

}

public function pushlish($msg){

while (1) {

sleep(1);

if (self::$ex->publish(date("H:i:s") . "用户" . "注册", self::$route)) {

//写入文件等操作

echo $msg;

}

}

}

//__clone方法防止对象被复制克隆

public function __clone()

{

trigger_error("Clone is not allow!", E_USER_ERROR);

}

}

consume 监听类(一个操作对应一个class)

<?php

include dirname(__FILE__)."/base.php";

class Add

{

public static function run(){

$dbms="mysql"; //数据库类型

$host="127.0.01"; //数据库主机名

$dbName="test"; //使用的数据库

$user="root"; //数据库连接用户名

$pass="admin"; //对应的密码

$dsn="$dbms:host=$host;dbname=$dbName";

sleep(1);

try {

$dbh = new PDO($dsn, $user, $pass); //初始化一个PDO对象

/*你还可以进行一次搜索操作

foreach ($dbh->query("SELECT * from FOO") as $row) {

print_r($row); //你可以用 echo($GLOBAL); 来看到这些值

}

*/

$dbh = null;

} catch (PDOException $e) {

die ("Error!: " . $e->getMessage() . "<br/>");

}

//默认这个不是长连接,如果需要数据库长连接,需要最后加一个参数:array(PDO::ATTR_PERSISTENT => true) 变成这样:

$db = new PDO($dsn, $user, $pass, array(PDO::ATTR_PERSISTENT => true));

$sql = "INSERT INTO `test`.`t_reg`(`names`) VALUES (9)";

$row = $db->query($sql);

if(!$row){

return false;

}

echo "OK";

}

}

$consume = new Add();

//tudo

//$s = RabbitMq::getInstance()->listen("jiaohuanji","queue1")->run(array($consume,"run")); 将run函数带入到consume里面作为回调 在consume里面增加$funname ,增加代码粘性

$s = RabbitMq::getInstance()->listen("jiaohuanji","queue1")->run(array($consume,"run"));

push 类(发送者)

<?php

include "base.php";

RabbitMq::getInstance()->listen("jiaohuanji","queue1")->pushlish("请求已发送");

接口interface

<?php

interface object

{

public static function getInstance();

}

监听 add.php

执行 send.php 即可完成简单的rabit操作

以上内容希望帮助到大家, 很多PHPer在进阶的时候总会遇到一些问题和瓶颈,业务代码写多了没有方向感,不知道该从那里入手去提升,对此我整理了一些资料,包括但不限于:分布式架构、高可扩展、高性能、高并发、服务器性能调优、TP6,laravel,YII2,Redis,Swoole、Swoft、Kafka、Mysql优化、shell脚本、Docker、微服务、Nginx等多个知识点高级进阶干货需要的可以免费分享给大家 ,需要戳这里   PHP进阶架构师>>>视频、面试文档免费获取

以上是 封装php的RabbitMq 的全部内容, 来源链接: utcz.com/z/517026.html

回到顶部