1. 程式人生 > 其它 >TP6(thinkphp6)佇列與延時佇列

TP6(thinkphp6)佇列與延時佇列

安裝

在此我就不再略過TP6的專案建立過程了,大致就是安裝composer工具,安裝成功以後,再使用composer去建立專案即可。

think-queue 安裝

composer require topthink/think-queue

專案中新增驅動配置

我們需要在安裝好的config下找到 queue.php

<?php
return [
    'default'     => 'redis',
    'connections' => [
        'sync'     => [
            'type' => 'sync',
        ]
, 'database' => [ 'type' => 'database', 'queue' => 'default', 'table' => 'jobs', 'connection' => null, ], 'redis' => [ 'type' => 'redis', 'queue' => 'default', 'host' => '127.0.0.1', 'port' => 6379, 'password' => '', 'select' => 4, 'timeout' => 0, 'persistent' => false
, ], ], 'failed' => [ 'type' => 'none', 'table' => 'failed_jobs', ], ];

生產者

<?php
namespace app\controller;

use app\BaseController;
use think\facade\Queue;

class Index extends BaseController
{
    public function queue()
    {
        //當前任務將由哪個類來負責處理。
        //當輪到該任務時,系統將生成一個該類的例項,並預設呼叫其 fire 方法
$jobHandlerClassName = 'app\Job\Order'; //當前任務歸屬的佇列名稱,如果為新佇列,會自動建立 //php think queue:work --queue orderJobQueue //php think queue:work --queue orderJobQueue --daemon $jobQueueName = "orderJobQueue"; //陣列資料 $orderData = [ 'id' => uniqid(), 'time' => time(), 'message' => 'later message83' ]; //將該任務推送到訊息佇列,等待對應的消費者去執行 //這裡只是負責將資料新增到相應的佇列名稱的佇列裡,消費者與生產者並無聯絡 //立即執行 $isPushed = Queue::push($jobHandlerClassName, $orderData, $jobQueueName); //延遲10秒後執行 //$isPushed = Queue::later(10, $jobHandlerClassName, $orderData, $jobQueueName); if ($isPushed !== false) { echo date('Y-m-d H:i:s') . " 佇列新增成功"; } else { echo '佇列新增失敗'; } } }

消費者

<?php
namespace app\Job;

use think\facade\Log;
use think\queue\Job;

/**
 * @Title: app\task\job$Order
 * @Package package_name
 * @Description: todo(測試訂單消費者)
 * @author Jack
 */
class Order
{
    /**
     * @Title: fire
     * @Description: todo(fire方法是訊息佇列預設呼叫的方法)
     * @param Job $job
     * @param array $data
     * @author Jack
     * @throws
     */
    public function fire(Job $job, array $data)
    {
        //有些訊息在到達消費者時,可能已經不再需要執行了
        $isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data);
        if(!$isJobStillNeedToBeDone){
            $job->delete();
            return;
        }
        $jobId =  $job->getJobId();
        $isJobDone = $this->orders($data, $jobId);
        if ($isJobDone) {
            //如果任務執行成功,記得刪除任務
            $job->delete();
        } else {
            //通過這個方法可以檢查這個任務已經重試了幾次了
            if ($job->attempts() > 3){
                Log::error('試了3次了');
                $job->delete();

                //也可以重新發布這個任務
                //print("<info>Hello Job will be availabe again after 2s."."</info>\n");
                //$job->release(2); //$delay為延遲時間,表示該任務延遲2秒後再執行
            }
        }
    }

    /**
     * @Title: checkDatabaseToSeeIfJobNeedToBeDone
     * @Description: todo(有些訊息在到達消費者時,可能已經不再需要執行了)
     * @param array $data
     * @return boolean
     * @author Jack
     * @throws
     */
    private function checkDatabaseToSeeIfJobNeedToBeDone($data)
    {
        return true;
    }

    /**
     * @Title: orders
     * @Description: todo(資料處理)
     * @param array $data
     * @author Jack
     * @throws
     */
    public function orders(array $data,  $jobId)
    {
        //對訂單進行資料庫操作或其他等等
        Log::info(date('Y-m-d H:i:s') . ' - data:' . json_encode($data));
        return true;
    }
}

伺服器執行常駐命令

php think queue:work --queue orderJobQueue