laravel5.5隊列
[TOC]
簡單實例
- 配置驅動
假如使用database這個隊列驅動,首先要創建數據表進行記錄
//以下命令會在數據庫中生成jobs表
php artisan queue:table
php artisan migrate
然後更改驅動配置,可以修改.env 中的配置
QUEUE_DRIVER=database
- 創建任務類
php artisan make:job InsertData
以上命令生成app/Jobs/InsertData.php,然後修改該文件的handle方法
public function handle() { // 可以在這裏定義你想做的任何事情。。。 // 以向數據庫寫入文件為例 \DB::table('orders')->insert( array( 'user_id' => 1, 'remark' => time(), ) ); }
- 分發任務
<?php
namespace App\Http\Controllers\Home;
use App\Jobs\InsertData;
class TestController extends CommonController
{
public function testQueue()
{
InsertData::dispatch();
}
}
定義一條路由,方便訪問
Route::get('/test/queue','Home\TestController@testQueue');
訪問該條路由後,
- 查看隊列數據表jobs,發現多了一條數據,證明分發數據成功,已經寫入隊列。
- 查看數據表orders,並沒有任何變化,說明隊列並沒有運行
監聽隊列
php artisan queue:work
- 查看隊列數據表jobs,發現上面寫入的數據消失
- 查看數據表orders,寫入一條新的數據
總結: 實際上應該先運行隊列監聽,上面3 和 4 顛倒是為了便於觀察過程,以上就是隊列運行的最基本過程,先往jobs數據表中寫入任務信息, 然後通過queue排隊讀取該任務,運行成功後清除該條數據。
1. 簡介和配置
1.1 好處
將耗時的任務延時處理,比如發送郵件,從而大幅度縮短 Web 請求和相應的時間
1.2 配置文件
隊列配置文件存放在 config/queue.php
// 設置使用哪種隊列驅動
'default' => env('QUEUE_DRIVER', 'sync'),
// 隊列驅動方式可選項的詳細配置
'connections' => [
'sync' => [
'driver' => 'sync',
],
'database' => [
'driver' => 'database',
'table' => 'jobs',
'queue' => 'default',
'retry_after' => 90,
],
// 其他一些選項 。。。
'redis' => [
'driver' => 'redis',
'connection' => 'default', # config/database.php 中redis連接選項
'queue' => 'default', # 默認隊列任務被發給指定連接的時候會被分發到這個隊列中
'retry_after' => 90,
],
],
1.3 隊列驅動的必要配置
- 如果使用database隊列驅動
首先要創建數據表
//以下命令會在數據庫中生成jobs表
php artisan queue:table
php artisan migrate
然後更改驅動配置,可以修改.env 中的配置
QUEUE_DRIVER=database
- 如果使用redis隊列驅動
2. 創建任務
2.1 生成任務類
php artisan make:job InsertData
2.2 修改任務類
一般來說,最基本的就是補充handle方法
namespace App\Jobs;
use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use App\Http\Model\Order;
class InsertData implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
// 最大嘗試次數
public $tries = 5;
// 超時時間
public $timeout = 120;
// 上面兩個屬性也可以通過命令行指定,但是優先級低於上面的屬性定義
// php artisan queue:work --tries=3
// php artisan queue:work --timeout=30
public $order = '';
/**
* Create a new job instance.
*
* @return void
*/
public function __construct(Order $order)
{
$this->order = $order;
}
/**
* Execute the job.
*
* @return void
*/
public function handle()
{
\DB::table('orders')->insert(
array(
'user_id' => 1,
'remark' => $this->order->remark,
)
);
}
}
這裏有一點需要註意,官方文檔說
在任務類的構造器中直接傳遞了一個 Eloquent 模型。因為我們在任務類裏引用了 SerializesModels 這個 trait,使得 Eloquent 模型在處理任務時可以被優雅地序列化和反序列化。如果你的隊列任務類在構造器中接收了一個 Eloquent 模型,那麽只有可識別出該模型的屬性會被序列化到隊列裏。當任務被實際運行時,隊列系統便會自動從數據庫中重新取回完整的模型。
這句話怎麽理解呢,何為優雅的序列化和反序列化?
簡單說,當使用Eloquent模型的時候,存入隊列的時候只存了 關鍵數據 ,實際運行的時候 重新 去數據庫取完成模型。
我們用上面的例子測試下
<?php
namespace App\Http\Controllers\Home;
use App\Jobs\InsertData;
use App\Http\Model\Order;
class TestController extends CommonController
{
public function testQueue()
{
$order = Order::find(1);
$order->remark = 'modify';
InsertData::dispatch($order);
}
}
最終的結果是,盡管我們修改了order的屬性remark,但是最終存入到數據庫中的數據仍舊是我們使用Order::ind(1)對象的remark的值
2.3 分發任務
- 使用dispatch方法
// 這個任務將被分發到默認隊列...
YourJob::dispatch();
YourJob::dispatch($order);
// 這個任務將被發送到「emails」隊列...
YourJob::dispatch()->onQueue('emails');
延遲分發
//我們指定一個任務在分配後 10 分鐘內不可被處理: InsertData::dispatch($order) ->delay(Carbon::now()->addMinutes(10));
工作鏈
工作鏈允許你指定應該按順序運行的隊列列表。如果一個任務失敗了,則其余任務將不會運行。你可以在分發任務的時候使用 withChain 方法來執行具有工作鏈的隊列任務。
Job1::withChain([
new otherJob2,
new otherJob3
])->dispatch();
$order = Order::find(1);
$post = Post::find(1);
InsertData::withChain([
new InsertDataToPost($post)
])->dispatch($order);
2.4 自定義隊列 & 連接
指定隊列,使用onQueue()
ProcessPodcast::dispatch($podcast)->onQueue('processing');
指定連接,使用onConnection()
ProcessPodcast::dispatch($podcast)->onConnection('redis');
連起來使用
ProcessPodcast::dispatch($podcast)
->onConnection('sqs')
->onQueue('processing');
3. 運行隊列處理器
3.1 運行
命令:
php artisan queue:work
隊列處理器是長時間運行的進程,如果你修改代碼那這些改變是不會應用到處理器中的。所以在你重新部署過程中,一定要 重啟
3.2 處理單一任務
每次只執行一個隊列任務
php artisan queue:work --once
3.3 指定連接 & 隊列
指定需要監聽的連接
//只啟動了redis隊列的驅動,如果你將數據推到其它連接比如database,則不會自動處理
php artisan queue:work redis
指定隊列
//啟動一個只處理那個特定隊列的隊列處理器
php artisan queue:work redis --queue=emails
3.4 資源註意事項
守護程序隊列不會在處理每個作業之前 「重新啟動」 框架。因此,在每個任務完成後,您應該釋放任何占用過大的資源。例如,如果你使用 GD 庫進行圖像處理,你應該在完成後用 imagedestroy 釋放內存。
3.5 隊列優先級
把一個任務推到 high 優先級的隊列中
dispatch((new Job)->onQueue('high'));
要保證high隊列的任務在low隊列任務之前處理
// 隊列名稱先後排列,中間用逗號隔開
php artisan queue:work --queue=high,low
3.6 隊列重啟
php artisan queue:restart
3.7 任務過期 & 超時
- 任務過期
config/queue.php 配置文件裏,每一個隊列連接都定義了一個 retry_after 選項, 比如 ‘retry_after‘ => 90, 那麽當任務運行超過90s之後,該任務會 重新回到隊列中
- 隊列處理超時
指定了 Laravel 隊列處理器最多執行多長時間後就應該被 關閉掉
php artisan queue:work --timeout=60
--timeout 應該永遠都要比 retry_after 短至少幾秒鐘的時間。這樣就能保證任務進程總能在失敗重試前就被殺死了。如果你的 --timeout 選項大於 retry_after 配置選項,你的任務可能被執行兩次,甚至更多次。
3.8 隊列進程睡眠時間
當隊列需要處理任務時,進程將繼續處理任務,它們之間沒有延遲。但是,如果沒有新的工作可用,sleep 參數決定了工作進程將 「睡眠」 多長時間:
php artisan queue:work --sleep=3
4. 配置Supervisor
4.1 下載程序並安裝
yum install python-setuptools
easy_install supervisor
// 使用root身份創建一個全局配置文件
#echo_supervisord_conf > /etc/supervisord.conf
4.2 編輯配置文件
vim /etc/supervisord.conf
加入以下內容,directory 指向工程所在的根目錄
[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
directory = /data/zhengde
command=php artisan queue:work database --sleep=3 --tries=3
autostart=true
autorestart=true
user=root
numprocs=8
redirect_stderr=true
stdout_logfile=/var/log/worker.log
4.3 使用
這個工具主要就兩個命令:
- supervisord : supervisor的服務器端部分,啟動supervisor就是運行這個命令
supervisorctl:啟動supervisor的命令行窗口。
```3.1. 啟動supervisord
$supervisord -c /etc/supervisord.conf
3.2. 關閉supervisord
$supervisorctl shutdown
3.3. 重新載入配置
$supervisorctl reload
3.4.更新新的配置到supervisord
$supervisorctl update
3.5.啟動某個進程(program_name=你配置中寫的程序名稱)
$supervisorctl start program_name
3.6.查看正在守候的進程
$supervisorctl
3.7.停止某一進程 (program_name=你配置中寫的程序名稱)
$pervisorctl stop program_name
3.8.重啟某一進程 (program_name=你配置中寫的程序名稱)
$supervisorctl restart program_name
3.9.停止全部進程
$supervisorctl stop all
註意:顯示用stop停止掉的進程,用reload或者update都不會自動重啟 。
# 5. 處理失敗的任務
## 5.1 生成隊列失敗數據表
php artisan queue:failed-table
php artisan migrate
在調用 queue worker,命令時你應該通過 --tries 參數指定任務的最大重試次數。如果不指定,任務就會永久重試:
php artisan queue:work redis --tries=3
## 5.2 處理失敗任務
你可以在任務類裏直接定義 failed 方法,它能在任務失敗時運行任務的清除邏輯。這個地方用來發一條警告給用戶或者重置任務執行的操作等再好不過了。導致任務失敗的異常信息會被傳遞到 failed 方法:
<?php
namespace App\Jobs;
use Exception;
use App\Podcast;
use App\AudioProcessor;
use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
class ProcessPodcast implements ShouldQueue
{
use InteractsWithQueue, Queueable, SerializesModels;
protected $podcast;
public function __construct(Podcast $podcast)
{
$this->podcast = $podcast;
}
public function handle(AudioProcessor $processor)
{
// 處理上傳播客...
}
public function failed(Exception $exception)
{
// 給用戶發送失敗通知,等等...
}
}
## 5.3 任務失敗事件
如果你想註冊一個當隊列任務失敗時會被調用的事件,則可以用 Queue::failing 方法。這樣你就有機會通過這個事件來用 e-mail 或 HipChat 通知你的團隊。
例如我們可以在 Laravel 內置的 AppServiceProvider 中對這個事件附加一個回調函數:
<?php
namespace App\Providers;
use Illuminate\Support\Facades\Queue;
use Illuminate\Queue\Events\JobFailed;
use Illuminate\Support\ServiceProvider;
class AppServiceProvider extends ServiceProvider
{
public function boot()
{
Queue::failing(function (JobFailed $event) {
// $event->connectionName
// $event->job
// $event->exception
});
}
public function register()
{
//
}
}
## 5.4 重試失敗任務
// 查看所有任務
php artisan queue:failed
// 重試id為5的任務
php artisan queue:retry 5
// 重試所有失敗任務
php artisan queue:retry all
// 刪除一條失敗任務
php artisan queue:forget 5
// 刪除所有失敗任務
php artisan queue:flush
## 5.5 任務事件
使用隊列的 before 和 after 方法,你能指定任務處理前和處理後的回調處理。在這些回調裏正是實現額外的日誌記錄或者增加統計數據的好時機。
<?php
namespace App\Providers;
use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;
class AppServiceProvider extends ServiceProvider
{
/**
* 啟動任意服務。
@return void
*/
public function boot()
{
Queue::before(function (JobProcessing $event) {
// $event->connectionName
// $event->job
// $event->job->payload()
});
Queue::after(function (JobProcessed $event) {
// $event->connectionName
// $event->job
// $event->job->payload()
});
}
/**
* 註冊服務提供者。
*
* @return void
*/
public function register()
{
//
}
}
```
laravel5.5隊列