PHP swoole中使用task程序非同步的處理耗時任務應用案例分析
本文例項講述了PHP swoole中使用task程序非同步的處理耗時任務。分享給大家供大家參考,具體如下:
我們知道,swoole中有兩大程序,分別是 master 主程序和 manager 管理程序。
其中 master 主程序中會有一個主 reactor 執行緒和多個 reactor 執行緒,主要的作用就是用來維護TCP連線,處理網路IO,收發資料。
而 manager 管理程序,作用則是 fork 和管理 worker 和 task 程序。
worker 程序的作用是接收 reactor 執行緒傳遞的資料,並處理資料,返回處理結果給 reactor 執行緒。
task 程序的作用是處理一些相對耗時的任務,task 與 worker 程序是獨立的,不會影響 worker 程序處理客戶端的請求。
一、task 程序的應用場景:
1、相對耗時的郵件群發,比如某某活動,需要給100W使用者傳送活動郵件。
2、推送某些大V的動態,比如某大V發了條新訊息,粉絲需要及時獲取到該動態。
二、worker 與 task 的相互關係:
1、worker 程序中能過呼叫 task() 來投遞任務,task 程序中 通過 onTask 事件來響應投遞來的任務。
2、task 程序中 通過 直接返回 或 呼叫 finish() 來告訴 worker 程序任務處理完畢,worker 程序中 通過 onFinish 事件響應任務完成。
三、使用 task 的前題:
1、在 Server 中 配置 task_worker_num 數量。
2、設定 Server 的 onTask 和 onFinish 事件回撥函式。
四、簡單的使用task進行累加和的計算例子
<?php $server = new swoole_server('0.0.0.0',6666); $server->set([ 'worker_num' => 2,'task_worker_num' => 16,]); $server->on('WorkerStart',function ($server,$worker_id) { //注意這裡,我們通過taskworker來判斷是task程序還是worker程序 //需要在worker程序中呼叫task(),不然會報出警告 //這裡會執行兩遍,因為我們設定了worker_num數為2 if (!$server->taskworker) { echo '投遞任務開始...',PHP_EOL; //投遞32個累加計算任務給16個task程序 for ($ix = 0; $ix < 32; $ix++) { //注意這裡的投遞是非同步的 $server->task([mt_rand(1,100),mt_rand(1000,9999)]); } echo '投遞任務結束...',PHP_EOL; } }); //server服務必須要有onReceive回撥 $server->on('Receive',$fd,$reactor_id,$data) { }); //注意,task程序完全是同步阻塞模式的 $server->on('Task',$task_id,$src_worker_id,$data) { echo "task {$task_id} 程序正在工作...",PHP_EOL; $start = $data[0]; $end = $data[1]; $total = 0; for (; $start <= $end; $start++) { $total += $start; } echo "task {$task_id} 程序完成工作...",PHP_EOL; return $total; }); $server->on('Finish',$data) { echo "task {$task_id} 程序處理完成,結果為 {$data}",PHP_EOL; }); $server->start();
注意,我們通過呼叫 task() 往任務池中投遞任務,swoole 底層會輪詢的投遞任務到各個 task 程序。
當你投遞任務的數量超過 onTask 的處理速度,這會導致任務池被塞滿,進而導致 worker 程序發生阻塞,所以需合理設定 task_worker_num 數量和處理速度之間的關係。
當然,我們也可以人為的把任務投遞到指定的 task 程序。task() 函式的第二個引數可以指定要投遞的 task 程序ID,ID範圍為 0 到 (task_worker_num - 1)。
五、對任務進行切分,人為控制投遞到 task 程序
<?php $server = new swoole_server('0.0.0.0',6666); $server->set([ 'worker_num' => 1,'task_worker_num' => 10,$worker_id) { //為了方便演示,把worker_num設定為1,這裡只會執行一次 if (!$server->taskworker) { //通過swoole_table共享記憶體,在不同程序中共享資料 $server->result = new swoole_table(10240); //用於儲存task程序完成數量 $server->result->column('finish_nums',swoole_table::TYPE_INT); //用於儲存最終計算結果 $server->result->column('result',swoole_table::TYPE_INT); $server->result->create(); //計算1000的累加和,並把計算任務分配到10個task程序上 $num = 1000; $step = $num / $server->setting['task_worker_num']; for ($ix = 0; $ix < $server->setting['task_worker_num']; $ix++) { $start = $ix * $step; $server->task([$start,$start + $step],$ix); } } }); $server->on('Receive',$data) { echo "task {$task_id} 程序正在工作... 計算 {$data[0]} - {$data[1]} ",PHP_EOL; $start = ++$data[0]; $end = $data[1]; $total = 0; for (; $start <= $end; $start++) { $total += $start; } echo "task {$task_id} 程序完成工作...",PHP_EOL; $server->result->incr('finish_nums','finish_nums'); $server->result->set('result',['result' => $data + $server->result->get('result','result')]); if ($server->result->get('finish_nums','finish_nums') == $server->setting['task_worker_num']) { echo "最終計算結果:{$server->result->get('result','result')}",PHP_EOL; } }); $server->start();
更多關於PHP相關內容感興趣的讀者可檢視本站專題:《PHP網路程式設計技巧總結》、《php socket用法總結》、《php面向物件程式設計入門教程》、《PHP資料結構與演算法教程》及《php程式設計演算法總結》
希望本文所述對大家PHP程式設計有所幫助。