PHP使用Beanstalkd实例

编程

1.使用Composer安装Pheanstalk

composer require pda/pheanstalk

2.实现代码

php查看beanstalkd状态脚本Status.php

<?php

/**

* Created by PhpStorm.

* User: jmsite.cn

* Date: 2019/1/21

* Time: 10:32

*/

require"../vendor/autoload.php";

usePheanstalkPheanstalk;

$pheanstalk = new Pheanstalk("192.168.75.135",11300);

print_r($pheanstalk->stats());

生产者代码Producter.php

<?php

/**

* Created by PhpStorm.

* User: jmsite.cn

* Date: 2019/1/20

* Time: 16:30

*/

require"../vendor/autoload.php";

usePheanstalkPheanstalk;

$pheanstalk = new Pheanstalk("192.168.75.135",11300);

for ($i=0;$i<50;$i++){

$data = array(

"key" => "testkey".$i,

"value" => "testvalue",

"time" => time(),

);

$ret = $pheanstalk->putInTube("test-tube", json_encode($data), Pheanstalk::DEFAULT_PRIORITY, Pheanstalk::DEFAULT_DELAY, Pheanstalk::DEFAULT_TTR);

var_dump($ret);

}

消费者代码Consumer.php

<?php

/**

* Created by PhpStorm.

* User: jmsite.cn

* Date: 2019/1/20

* Time: 16:31

*/

set_time_limit(0);

ini_set("default_socket_timeout", 900);

require"../vendor/autoload.php";

usePheanstalkPheanstalk;

$pheanstalk = new Pheanstalk("192.168.75.135",11300);

while (true){

$job = $pheanstalk

->watch("test-tube")

->ignore("default")

->reserve();

if ($job){

sleep(2);

echo $job->getData();

echo"

";

$pheanstalk->delete($job);

}

}

打开命令行/终端窗口,执行生产者,会向tube写入50条任务

PS E:epositoryworkeanstalk> php .Producter.php

int(101)

int(102)

int(103)

int(104)

int(105)

int(106)

int(107)

int(108)

int(109)

int(110)

int(111)

int(112)

int(113)

int(114)

......

由此可见,$pheanstalk->putInTube成功后返回的是job的id
查看状态

PS E:epositoryworkeanstalk> php Status.php

PheanstalkResponseArrayResponse Object

