1. 程式人生 > >基於ThinkPHP使用Beanstalk的使用 入門篇(一)

基於ThinkPHP使用Beanstalk的使用 入門篇(一)

Beanstalk,一個高效能、輕量級的分散式記憶體佇列系統,最初設計的目的是想通過後臺非同步執行耗時的任務來降低高容量Web應用系統的頁面訪問延遲,支援過有9.5 million使用者的Facebook Causes應用。後來開源,現在有PostRank大規模部署和使用,每天處理百萬級任務。Beanstalkd是典型的類Memcached設計,協議和使用方式都是同樣的風格,所以使用過memcached的使用者會覺得Beanstalkd似曾相識。

高效能離不開非同步,非同步離不開佇列,而其內部都是Producer-Consumer模式的原理。

Beanstalkd設計裡面的核心概念:

◆ job

一個需要非同步處理的任務,是Beanstalkd中的基本單元,需要放在一個tube中。

◆ tube

一個有名的任務佇列,用來儲存統一型別的job,是producer和consumer操作的物件。

◆ producer

Job的生產者,通過put命令來將一個job放到一個tube中。

◆ consumer

Job的消費者,通過reserve/release/bury/delete命令來獲取job或改變job的狀態。

Beanstalkd中一個job的生命週期。一個job有READY, RESERVED, DELAYED, BURIED四種狀態。當producer直接put一個job時,job就處於READY狀態,等待consumer來處理,如果選擇延遲put,job就先到DELAYED狀態,等待時間過後才遷移到READY狀態。consumer獲取了當前READY的job後,該job的狀態就遷移到RESERVED,這樣其他的consumer就不能再操作該job。當consumer完成該job後,可以選擇delete, release或者bury操作;delete之後,job從系統消亡,之後不能再獲取;release操作可以重新把該job狀態遷移回READY(也可以延遲該狀態遷移操作),使其他的consumer可以繼續獲取和執行該job;有意思的是bury操作,可以把該job休眠,等到需要的時候,再將休眠的job kick回READY狀態,也可以delete BURIED狀態的job。正是有這些有趣的操作和狀態,才可以基於此做出很多意思的應用,比如要實現一個迴圈佇列,就可以將RESERVED狀態的job休眠掉,等沒有READY狀態的job時再將BURIED狀態的job一次性kick回READY狀態。

Beanstalkd基於的原始碼安裝和使用很簡單,在此略過。這裡重點介紹一下其幾個很nice的特性。

◆ 優先順序

支援0到2**32的優先順序,值越小,優先順序越高,預設優先順序為1024。

◆ 持久化

可以通過binlog將job及其狀態記錄到檔案裡面,在Beanstalkd下次啟動時可以通過讀取binlog來恢復之前的job及狀態。

◆ 分散式容錯

分散式設計和Memcached類似,beanstalkd各個server之間並不知道彼此的存在,都是通過client來實現分散式以及根據tube名稱去特定server獲取job。

◆ 超時控制

為了防止某個consumer長時間佔用任務但不能處理的情況,Beanstalkd為reserve操作設定了timeout時間,如果該consumer不能在指定時間內完成job,job將被遷移回READY狀態,供其他consumer執行。

Beanstalkd不足之處:在使用中發現一個Beanstalkd尚無提供刪除一個tube的操作,只能將tube的job依次刪除,並讓Beanstalkd來自動刪除空tube。還有就是Beanstalkd不支援客戶端認證機制(開發者將應用場景定位在區域網)。

 

下面,我們通過php類Pheanstalk來進行簡單入門。

1. composer安裝

切換到專案目錄,使用composer進行載入安裝:composer require pda/pheanstalk

(如出現報錯,請更新composer在下載:composer update)

2. 連線Beanstalk

--- 生產者 ---

關於如何安裝Beanstalk,在這裡就不進行說明了,操作很簡單,請自行在執行系統安裝好,才能使用。

作者是使用 4.0 版本的類庫

連線服務

注意的是,在4.0之前的版本,是直接使用 new Pheanstalk($host, $port, ...) 方式連線的,4.0版本使用 create 靜態方法進行連線。

連線成功,我們來完成一個簡單測試,“像管道demo中,put進值”,如

檢視demo管道資訊

這樣,生產者就完成任務放進管道的操作。

3. 如何處理管道任務

--- 消費者 ---

我們建立一個消費者的方法,通過while來處理管道每次產生的任務(這是模擬,實際生產中,下一篇文章會詳細說明),如:

以下的程式碼是監聽管道demo,並將任務取出來處理

當我們在終端執行時,我看到每次處理任務的資料,如:

整個生產者到消費者的流程就是這樣,至於具體業務,需要根據自身的情況進行處理。

入門篇(一)完!

 

以下是整理了一些常用方法,僅供參考(來源於網上,感謝網友的提供)

//----------------------------------------維護類----------------------------------

//1.檢視目前pheanStalkd狀態資訊
//print_r($ph->stats()); 

//2.顯示目前存在的管道
//print_r($ph->listTubes()); 

//3.檢視NewUsers管道的資訊
//$ph->useTube('NewUsers')->put('test'); 
//$ph->useTube('NewUsers')->put('up'); //4.向NewUsers管道新增一個up任務
//print_r($ph->statsTube('NewUsers'));//3.檢視NewUsers管道的資訊

