TP6(thinkphp6)佇列與延時佇列
阿新 • • 發佈:2021-12-17
安裝
在此我就不再略過TP6的專案建立過程了,大致就是安裝composer工具,安裝成功以後,再使用composer去建立專案即可。
think-queue 安裝
composer require topthink/think-queue
專案中新增驅動配置
我們需要在安裝好的config下找到 queue.php
<?php return [ 'default' => 'redis', 'connections' => [ 'sync' => [ 'type' => 'sync', ], 'database' => [ 'type' => 'database', 'queue' => 'default', 'table' => 'jobs', 'connection' => null, ], 'redis' => [ 'type' => 'redis', 'queue' => 'default', 'host' => '127.0.0.1', 'port' => 6379, 'password' => '', 'select' => 4, 'timeout' => 0, 'persistent' => false, ], ], 'failed' => [ 'type' => 'none', 'table' => 'failed_jobs', ], ];
生產者
<?php namespace app\controller; use app\BaseController; use think\facade\Queue; class Index extends BaseController { public function queue() { //當前任務將由哪個類來負責處理。 //當輪到該任務時,系統將生成一個該類的例項,並預設呼叫其 fire 方法$jobHandlerClassName = 'app\Job\Order'; //當前任務歸屬的佇列名稱,如果為新佇列,會自動建立 //php think queue:work --queue orderJobQueue //php think queue:work --queue orderJobQueue --daemon $jobQueueName = "orderJobQueue"; //陣列資料 $orderData = [ 'id' => uniqid(), 'time' => time(), 'message' => 'later message83' ]; //將該任務推送到訊息佇列,等待對應的消費者去執行 //這裡只是負責將資料新增到相應的佇列名稱的佇列裡,消費者與生產者並無聯絡 //立即執行 $isPushed = Queue::push($jobHandlerClassName, $orderData, $jobQueueName); //延遲10秒後執行 //$isPushed = Queue::later(10, $jobHandlerClassName, $orderData, $jobQueueName); if ($isPushed !== false) { echo date('Y-m-d H:i:s') . " 佇列新增成功"; } else { echo '佇列新增失敗'; } } }
消費者
<?php namespace app\Job; use think\facade\Log; use think\queue\Job; /** * @Title: app\task\job$Order * @Package package_name * @Description: todo(測試訂單消費者) * @author Jack */ class Order { /** * @Title: fire * @Description: todo(fire方法是訊息佇列預設呼叫的方法) * @param Job $job * @param array $data * @author Jack * @throws */ public function fire(Job $job, array $data) { //有些訊息在到達消費者時,可能已經不再需要執行了 $isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data); if(!$isJobStillNeedToBeDone){ $job->delete(); return; } $jobId = $job->getJobId(); $isJobDone = $this->orders($data, $jobId); if ($isJobDone) { //如果任務執行成功,記得刪除任務 $job->delete(); } else { //通過這個方法可以檢查這個任務已經重試了幾次了 if ($job->attempts() > 3){ Log::error('試了3次了'); $job->delete(); //也可以重新發布這個任務 //print("<info>Hello Job will be availabe again after 2s."."</info>\n"); //$job->release(2); //$delay為延遲時間,表示該任務延遲2秒後再執行 } } } /** * @Title: checkDatabaseToSeeIfJobNeedToBeDone * @Description: todo(有些訊息在到達消費者時,可能已經不再需要執行了) * @param array $data * @return boolean * @author Jack * @throws */ private function checkDatabaseToSeeIfJobNeedToBeDone($data) { return true; } /** * @Title: orders * @Description: todo(資料處理) * @param array $data * @author Jack * @throws */ public function orders(array $data, $jobId) { //對訂單進行資料庫操作或其他等等 Log::info(date('Y-m-d H:i:s') . ' - data:' . json_encode($data)); return true; } }
伺服器執行常駐命令
php think queue:work --queue orderJobQueue