1. 程式人生 > >swoole(6)Task非同步任務

swoole(6)Task非同步任務

一:什麼是task程序?

task程序是獨立與worker程序的一組程序  ,他主要處理耗時較長的業務邏輯,並且不影響worker程序處理客戶端的請求.worker程序通過task()函式把資料投遞到Task程序去處理

開啟task功能(task功能是預設關閉的 ,開啟task功能需要滿足2個條件) 

  1. 配置task程序的數量(task_worker_num)
  2. 註冊task的回掉函式onTask和onfinish

二:task設計流程

  1. worker程序中,我們呼叫對應的task()方法傳送資料通知到task worker程序
  2. task worker程序會在onTask()回撥中接收到這些資料,並進行處理
  3. 處理完會可以通過finish函式或直接return訊息給worker程序
  4. worker程序在onFinish()程序收到訊息並進行處理

程式碼:

<?php
/**
 * Created by PhpStorm.
 * User: huahua
 * Date: 2020/3/6
 * Time: 上午10:09
 */

//基於第一個 TCP 伺服器,只需要增加 onTask 和 onFinish2 個事件回撥函式即可。
//另外需要設定 task 程序數量,可以根據任務的耗時和任務量配置適量的 task 程序。

$serv = new \Swoole\Server('127.0.0.1',9800);

$serv->set([
    'worker_num'=>2, //設定程序
    //配置此引數後將會啟用 task 功能。所以 Server 務必要註冊 onTask、onFinish 2 個事件回撥函式。如果沒有註冊,伺服器程式將無法啟動。
    'task_worker_num' => 4, //配置 Task 程序的數量。
    'task_ipc_mode'=>2,//設定 Task 程序與 Worker 程序之間通訊的方式
]);
//監聽連線進入事件
$serv->on('Connect', function ($serv, $fd) {
    echo "Client: Connect.\n";
});

//監聽資料接收事件
$serv->on('Receive', function ($serv, $fd, $from_id, $data) {
    $data=['tid'=>time()];
    $task_id = $serv->task($data);////投遞到taskWorker程序組
    echo "task id = {$task_id},from id = $from_id".PHP_EOL;
});

$serv->on('task',function ($serv, $task_id, $from_id, $data){
    echo "task id = {$task_id},from id = $from_id,程序id =".posix_getpid().PHP_EOL;
    echo '我是task任務';
    $serv->finish("data -> ok");
});

$serv->on('finish',function ($serv, $task_id, $data){
    echo "task id = {$task_id}".PHP_EOL;
});
//監聽連線關閉事件
$serv->on('Close', function ($serv, $fd) {
    echo "Client: Close.\n";
});

//啟動伺服器
$serv->start();

cli下執行結果:

$task_id不等於workerStart中的worker_id,而是swoole維護的任務自增長id
$from_id表示來自於哪個woker投遞而來的任務,id等於worker_id值
$serv->worker_id等於workerStart中的worker_id, 此處是task_woker的id
$serv->worker_pid 此處是task_woker的程序pid

 三:task任務切分

場景:假設有一臺伺服器專門處理前臺投遞的資料,利用簡單的任務拆分,分配到相應的程序去處理

思路:

  • 1.將一個大的任務拆分成相應份數(是由$task_worker_num數量來確定)
  • 2.通過foreach迴圈將資料投遞到指定的task程序,範圍是(0-(task_worker_num-1))區間之內
  • 3.執行失敗的任務,需要保留,重新投遞執行(程序間通訊管道方式)
$server->on('receive',function (swoole_server $server, int $fd, int $reactor_id, string $data){
    for ($i=0;$i<100;$i++){
        $tasks[] =['id'=>$i,'msg'=>time()];
    }
    $count=count($tasks);
    $data=array_chunk($tasks,ceil($count/3));
    foreach ($data as $k=>$v){
        $server->task($v,$k);  //(0-task_woker_num-1)
    }

});