1. 程式人生 > >TP5 中的redis 佇列 [轉載]

TP5 中的redis 佇列 [轉載]

首先我們看一下自己的TP5的框架中的  TP5\vendor\topthink ,這個檔案中有沒有think-queue這個資料夾,如果沒有請安裝,

安裝這個是要用到Composer的如果沒有安裝composer,請安裝Composer

1.$ curl -sS https://getcomposer.org/installer | php

2.$ mv composer.phar /usr/local/bin/composer
Linux上安裝 think-queue ,請先進入到框架的根目錄再執行

composer require topthink/think-queue
這個時候再去看就會有 think-queue 這個檔案夾了,確定一下看看是否安裝成功,執行

php think queue:work -h
能出現以下 結果 就表示think-queue的 安裝好了

 

 

配置
配置檔案位於 application/extra/queue.php

公共配置
 

<?php
return [
'connector' => 'Redis', // Redis 驅動
'expire' => 60, // 任務的過期時間,預設為60秒; 若要禁用,則設定為 null
'default' => 'default', // 預設的佇列名稱
'host' => '127.0.0.1', // redis 主機ip
'port' => 6379, // redis 埠
'password' => '', // redis 密碼
'select' => 0, // 使用哪一個 db,預設為 db0
'timeout' => 0, // redis連線的超時時間
'persistent' => false, // 是否是長連線

// 'connector' => 'Database', // 資料庫驅動
// 'expire' => 60, // 任務的過期時間,預設為60秒; 若要禁用,則設定為 null
// 'default' => 'default', // 預設的佇列名稱
// 'table' => 'jobs', // 儲存訊息的表名,不帶字首
// 'dsn' => [],

// 'connector' => 'Topthink', // ThinkPHP內部的佇列通知服務平臺 ,本文不作介紹
// 'token' => '',
// 'project_id' => '',
// 'protocol' => 'https',
// 'host' => 'qns.topthink.com',
// 'port' => 443,
// 'api_version' => 1,
// 'max_retries' => 3,
// 'default' => 'default',

// 'connector' => 'Sync', // Sync 驅動,該驅動的實際作用是取消訊息佇列,還原為同步執行
];
 

1.4 訊息的建立與推送

我們在業務控制器中建立一個新的訊息,並推送到 helloJobQueue 佇列

新增 \application\index\controller\JobTest.php 控制器,在該控制器中新增 actionWithHelloJob 方法

<?php
/**
* 檔案路徑: \application\index\controller\JobTest.php
* 該控制器的業務程式碼中藉助了thinkphp-queue 庫,將一個訊息推送到訊息佇列
*/
namespace app\index\controller;
use think\Exception;

use think\Queue;

class JobTest {
/**
* 一個使用了佇列的 action
*/
public function actionWithHelloJob(){

// 1.當前任務將由哪個類來負責處理。
// 當輪到該任務時,系統將生成一個該類的例項,並呼叫其 fire 方法
$jobHandlerClassName = 'application\index\job\Hello';
// 2.當前任務歸屬的佇列名稱,如果為新佇列,會自動建立
$jobQueueName = "helloJobQueue";
// 3.當前任務所需的業務資料 . 不能為 resource 型別,其他型別最終將轉化為json形式的字串
// ( jobData 為物件時,需要在先在此處手動序列化,否則只儲存其public屬性的鍵值對)
$jobData = [ 'ts' => time(), 'bizId' => uniqid() , 'a' => 1 ] ;
// 4.將該任務推送到訊息佇列,等待對應的消費者去執行
$isPushed = Queue::push( $jobHandlerClassName , $jobData , $jobQueueName );
// database 驅動時,返回值為 1|false ; redis 驅動時,返回值為 隨機字串|false
if( $isPushed !== false ){
echo date('Y-m-d H:i:s') . " a new Hello Job is Pushed to the MQ"."<br>";
}else{
echo 'Oops, something went wrong.';
}
}
}
注意: 在這個例子當中,我們是手動指定的 $jobHandlerClassName ,更合理的做法是先定義好訊息名稱與消費者類名的對映關係,然後由某個可以獲取該對映關係的類來推送這個訊息。這樣,生產者只需要知道訊息的名稱,而無需指定哪個消費者類來處理。

除了 Queue::push( $jobHandlerClassName , $jobData , $jobQueueName );這種方式之外,還可以直接傳入 Queue::push( $jobHandlerObject ,null , $jobQueueName ); 這時,需要在 $jobHandlerObject 中定義一個 handle() 方法,訊息佇列在執行到該任務時會自動反序列化該物件,並呼叫其 handle()方法。 該方式的缺點是無法傳入自定義資料。

1.5 訊息的消費與刪除

編寫 Hello 消費者類,用於處理 helloJobQueue 佇列中的任務

新增 \application\index\job\Hello.php 消費者類,並編寫其 fire() 方法

<?php
/**
* 檔案路徑: \application\index\job\Hello.php
* 這是一個消費者類,用於處理 helloJobQueue 佇列中的任務
*/
namespace app\index\job;

use think\queue\Job;

class Hello {

/**
* fire方法是訊息佇列預設呼叫的方法
* @param Job $job 當前的任務物件
* @param array|mixed $data 釋出任務時自定義的資料
*/
public function fire(Job $job,$data){
         // 如有必要,可以根據業務需求和資料庫中的最新資料,判斷該任務是否仍有必要執行.
         $isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data);
         if(!isJobStillNeedToBeDone){
$job->delete();
return;
}
       
$isJobDone = $this->doHelloJob($data);

if ($isJobDone) {
//如果任務執行成功, 記得刪除任務
$job->delete();
print("<info>Hello Job has been done and deleted"."</info>\n");
}else{
if ($job->attempts() > 3) {
//通過這個方法可以檢查這個任務已經重試了幾次了
print("<warn>Hello Job has been retried more than 3 times!"."</warn>\n");
$job->delete();
// 也可以重新發布這個任務
//print("<info>Hello Job will be availabe again after 2s."."</info>\n");
//$job->release(2); //$delay為延遲時間,表示該任務延遲2秒後再執行
}
}
}

