swoole学习笔记之task异步进程与消息列队
客户端<?php
$client=new swooleClient(SWOOLE_SOCK_TCP);
//发数据
$client->connect("127.0.0.1",9801);
$body="213";
$data=pack("N",strlen($body)).$body;//数据打包,包头包体,防止粘包现象
$client->send($data);
$client->close();
服务端<?php
//tcp协议
$server=new SwooleServer("0.0.0.0",9801); //创建server对象
//include "222xx"; 不能
$key=ftok(__DIR__,1);//得到消息队列的KEY
echo $key;
$server->set([
"worker_num"=>1, //设置进程
//"heartbeat_idle_time"=>10,//连接最大的空闲时间
//"heartbeat_check_interval"=>3 //服务器定时检查
"task_worker_num"=>1, //task进程数
"task_ipc_mode"=>2,
"message_queue_key"=>$key,//绑定消息队列key,绑定后task如果因为意外关闭导致消息队列的数据没有被消费掉,那么重启后task会自动继续消耗消息队列数据
"open_length_check"=>1,
"package_length_type"=>"N",//设置包头的长度
"package_length_offset"=>0, //包长度从哪里开始计算
"package_body_offset"=>4, //包体从第几个字节开始计算
]);
$server->on("start",function (){
// include "index.php"; 不能
});
$server->on("Shutdown",function (){
// include "index.php"; 不能
echo "正常关闭";
});
$server->on("workerStart",function ($server,$fd){
//include "index.php";
if($server->taskworker){
echo "task_worker:".$server->worker_id.PHP_EOL;
}else{
echo "worker:".$server->worker_id.PHP_EOL;
}
});
//监听事件,连接事件
$server->on("connect",function ($server,$fd){
//echo "新的连接进入xxx:{$fd}".PHP_EOL;
});
//消息发送过来
$server->on("receive",function (swoole_server $server, int $fd, int $reactor_id, string $data){
//var_dump("消息发送过来:".$data);
//不需要立刻马上得到结果的适合task
$data=["tid"=>time()];
//sleep(10);
$data=str_repeat("a",10*1024*1024);//模拟大文件,发送给task的数据最好不好超过8K,会影响性能,超过8K的文件会在/tmp/生成一个临时文件
//$data = "a";
$server->task($data); //投递到taskWorker进程组。第二个参数可以指定哪个task (0-(task数量-1)),在task_ipc_mode=3的模式下无效
echo "异步非阻塞".PHP_EOL;
//服务端
});
//ontask事件回调
$server->on("task",function ($server,$task_id,$form_id,$data){
//var_dump(posix_getpid()); //进程确实是发生了变化
var_dump($server->worker_id);
echo "任务来自于:$form_id".",任务id为{$task_id}".PHP_EOL;
try{
}catch (Exception $e){
//$server->sendMessage();//这个是如果发生异常错误的,重新把数据发回给woker进程,不要发给task进程,第一个参数是数据,第二个参数是进程编号(0-(worker进程数+task进程数-1))
//需要用 onPipeMessage 接收
}
sleep(10);
$server->finish("执行完毕");
});
$server->on("pipeMessage",function (swoole_server $server, int $src_worker_id,$message){
echo "来自于{$src_worker_id}的错误信息".PHP_EOL;
var_dump($message); //接收到投递的错误信息,记录错误次数,错误次数到达一定次数之后,就保留日志
});
$server->on("finish",function ($server,$task_id,$data){
echo "任务{$task_id}执行完毕:{$data}".PHP_EOL;
// var_dump(posix_getpid());
//$server->send($data["fd"], "任务执行完毕");
});
//消息关闭
$server->on("close",function (){
//echo "消息关闭".PHP_EOL;
});
//服务器开启
$server->start();
linux消息队列//ipcs -q linux消息队列命令
//父进程跟子进程实现消息发送
//ftok根据路径名,提取文件信息,再根据这些文件信息及project ID合成key,该路径可以随便设置。
//该路径是必须存在的,ftok只是根据文件inode在系统内的唯一性来取一个数值,和文件的权限无关。
$msg_key=ftok(__DIR__,"u"); //注意在php创建消息队列,第二个参数会直接转成字符串,可能会导致通讯失败
$msg_queue=msg_get_queue($msg_key);
$pid=pcntl_fork();
if($pid==0){
//子进程发送消息
msg_send($msg_queue,10,"我是子进程发送的消息");//发送消息到消息队列
//msg_receive($msg_queue,10,$message_type,1024,$message);
//var_dump($message);
exit();
}elseif ($pid){
msg_receive($msg_queue,10,$message_type,1024,$message);//从消息队列中收取消息
var_dump($message);
//父进程接收消息
pcntl_wait($status);
msg_remove_queue($msg_queue);
}
以上是 swoole学习笔记之task异步进程与消息列队 的全部内容, 来源链接: utcz.com/z/512202.html