swoole学习笔记之task异步进程与消息列队

编程

客户端

<?php

$client=new swooleClient(SWOOLE_SOCK_TCP);

//发数据

$client->connect("127.0.0.1",9801);

$body="213";

$data=pack("N",strlen($body)).$body;//数据打包,包头包体,防止粘包现象

$client->send($data);

$client->close();

服务端

<?php

//tcp协议

$server=new SwooleServer("0.0.0.0",9801); //创建server对象

//include "222xx"; 不能

$key=ftok(__DIR__,1);//得到消息队列的KEY

echo $key;

$server->set([

"worker_num"=>1, //设置进程

//"heartbeat_idle_time"=>10,//连接最大的空闲时间

//"heartbeat_check_interval"=>3 //服务器定时检查

"task_worker_num"=>1, //task进程数

"task_ipc_mode"=>2,

"message_queue_key"=>$key,//绑定消息队列key,绑定后task如果因为意外关闭导致消息队列的数据没有被消费掉,那么重启后task会自动继续消耗消息队列数据

"open_length_check"=>1,

"package_length_type"=>"N",//设置包头的长度

"package_length_offset"=>0, //包长度从哪里开始计算

"package_body_offset"=>4, //包体从第几个字节开始计算

]);

$server->on("start",function (){

// include "index.php"; 不能

});

$server->on("Shutdown",function (){

// include "index.php"; 不能

echo "正常关闭";

});

$server->on("workerStart",function ($server,$fd){

//include "index.php";

if($server->taskworker){

echo "task_worker:".$server->worker_id.PHP_EOL;

}else{

echo "worker:".$server->worker_id.PHP_EOL;

}

});

//监听事件,连接事件

$server->on("connect",function ($server,$fd){

//echo "新的连接进入xxx:{$fd}".PHP_EOL;

});

//消息发送过来

$server->on("receive",function (swoole_server $server, int $fd, int $reactor_id, string $data){

//var_dump("消息发送过来:".$data);

//不需要立刻马上得到结果的适合task

$data=["tid"=>time()];

//sleep(10);

$data=str_repeat("a",10*1024*1024);//模拟大文件,发送给task的数据最好不好超过8K,会影响性能,超过8K的文件会在/tmp/生成一个临时文件

//$data = "a";

$server->task($data); //投递到taskWorker进程组。第二个参数可以指定哪个task (0-(task数量-1)),在task_ipc_mode=3的模式下无效

echo "异步非阻塞".PHP_EOL;

//服务端

});

//ontask事件回调

$server->on("task",function ($server,$task_id,$form_id,$data){

//var_dump(posix_getpid()); //进程确实是发生了变化

var_dump($server->worker_id);

echo "任务来自于:$form_id".",任务id为{$task_id}".PHP_EOL;

try{

}catch (Exception $e){

//$server->sendMessage();//这个是如果发生异常错误的,重新把数据发回给woker进程,不要发给task进程,第一个参数是数据,第二个参数是进程编号(0-(worker进程数+task进程数-1))

        //需要用 onPipeMessage 接收

}

sleep(10);

$server->finish("执行完毕");

});

$server->on("pipeMessage",function (swoole_server $server, int $src_worker_id,$message){

    echo "来自于{$src_worker_id}的错误信息".PHP_EOL;

    var_dump($message); //接收到投递的错误信息,记录错误次数,错误次数到达一定次数之后,就保留日志

});

$server->on("finish",function ($server,$task_id,$data){

echo "任务{$task_id}执行完毕:{$data}".PHP_EOL;

// var_dump(posix_getpid());

//$server->send($data["fd"], "任务执行完毕");

});

//消息关闭

$server->on("close",function (){

//echo "消息关闭".PHP_EOL;

});

//服务器开启

$server->start();

 

linux消息队列

//ipcs -q linux消息队列命令

//父进程跟子进程实现消息发送

//ftok根据路径名,提取文件信息,再根据这些文件信息及project ID合成key,该路径可以随便设置。

//该路径是必须存在的,ftok只是根据文件inode在系统内的唯一性来取一个数值,和文件的权限无关。

$msg_key=ftok(__DIR__,"u"); //注意在php创建消息队列,第二个参数会直接转成字符串,可能会导致通讯失败

$msg_queue=msg_get_queue($msg_key);

$pid=pcntl_fork();

if($pid==0){

//子进程发送消息

msg_send($msg_queue,10,"我是子进程发送的消息");//发送消息到消息队列

//msg_receive($msg_queue,10,$message_type,1024,$message);

//var_dump($message);

exit();

}elseif ($pid){

msg_receive($msg_queue,10,$message_type,1024,$message);//从消息队列中收取消息

var_dump($message);

//父进程接收消息

pcntl_wait($status);

msg_remove_queue($msg_queue);

}

以上是 swoole学习笔记之task异步进程与消息列队 的全部内容, 来源链接: utcz.com/z/512202.html

回到顶部