/**
      * 有些訊息在到達消費者時,可能已經不再需要執行了
* @param array|mixed $data 釋出任務時自定義的資料
* @return boolean 任務執行的結果
      */
     private function checkDatabaseToSeeIfJobNeedToBeDone($data){
return true;
}

/**
* 根據訊息中的資料進行實際的業務處理
* @param array|mixed $data 釋出任務時自定義的資料
* @return boolean 任務執行的結果
*/
private function doHelloJob($data) {
// 根據訊息中的資料進行實際的業務處理...

print("<info>Hello Job Started. job Data is: ".var_export($data,true)."</info> \n");
print("<info>Hello Job is Fired at " . date('Y-m-d H:i:s') ."</info> \n");
print("<info>Hello Job is Done!"."</info> \n");

return true;
}
}

至此,所有的程式碼都已準備完畢,在執行訊息佇列之前,我們先看一下現在的目錄結構:

 

1.6 釋出任務

在瀏覽器中訪問 http://your.project.domain/index/job_test/actionWithHelloJob ,可以看到訊息推送成功。

 

這個時候去Linux中連結redis,檢視redis的佇列任務,就可以看到有資料在裡面

連線redis:/usr/local/redis/bin/redis-cli -h 12.131.12.12 -p 6379

redis對列的命令,顯示helloJobQueue對列中的資料:LRANGE queues:helloJobQueue 0 -1  

顯示對列中有幾條資料:Llen queues:helloJobQueue

 

看到這樣就說明已經加入到隊列了,截圖是加入了好多條

 

 

1.7 處理任務

切換當前終端視窗的目錄到專案根目錄下,執行

php think queue:work --queue helloJobQueue
可以看到執行的結果類似如下:

 

​由於php think queue:work --queue helloJobQueue這個命令只能在TP5框架的根目錄才能執行成功,所以,shell指令碼要先cd到框架的根目錄,具體見下面的shell指令碼截圖

至此,我們成功地經歷了一個訊息的 建立 -> 推送 -> 消費 -> 刪除 的基本流程

但是!!! 但是!!! 但是!!!,重要的事說3遍

雖然現在是可以將訊息的建立 -> 推送 -> 消費 -> 刪除 的基本流程全部跑通,但是每次執行 php think queue:work --queue helloJobQueue 這個命令都是進行了一次,也就是說,在對列 helloJobQueue 中有5條要處理的資料,每次執行 php think queue:work --queue helloJobQueue 都是隻執行了一條資料,還有4條資料沒有處理,我們要的是執行一次可以直接將對列中的資料全部處理掉,於是,我們想到定時任務去處理

首先我們寫兩個shell指令碼

1.monitorHandleQueue.sh,作用是檢查佇列的程序是否在執行


pid=$(ps -ef| grep handleQueue |grep -v grep | awk ' NR==1 {print $2}')

if [ -z $pid ]
then
sh /home/wwwroot/default/www/thinkphp5/handleQueue.sh &>/dev/null 2>&1
fi
mysqld是程序名稱

檢查程序是否存在,如果不存在啟動handleQueue.sh指令碼,注意:monitorHandleQueue.sh指令碼中的啟動handleQueue.sh的路徑寫自己的,NR==1表示只取第一個程序,|grep -v grep 過濾掉自己的程序

2.handleQueue.sh 指令碼

cd /home/wwwroot/default/www/thinkphp5
while [ 2 > 0 ]
do
len=`/usr/local/redis/bin/redis-cli -h 1.1.1.1 -p 6379 Llen queues:helloJobQueue`
if [ $((len + 0 )) -gt 0 ];then
php think queue:work --queue helloJobQueue
else
sleep 3
php think queue:work --queue helloJobQueue
fi
done
此指令碼中的cd /home/wwwroot/default/www/thinkphp5,一定要切換到框架的根目錄,解釋一下handleQueue.sh指令碼的邏輯:先切換到框架的根目錄,while判斷2大於0為真,所以會一直執行,連線到redis,獲取佇列的長度,if判斷,如果佇列的長度大於0直接執行佇列,否則就停3秒再執行佇列,很簡單,寫了很長時間,還有一點要注意,shell指令碼最好不要在編輯器編輯,直接在Linux上編輯,因為如果在編輯器上編輯上傳到Linux上會產生意想不到的問題(我在這裡耽誤了很長時間),找不到問題所在就直接在Linux上編寫好了,省的麻煩

上面的shell指令碼是我第一次寫,碰到了很多問題

1.[ 2 > 0 ]格式為[空格判斷表示式空格]

2.len=`/usr/local/redis/bin/redis-cli -h 47.101.54.26 -p 6379 Llen queues:helloJobQueue`,等於號的左右兩邊不能有空格,反引號 `` 不知道怎麼輸出,在1是我左邊的那個鍵

3. if 的格式,if [ $((len + 0 )) -gt 0 ];then 變數大於0,大於號用 -gt 表示

建立定時任務

*/1 * * * *  /home/wwwroot/default/www/thinkphp5/monitorHandleQueue.sh &>/dev/null 2>&1

好了,至此一個迴圈請求佇列就寫好了

 

 

 

參考:https://blog.csdn.net/will5451/article/details/80434174

參考:https://www.kancloud.cn/yangweijie/learn_thinkphp5_with_yang/367645

原文:https://blog.csdn.net/dabao87/article/details/82414839