1. 程式人生 > 程式設計 >PHP swoole中使用task程序非同步的處理耗時任務應用案例分析

PHP swoole中使用task程序非同步的處理耗時任務應用案例分析

本文例項講述了PHP swoole中使用task程序非同步的處理耗時任務。分享給大家供大家參考,具體如下:

我們知道,swoole中有兩大程序,分別是 master 主程序和 manager 管理程序。

其中 master 主程序中會有一個主 reactor 執行緒和多個 reactor 執行緒,主要的作用就是用來維護TCP連線,處理網路IO,收發資料。

而 manager 管理程序,作用則是 fork 和管理 worker 和 task 程序。

worker 程序的作用是接收 reactor 執行緒傳遞的資料,並處理資料,返回處理結果給 reactor 執行緒。

task 程序的作用是處理一些相對耗時的任務,task 與 worker 程序是獨立的,不會影響 worker 程序處理客戶端的請求。

一、task 程序的應用場景:

1、相對耗時的郵件群發,比如某某活動,需要給100W使用者傳送活動郵件。

2、推送某些大V的動態,比如某大V發了條新訊息,粉絲需要及時獲取到該動態。

二、worker 與 task 的相互關係:

1、worker 程序中能過呼叫 task() 來投遞任務,task 程序中 通過 onTask 事件來響應投遞來的任務。

2、task 程序中 通過 直接返回 或 呼叫 finish() 來告訴 worker 程序任務處理完畢,worker 程序中 通過 onFinish 事件響應任務完成。

三、使用 task 的前題:

1、在 Server 中 配置 task_worker_num 數量。

2、設定 Server 的 onTask 和 onFinish 事件回撥函式。

四、簡單的使用task進行累加和的計算例子

<?php
$server = new swoole_server('0.0.0.0',6666);
 
$server->set([
  'worker_num' => 2,'task_worker_num' => 16,]);
 
$server->on('WorkerStart',function ($server,$worker_id) {
  //注意這裡,我們通過taskworker來判斷是task程序還是worker程序
  //需要在worker程序中呼叫task(),不然會報出警告
  //這裡會執行兩遍,因為我們設定了worker_num數為2
  if (!$server->taskworker) {
    echo '投遞任務開始...',PHP_EOL;
    //投遞32個累加計算任務給16個task程序
    for ($ix = 0; $ix < 32; $ix++) {
      //注意這裡的投遞是非同步的
      $server->task([mt_rand(1,100),mt_rand(1000,9999)]);
    }
    echo '投遞任務結束...',PHP_EOL;
  }
});
 
//server服務必須要有onReceive回撥
$server->on('Receive',$fd,$reactor_id,$data) {
 
});
 
//注意,task程序完全是同步阻塞模式的
$server->on('Task',$task_id,$src_worker_id,$data) {
  echo "task {$task_id} 程序正在工作...",PHP_EOL;
  $start = $data[0];
  $end = $data[1];
  $total = 0;
  for (; $start <= $end; $start++) {
    $total += $start;
  }
  echo "task {$task_id} 程序完成工作...",PHP_EOL;
  return $total;
});
 
$server->on('Finish',$data) {
  echo "task {$task_id} 程序處理完成,結果為 {$data}",PHP_EOL;
});
 
$server->start();

注意,我們通過呼叫 task() 往任務池中投遞任務,swoole 底層會輪詢的投遞任務到各個 task 程序。

當你投遞任務的數量超過 onTask 的處理速度,這會導致任務池被塞滿,進而導致 worker 程序發生阻塞,所以需合理設定 task_worker_num 數量和處理速度之間的關係。

當然,我們也可以人為的把任務投遞到指定的 task 程序。task() 函式的第二個引數可以指定要投遞的 task 程序ID,ID範圍為 0 到 (task_worker_num - 1)。

五、對任務進行切分,人為控制投遞到 task 程序

<?php
$server = new swoole_server('0.0.0.0',6666);
 
$server->set([
  'worker_num' => 1,'task_worker_num' => 10,$worker_id) {
  //為了方便演示,把worker_num設定為1,這裡只會執行一次
  if (!$server->taskworker) {
    //通過swoole_table共享記憶體,在不同程序中共享資料
    $server->result = new swoole_table(10240);
    //用於儲存task程序完成數量
    $server->result->column('finish_nums',swoole_table::TYPE_INT);
    //用於儲存最終計算結果
    $server->result->column('result',swoole_table::TYPE_INT);
    $server->result->create();
    //計算1000的累加和,並把計算任務分配到10個task程序上
    $num = 1000;
    $step = $num / $server->setting['task_worker_num'];
    for ($ix = 0; $ix < $server->setting['task_worker_num']; $ix++) {
      $start = $ix * $step;
      $server->task([$start,$start + $step],$ix);
    }
  }
});
 
$server->on('Receive',$data) {
  echo "task {$task_id} 程序正在工作... 計算 {$data[0]} - {$data[1]} ",PHP_EOL;
  $start = ++$data[0];
  $end = $data[1];
  $total = 0;
  for (; $start <= $end; $start++) {
    $total += $start;
  }
  echo "task {$task_id} 程序完成工作...",PHP_EOL;
  $server->result->incr('finish_nums','finish_nums');
  $server->result->set('result',['result' => $data + $server->result->get('result','result')]);
 
  if ($server->result->get('finish_nums','finish_nums') == $server->setting['task_worker_num']) {
    echo "最終計算結果:{$server->result->get('result','result')}",PHP_EOL;
  }
});
 
$server->start();

更多關於PHP相關內容感興趣的讀者可檢視本站專題:《PHP網路程式設計技巧總結》、《php socket用法總結》、《php面向物件程式設計入門教程》、《PHP資料結構與演算法教程》及《php程式設計演算法總結》

希望本文所述對大家PHP程式設計有所幫助。