1. 程式人生 > >nodejs Async 詳解

nodejs Async 詳解

為了適應非同步程式設計,減少回撥的巢狀,我嘗試了很多庫。最終覺得還是async最靠譜。

Async的內容分為三部分:

  1. 流程控制:簡化十種常見流程的處理
  2. 集合處理:如何使用非同步操作處理集合中的資料
  3. 工具類:幾個常用的工具類

本文介紹其中最簡單最常用的流程控制部分。

由於nodejs是非同步程式設計模型,有一些在同步程式設計中很容易做到的事情,現在卻變得很麻煩。Async的流程控制就是為了簡化這些操作。

1. series(tasks, [callback]) (多個函式依次執行,之間沒有資料交換)

有多個非同步函式需要依次呼叫,一個完成之後才能執行下一個。各函式之間沒有資料的交換,僅僅需要保證其執行順序。這時可使用series。

純js程式碼:

step1(function(err, v1){
  step2(function(err, v2){
    step3(function(err, v3){// do somethig with the err or values v1/v2/v3}}});

從中可以看到這巢狀還是比較多深的,如果再多幾步,會更深。在程式碼中忽略對了每一層err的處理,否則還都等加上 if(err) return callback(err),那就更麻煩了。

對於這種情況,使用async來處理,就是這樣的:

var async =require('async')
async
.series([ step1, step2, step3 ],function(err, values){// do somethig with the err or values v1/v2/v3});

可以看到程式碼簡潔了很多,而且自動處理每個回撥中的錯誤。當然,這裡只給出來最最簡單的例子,在實際中,我們常會在每個step中執行一些操作,這時可寫成:

var async =require('async')
async.series([function(cb){ step1(function(err,v1){// do something with v1
     cb(err,
v1);}),function(cb){ step2(...)},function(cb){ step3(...)}],function(err, values){// do somethig with the err or values v1/v2/v3});

該函式的詳細解釋為:

  1. 依次執行一個函式陣列中的每個函式,每一個函式執行完成之後才能執行下一個函式。
  2. 如果任何一個函式向它的回撥函式中傳了一個error,則後面的函式都不會被執行,並且將會立刻會將該error以及已經執行了的函式的結果,傳給series中最後那個callback。
  3. 當所有的函式執行完後(沒有出錯),則會把每個函式傳給其回撥函式的結果合併為一個數組,傳給series最後的那個callback。
  4. 還可以json的形式來提供tasks。每一個屬性都會被當作函式來執行,並且結果也會以json形式傳給series最後的那個callback。這種方式可讀性更高一些。

其程式碼中還包含了:

  1. 如果中間某個函數出錯,series函式如何處理
  2. 如果某個函式傳給回撥的值為undefined, null, {}, []等,series如何處理

另外還需要注意的是:多個series呼叫之間是不分先後的,因為series本身也是非同步呼叫。

2. parallel(tasks, [callback]) (多個函式並行執行)

並行執行多個函式,每個函式都是立即執行,不需要等待其它函式先執行。傳給最終callback的陣列中的資料按照tasks中宣告的順序,而不是執行完成的順序。

如果某個函數出錯,則立刻將err和已經執行完的函式的結果值傳給parallel最終的callback。其它未執行完的函式的值不會傳到最終資料,但要佔個位置。

同時支援json形式的tasks,其最終callback的結果也為json形式。

示例程式碼:

async.parallel([function(cb){ t.fire('a400', cb,400)},function(cb){ t.fire('a200', cb,200)},function(cb){ t.fire('a300', cb,300)}],function(err, results){
    log('1.1 err: ', err);// -> undefined
    log('1.1 results: ', results);// ->[ 'a400', 'a200', 'a300' ]});

中途出錯的示例:

async.parallel([function(cb){ log('1.2.1: ','start'); t.fire('a400', cb,400)},// 該函式的值不會傳給最終callback,但要佔個位置function(cb){ log('1.2.2: ','start'); t.err('e200', cb,200)},function(cb){ log('1.2.3: ','start'); t.fire('a100', cb,100)}],function(err, results){
    log('1.2 err: ', err);// -> e200
    log('1.2 results: ', results);// -> [ , undefined, 'a100' ]});

以json形式傳入tasks

async.parallel({
    a:function(cb){ t.fire('a400', cb,400)},
    b:function(cb){ t.fire('c300', cb,300)}},function(err, results){
    log('1.3 err: ', err);// -> undefined
    log('1.3 results: ', results);// -> { b: 'c300', a: 'a400' }});

3. waterfall(tasks, [callback]) (多個函式依次執行,且前一個的輸出為後一個的輸入)

與seires相似,按順序依次執行多個函式。不同之處,每一個函式產生的值,都將傳給下一個函式。如果中途出錯,後面的函式將不會被執行。錯誤資訊以及之前產生的結果,將傳給waterfall最終的callback。

這個函式名為waterfall(瀑布),可以想像瀑布從上到下,中途衝過一層層突起的石頭。注意,該函式不支援json格式的tasks。

async.waterfall([function(cb){ log('1.1.1: ','start'); cb(null,3);},function(n, cb){ log('1.1.2: ',n); t.inc(n, cb);},function(n, cb){ log('1.1.3: ',n); t.fire(n*n, cb);}],function(err, result){
    log('1.1 err: ', err);// -> null
    log('1.1 result: ', result);// -> 16});

4. auto(tasks, [callback]) (多個函式有依賴關係,有的並行執行,有的依次執行)

用來處理有依賴關係的多個任務的執行。比如某些任務之間彼此獨立,可以並行執行;但某些任務依賴於其它某些任務,只能等那些任務完成後才能執行。

雖然我們可以使用async.parallel和async.series結合起來實現該功能,但如果任務之間關係複雜,則程式碼會相當複雜,以後如果想新增一個新任務,也會很麻煩。這時使用async.auto,則會事半功倍。

如果有任務中途出錯,則會把該錯誤傳給最終callback,所有任務(包括已經執行完的)產生的資料將被忽略。

這裡假設我要寫一個程式,它要完成以下幾件事:

  1. 從某處取得資料
  2. 在硬碟上建立一個新的目錄
  3. 將資料寫入到目錄下某檔案
  4. 傳送郵件,將檔案以附件形式傳送給其它人。

分析該任務,可以知道1與2可以並行執行,3需要等1和2完成,4要等3完成。

async.auto({
    getData:function(callback){
        setTimeout(function(){
            console.log('1.1: got data');
            callback();},300);},
    makeFolder:function(callback){
        setTimeout(function(){
            console.log('1.1: made folder');
            callback();},200);},
    writeFile:['getData','makeFolder',function(callback){
        setTimeout(function(){
            console.log('1.1: wrote file');
            callback(null,'myfile');},300);}],
    emailFiles:['writeFile',function(callback, results){
        log('1.1: emailed file: ', results.writeFile);// -> myfile
        callback(null, results.writeFile);}]},function(err, results){
    log('1.1: err: ', err);// -> null
    log('1.1: results: ', results);// -> { makeFolder: undefined,//      getData: undefined,//      writeFile: 'myfile',//      emailFiles: 'myfile' }});

5. whilst(test, fn, callback)(用可於非同步呼叫的while)

相當於while,但其中的非同步呼叫將在完成後才會進行下一次迴圈。舉例如下:

var count1 =0;
async.whilst(function(){return count1 <3},function(cb){
        log('1.1 count: ', count1);
        count1++;
        setTimeout(cb,1000);},function(err){// 3s have passed
        log('1.1 err: ', err);// -> undefined});

它相當於:

try{
  whilst(test){
    fn();}
  callback();}catch(err){
  callback(err);}

該函式的功能比較簡單,條件變數通常定義在外面,可供每個函式訪問。在迴圈中,非同步呼叫時產生的值實際上被丟棄了,因為最後那個callback只能傳入錯誤資訊。

另外,第二個函式fn需要能接受一個函式cb,這個cb最終必須被執行,用於表示出錯或正常結束。

6. until(test, fn, callback) (與while相似,但判斷條件相反)

var count4 =0;
async.until(function(){return count4>3},function(cb){
        log('1.4 count: ', count4);
        count4++;
        setTimeout(cb,200);},function(err){// 4s have passed
        log('1.4 err: ',err);// -> undefined});

當第一個函式條件為false時,繼續執行第二個函式,否則跳出。

7. queue (可設定worker數量的佇列)

queue相當於一個加強版的parallel,主要是限制了worker數量,不再一次性全部執行。當worker數量不夠用時,新加入的任務將會排隊等候,直到有新的worker可用。

該函式有多個點可供回撥,如worker用完時、無等候任務時、全部執行完時等。

定義一個queue,其worker數量為2,並在任務執行時,記錄一下日誌:

var q = async.queue(function(task, callback){
    log('worker is processing task: ', task.name);
    task.run(callback);},2);

worker數量將用完時,會呼叫saturated函式:

q.saturated =