docker+redis+beanstalkd+swoole构建健壮的队列

编程

#从仓库里将redis和beanstalkd下载

docker pull redis:5.0.7

docker pull schickling/beanstalkd

#查看镜像列表

docker images

#将beanstalkd运行在docker容器并映射到本地主机11300端口

docker run --name beanstalkd -d -it -p 11300:11300 428

docker run --name redis01 -d -it -p 6380:6379 redis:5.0.7

d:

cd D:phpstudy_proWWWtestbeanstalkd

#安装

composer require pda/pheanstalk:3.1

composer require predis/predis:1.1

Bash

Copy

 

性能

可靠性(ack应答)

可扩展性

kafka

8w/s

不可靠

集群

rabitMQ

4w/s

可靠

集群

redis

8w/s

不可靠

集群

beanstalk

8w/s

可靠

手动构建

beanstalkd 是一个高性能、轻量级的内存队列系统
beanstalkd特性
1.支持优先级(支持队伍插队)
2.延迟(实现定时任务)
3.持久化(定时把内存中的数据刷到binlog日志)
4.预留(把任务设置成预留,消费者无法取出任务,等某个合适时机再拿出来处理)
5.任务超时重发(消费者必须在指定时间内处理任务,如果没有则认为任务失败重新进入队列)
beanstalkd核心元素
生产者->管道(tube)->任务(job)->消费者

job:一个需要异步处理的任务,需要放在一个tube中
tube:一个有名字的任务队列,用来存储统一类型的job,可以创建多个管道
producer:job的生产者
consumer:job的消费者

流程:由producer产生一个任务job,并将任务job推进到一个tube中,然后由consumer从tube中取出job执行

composer.json

"require":{

"pda/pheanstalk":"^3.1",

"predis/predis": "^1.1",

}

生产者产生任务

<?php

//producer.php

require"vendor/autoload.php";

try{

$p=newPheanstalkPheanstalk("127.0.0.1",11300);

//swoole_timer_tick 定时器

swoole_timer_tick(10,function()use($p)){

$data=[

"msg_id"=>session_create_id(),//php7.1新出的生成随机的id

"tid"=>time().uniqid(),

];

$id=$p->putInTube("task",json_encode($data));//放到管道

var_dump($id);

}

}catch(Exception $e){

}

PHP

Copy

消费者执行任务

<?php

//consumer.php

require"vendor/autoload.php";

//假如执行一个任务需要5秒,那10个任务就需要50秒,如何提高效率

//获取任务数据,根据任务类型,来执行业务(多进程编程)

$workerNum=5;

$pool=newSwooleProcessPool($workerNum);

//进程启动

$pool->on("WorderStart",function($pool,$workerId){

echo"Worder#".$workderId."is started";

try{

$p=newPheanstalkPheanstalk("127.0.0.1",11300);

$redis=newPredisClient("tcp://127.0.0.1:6380");

//从管道中取出任务

//如果没有数据会挂起等待数据,所以while不会进入死循环

while(true){

$job=$p->watch("task")->reserve();

if(!$empty($job)){

$json=$job->getData();

$data=json_decode($json,true);

$state=$redis->get("job:".$data["msg_id"]);

if($state==1){//有消费者正在执行当中

$p->release($job,0,5);//延迟5秒继续投递任务

}elseif($state==2){//已经执行过了

continue;

}else{

//setex(key,seconds,value)

$redis->setex("job:".$data["msg_id"],6,1);

//进行其他的操作 比如发送短信 加积分

sleep(5);

$redis->set("job:".$data["msg_id"],2);

//ack应答机制设计是为了消息的可靠性

$p->delete($job);

}

}

}

}catch(Exception $e){

}

});

$pool->on("WorderStop",function($pool,$workerId){

echo"Worder#".$workderId."is stopped";

});

PHP

Copy

以上是 docker+redis+beanstalkd+swoole构建健壮的队列 的全部内容, 来源链接: utcz.com/z/515318.html

回到顶部