//6.檢視指定管道中某一個任務的情況
//$job = $ph->watch('NewUsers')->reserve(); //5.從管道中取出任務(消費)
//print_r($ph->statsJob($job)); //6.檢視指定管道中某一個任務的情況

//7.檢視任務id為1的任務詳情
//$job = $ph->peek(1);7.直接取出任務id為1的任務 [注:beanstalkd中所有任務的id都具有唯一性] 
//print_r($ph->statsJob($job));//檢視任務id為1的任務詳情


//----------------------------------------生產類--------------------------------------

////第一種 put()

//$tube = $ph->useTube('NewUsers');//連線NewUsers管道
//print_r($tube->put('four'));//向NewUsers管道新增任務four,並返回結果
//注: put()方法還有3個可選引數(依次為: 優先順序priority,延遲時間delay,任務超時重發ttr)

////第二種 putInTube() [注: putInTube()就是對useTube()和put()的封裝]
//$res = $ph->putInTube('NewUsers','three');//向NewUsers管道新增任務three
////注: putInTube()方法還有3個可選引數(依次為: 優先順序priority,延遲時間delay,任務超時重發ttr)
//print_r($res);//返回任務id

//print_r($ph->statsTube('NewUsers'));//檢視NewUsers管道的詳細情況


//---------------------------------------消費類--------------------------------------

// 1.watch 監聽NewUsers管道 [ 注: watch()同樣可以監聽多個管道 ]
//$tube = $ph->watch('NewUsers');
//print_r($ph->listTubesWatched());//列印已經監聽的管道


// 2.watch 監聽多個管道
//$tube = $ph->watch('NewUsers')
//           ->watch('default');
//print_r($ph->listTubesWatched());//列印已經監聽的管道


// 3.ignore 監聽NewUsers管道,忽略default管道
//$tube = $ph->watch('NewUsers')
//            ->ignore('default');
//print_r($ph->listTubesWatched());//列印已經監聽的管道


// 4.reserve 監聽NewUsers管道,並且取出任務
//$job = $ph->watch('NewUsers')
//          ->reserve();
//
////注reserve()有1個引數,阻塞的時間,過了阻塞時間,不管有沒有東西,直接返回
//
//var_dump($job);//列印已經取出的任務
//$ph->delete($job);//刪除已經取出的任務


// 5.putInTube/put 向NewUsers管道寫入任務 [ 注:此為生產者方法,放到此處是為了方便理解 ]
//$ph->putInTube('NewUsers','number_1',5);
//$ph->putInTube('NewUsers','number_2',3);
//$ph->putInTube('NewUsers','number_3',0);
//$ph->putInTube('NewUsers','number_4',4);
//print_r($ph->statsTube('NewUsers'));//5.檢視NewUsers管道詳細資訊


// 6.release 將取出的任務放回ready狀態,還有2個引數(優先順序和延遲)
//$job = $ph->watch('NewUsers')->reserve();//6.監聽NewUsers管道,並取出任務

//if (true) {
//    sleep(30);
//    $ph->release($job);//6.將任務取出之後,停留30秒,然後將任務狀態重新變為ready
//} else {
//    $ph->delete($job);
//}


// 7.bury (預留) 將任務取出之後,發現後面執行的邏輯不成熟(比如發郵件,突然發現郵件伺服器掛掉了),
//或者說還不能執行後面的邏輯,需要把任務先封存起來,等待時機成熟了,再拿出這個任務進行消費

//$job = $ph->watch('NewUsers')->reserve();//取出任務
//$ph->bury($job);//取出任務後,將任務放到一邊(預留)

// 8.peekBuried() 將處在bury狀態的任務讀取出來
//$job = $ph->peekBuried('NewUsers');//將NewUsers管道中處在bury狀態的任務讀取出來
//var_dump($ph->statsJob($job));//列印任務狀態(此時任務狀態應該是bury)

// 9.kickJob() 將處在bury任務狀態的任務轉化為ready狀態
//$job = $ph->peekBuried('NewUsers');//將NewUsers管道中處在bury狀態的任務讀取出來
//$ph->kickJob($job);

// 10.kick()  將處在bury任務狀態的任務轉化為ready狀態,有第二個引數int, 批量將任務id小於此數值的任務轉化為ready
//$ph->useTube('NewUsers')->kick(65);//把NewUsers管道中任務id小於65,並且任務狀態處於bury的任務全部轉化為ready

// 11.peekReady() 將管道中處於ready狀態的任務讀出來
//$job = $ph->peekReady('NewUser');//將NewUser管道中處於ready狀態的任務讀取出來
//var_dump($job);
//$ph->delete($job);

// 12.peekDelay() 將管道中所有處於delay狀態的任務讀取出來
//$job = $ph->peekDelayed('NewUser');
//var_dump($job);
//$ph->delete($job);


// 13.pauseTube() 對整個管道進行延遲設定,讓管道處於延遲狀態
//$ph->pauseTube('NewUser',10);//設定管道NewUser延遲時間為10s
//$job = $ph->watch('NewUser')->reserve();//監聽NewUser管道,並取出任務
//var_dump($job);

// 14.resumeTube() 恢復管道,讓管道處於不延遲狀態,立即被消費
//$ph->resumeTube('NewUser');//取消管道NewUser的延遲狀態,變為立即讀取
//$job = $ph->watch('NewUser')->reserve();//監聽NewUser管道,並取出任務
//var_dump($job);

// 15.touch() 讓任務重新計算任務超時重發ttr時間,相當於給任務延長壽命