(

[_name:PheanstalkResponseArrayResponse:private] => OK

[storage:ArrayObject:private] => Array

(

[current-jobs-urgent] => 0

[current-jobs-ready] => 50

[current-jobs-reserved] => 0

[current-jobs-delayed] => 0

[current-jobs-buried] => 0

......

结果中显示处于ready待读取状态的job是50个
打开两个或以上命令行/终端窗口,执行消费者,模拟多消费者竞争
消费者1

PS E:epositoryworkeanstalk> php .Consumer.php

{"key":"testkey0","value":"testvalue","time":1548039103}

{"key":"testkey1","value":"testvalue","time":1548039103}

{"key":"testkey2","value":"testvalue","time":1548039103}

{"key":"testkey4","value":"testvalue","time":1548039103}

{"key":"testkey6","value":"testvalue","time":1548039103}

{"key":"testkey8","value":"testvalue","time":1548039103}

{"key":"testkey10","value":"testvalue","time":1548039103}

{"key":"testkey12","value":"testvalue","time":1548039103}

{"key":"testkey14","value":"testvalue","time":1548039103}

{"key":"testkey16","value":"testvalue","time":1548039103}

{"key":"testkey18","value":"testvalue","time":1548039103}

{"key":"testkey20","value":"testvalue","time":1548039103}

{"key":"testkey22","value":"testvalue","time":1548039103}

{"key":"testkey24","value":"testvalue","time":1548039103}

{"key":"testkey26","value":"testvalue","time":1548039103}

{"key":"testkey28","value":"testvalue","time":1548039103}

{"key":"testkey30","value":"testvalue","time":1548039103}

{"key":"testkey32","value":"testvalue","time":1548039103}

{"key":"testkey34","value":"testvalue","time":1548039103}

{"key":"testkey36","value":"testvalue","time":1548039103}

{"key":"testkey38","value":"testvalue","time":1548039103}

{"key":"testkey40","value":"testvalue","time":1548039103}

{"key":"testkey42","value":"testvalue","time":1548039103}

{"key":"testkey44","value":"testvalue","time":1548039103}

{"key":"testkey46","value":"testvalue","time":1548039103}

{"key":"testkey48","value":"testvalue","time":1548039103}

消费者2

PS E:epositoryworkeanstalk> php .Consumer.php

{"key":"testkey3","value":"testvalue","time":1548039103}

{"key":"testkey5","value":"testvalue","time":1548039103}

{"key":"testkey7","value":"testvalue","time":1548039103}

{"key":"testkey9","value":"testvalue","time":1548039103}

{"key":"testkey11","value":"testvalue","time":1548039103}

{"key":"testkey13","value":"testvalue","time":1548039103}

{"key":"testkey15","value":"testvalue","time":1548039103}

{"key":"testkey17","value":"testvalue","time":1548039103}

{"key":"testkey19","value":"testvalue","time":1548039103}

{"key":"testkey21","value":"testvalue","time":1548039103}

{"key":"testkey23","value":"testvalue","time":1548039103}

{"key":"testkey25","value":"testvalue","time":1548039103}

{"key":"testkey27","value":"testvalue","time":1548039103}

{"key":"testkey29","value":"testvalue","time":1548039103}

{"key":"testkey31","value":"testvalue","time":1548039103}

{"key":"testkey33","value":"testvalue","time":1548039103}

{"key":"testkey35","value":"testvalue","time":1548039103}

{"key":"testkey37","value":"testvalue","time":1548039103}

{"key":"testkey39","value":"testvalue","time":1548039103}

{"key":"testkey41","value":"testvalue","time":1548039103}

{"key":"testkey43","value":"testvalue","time":1548039103}

{"key":"testkey45","value":"testvalue","time":1548039103}

{"key":"testkey47","value":"testvalue","time":1548039103}

{"key":"testkey49","value":"testvalue","time":1548039103}

两个消费者竞争着完成了全部任务,由于我的beanstalkd启动时开启了binlog持久,所以beanstalkd重启后任务也不会丢失

3.需要注意的事项

1.创建job时,设置的超时时间Pheanstalk::DEFAULT_TTR一定要比消费者处理一个job的时间要长,否则job在超时之后会被tube更改为ready状态,被其他消费者获取,而此时当前消费者还在处理该job,这就出现了一个job被多个消费者重复执行的可怕现象
2.Pheanstalk的维护者发生了变化,在新版的Pheanstalk中是不支持长连接的,当客户端socket连接服务器时间超过php.ini中设置的default_socket_timeout时,如果未能从服务端tube获得job,连接将会被断开,所以消费者进程需要维护,以便在退出后可以重新开启进程,推荐使用supervisord维护消费者进程。
判断socket超时的代码

publicfunctiongetLine($length = null)

{

$timeout = ini_get("default_socket_timeout");

$timer = microtime(true);

do {

$data = isset($length) ?

$this->_wrapper()->fgets($this->_socket, $length) :

$this->_wrapper()->fgets($this->_socket);

if ($this->_wrapper()->feof($this->_socket)) {

thrownewExceptionSocketException("Socket closed by server!");

}

if (($data === false) && microtime(true) - $timer > $timeout) {

$this->disconnect();

thrownewExceptionSocketException("Socket timed out!");

}

} while ($data === false);

return rtrim($data);

}

以上是 PHP使用Beanstalkd实例 的全部内容, 来源链接: utcz.com/z/512854.html

回到顶部