swoole(6)Task非同步任務
阿新 • • 發佈:2020-03-06
一:什麼是task程序?
task程序是獨立與worker程序的一組程序 ,他主要處理耗時較長的業務邏輯,並且不影響worker程序處理客戶端的請求.worker程序通過task()函式把資料投遞到Task程序去處理
開啟task功能(task功能是預設關閉的 ,開啟task功能需要滿足2個條件)
- 配置task程序的數量(task_worker_num)
- 註冊task的回掉函式onTask和onfinish
二:task設計流程
- worker程序中,我們呼叫對應的task()方法傳送資料通知到task worker程序
- task worker程序會在onTask()回撥中接收到這些資料,並進行處理
- 處理完會可以通過finish函式或直接return訊息給worker程序
- worker程序在onFinish()程序收到訊息並進行處理
程式碼:
<?php /** * Created by PhpStorm. * User: huahua * Date: 2020/3/6 * Time: 上午10:09 */ //基於第一個 TCP 伺服器,只需要增加 onTask 和 onFinish2 個事件回撥函式即可。 //另外需要設定 task 程序數量,可以根據任務的耗時和任務量配置適量的 task 程序。 $serv = new \Swoole\Server('127.0.0.1',9800); $serv->set([ 'worker_num'=>2, //設定程序 //配置此引數後將會啟用 task 功能。所以 Server 務必要註冊 onTask、onFinish 2 個事件回撥函式。如果沒有註冊,伺服器程式將無法啟動。 'task_worker_num' => 4, //配置 Task 程序的數量。 'task_ipc_mode'=>2,//設定 Task 程序與 Worker 程序之間通訊的方式 ]); //監聽連線進入事件 $serv->on('Connect', function ($serv, $fd) { echo "Client: Connect.\n"; }); //監聽資料接收事件 $serv->on('Receive', function ($serv, $fd, $from_id, $data) { $data=['tid'=>time()]; $task_id = $serv->task($data);////投遞到taskWorker程序組 echo "task id = {$task_id},from id = $from_id".PHP_EOL; }); $serv->on('task',function ($serv, $task_id, $from_id, $data){ echo "task id = {$task_id},from id = $from_id,程序id =".posix_getpid().PHP_EOL; echo '我是task任務'; $serv->finish("data -> ok"); }); $serv->on('finish',function ($serv, $task_id, $data){ echo "task id = {$task_id}".PHP_EOL; }); //監聽連線關閉事件 $serv->on('Close', function ($serv, $fd) { echo "Client: Close.\n"; }); //啟動伺服器 $serv->start();
cli下執行結果:
$task_id不等於workerStart中的worker_id,而是swoole維護的任務自增長id $from_id表示來自於哪個woker投遞而來的任務,id等於worker_id值 $serv->worker_id等於workerStart中的worker_id, 此處是task_woker的id $serv->worker_pid 此處是task_woker的程序pid
三:task任務切分
場景:假設有一臺伺服器專門處理前臺投遞的資料,利用簡單的任務拆分,分配到相應的程序去處理
思路:
- 1.將一個大的任務拆分成相應份數(是由$task_worker_num數量來確定)
- 2.通過foreach迴圈將資料投遞到指定的task程序,範圍是(0-(task_worker_num-1))區間之內
- 3.執行失敗的任務,需要保留,重新投遞執行(程序間通訊管道方式)
$server->on('receive',function (swoole_server $server, int $fd, int $reactor_id, string $data){ for ($i=0;$i<100;$i++){ $tasks[] =['id'=>$i,'msg'=>time()]; } $count=count($tasks); $data=array_chunk($tasks,ceil($count/3)); foreach ($data as $k=>$v){ $server->task($v,$k); //(0-task_woker_num-1) } });