1. 程式人生 > >node的async流程控制包parallel的實現

node的async流程控制包parallel的實現

Async介紹

Async是一個流程控制工具包,提供了直接而強大的非同步功能。基於Javascript為Node.js設計,同時也可以直接在瀏覽器中使用。

Async提供了大約20個函式,包括常用的map, reduce, filter, forEach 等,非同步流程控制模式包括,序列(series),並行(parallel),瀑布(waterfall)等。

Async函式介紹

基於async的0.2.9版本。

async主要實現了三個部分的流程控制功能:

集合: Collections

流程控制: Control Flow

工具類: Utils

1). 集合: Collections

each: 如果想對同一個集合中的所有元素都執行同一個非同步操作。

map: 對集合中的每一個元素,執行某個非同步操作,得到結果。所有的結果將彙總到最終的callback裡。與each的區別是,each只關心操作不管最後的值,而map關心的最後產生的值。

filter: 使用非同步操作對集合中的元素進行篩選, 需要注意的是,iterator的callback只有一個引數,只能接收true或false。

reject: reject跟filter正好相反,當測試為true時則拋棄

reduce: 可以讓我們給定一個初始值,用它與集合中的每一個元素做運算,最後得到一個值。reduce從左向右來遍歷元素,如果想從右向左,可使用reduceRight。

detect: 用於取得集合中滿足條件的第一個元素。

sortBy: 對集合內的元素進行排序,依據每個元素進行某非同步操作後產生的值,從小到大排序。

some: 當集合中是否有至少一個元素滿足條件時,最終callback得到的值為true,否則為false.

every: 如果集合裡每一個元素都滿足條件,則傳給最終回撥的result為true,否則為false

concat: 將多個非同步操作的結果合併為一個數組。

2). 流程控制: Control Flow

series: 序列執行,一個函式陣列中的每個函式,每一個函式執行完成之後才能執行下一個函式。

parallel: 並行執行多個函式,每個函式都是立即執行,不需要等待其它函式先執行。傳給最終callback的陣列中的資料按照tasks中宣告的順序,而不是執行完成的順序。

whilst: 相當於while,但其中的非同步呼叫將在完成後才會進行下一次迴圈。

doWhilst: 相當於do…while,doWhilst交換了fn,test的引數位置,先執行一次迴圈,再做test判斷。

until: until與whilst正好相反,當test為false時迴圈,與true時跳出。其它特性一致。

doUntil: doUntil與doWhilst正好相反,當test為false時迴圈,與true時跳出。其它特性一致。

forever: 無論條件迴圈執行,如果不出錯,callback永遠不被執行。

waterfall: 按順序依次執行一組函式。每個函式產生的值,都將傳給下一個。

compose: 建立一個包括一組非同步函式的函式集合,每個函式會消費上一次函式的返回值。把f(),g(),h()非同步函式,組合成f(g(h()))的形式,通過callback得到返回值。

applyEach: 實現給一陣列中每個函式傳相同引數,通過callback返回。如果只傳第一個引數,將返回一個函式物件,我可以傳參呼叫。

queue: 是一個序列的訊息佇列,通過限制了worker數量,不再一次性全部執行。當worker數量不夠用時,新加入的任務將會排隊等候,直到有新的worker可用。

cargo: 一個序列的訊息佇列,類似於queue,通過限制了worker數量,不再一次性全部執行。不同之處在於,cargo每次會載入滿額的任務做為任務單元,只有任務單元中全部執行完成後,才會載入新的任務單元。

auto: 用來處理有依賴關係的多個任務的執行。

iterator: 將一組函式包裝成為一個iterator,初次呼叫此iterator時,會執行定義中的第一個函式並返回第二個函式以供呼叫。

apply: 可以讓我們給一個函式預繫結多個引數並生成一個可直接呼叫的新函式,簡化程式碼。

nextTick: 與nodejs的nextTick一樣,再最後呼叫函式。

times: 非同步執行,times可以指定呼叫幾次,並把結果合併到陣列中返回

timesSeries: 與time類似,唯一不同的是同步執行

3). 工具類: Utils

memoize: 讓某一個函式在記憶體中快取它的計算結果。對於相同的引數,只計算一次,下次就直接拿到之前算好的結果。

unmemoize: 讓已經被快取的函式,返回不快取的函式引用。

log: 執行某非同步函式,並記錄它的返回值,日誌輸出。

dir: 與log類似,不同之處在於,會呼叫瀏覽器的console.dir()函式,顯示為DOM檢視。

noConflict: 如果之前已經在全域性域中定義了async變數,當匯入本async.js時,會先把之前的async變數儲存起來,然後覆蓋它。僅僅用於瀏覽器端,在nodejs中沒用,這裡無法演示。

4. async_demo使用介紹

詳細使用請參考github原始碼:https://github.com/bsspirit/async_demo

每個函式的用法,有非常詳細的例項!!

5. 場景:對資料庫的連續操作

這個場景進背景情況,請參考文章:用Nodejs連線MySQL

原場景中,對資料序列操作,增刪改查(CRUD),程式碼如下:

var mysql = require('mysql');
var conn = mysql.createConnection({
    host: 'localhost',
    user: 'nodejs',
    password: 'nodejs',
    database: 'nodejs',
    port: 3306
});
conn.connect();

var insertSQL = 'insert into t_user(name) values("conan"),("fens.me")';
var selectSQL = 'select * from t_user limit 10';
var deleteSQL = 'delete from t_user';
var updateSQL = 'update t_user set name="conan update"  where name="conan"';

//delete
conn.query(deleteSQL, function (err0, res0) {
    if (err0) console.log(err0);
    console.log("DELETE Return ==> ");
    console.log(res0);

    //insert
    conn.query(insertSQL, function (err1, res1) {
        if (err1) console.log(err1);
        console.log("INSERT Return ==> ");
        console.log(res1);

        //query
        conn.query(selectSQL, function (err2, rows) {
            if (err2) console.log(err2);

            console.log("SELECT ==> ");
            for (var i in rows) {
                console.log(rows[i]);
            }

            //update
            conn.query(updateSQL, function (err3, res3) {
                if (err3) console.log(err3);
                console.log("UPDATE Return ==> ");
                console.log(res3);

                //query
                conn.query(selectSQL, function (err4, rows2) {
                    if (err4) console.log(err4);

                    console.log("SELECT ==> ");
                    for (var i in rows2) {
                        console.log(rows2[i]);
                    }
                });
            });
        });
    });
});

//conn.end();

為了實現了序列操作,所有的呼叫都是在callback中實現的,5層巢狀結構。這種程式碼已經變得不可以維護了。所以,需要用async庫,對上面的程式碼結構進行重寫!

修改後的程式碼

var mysql = require('mysql');
var async = require('async');

var conn = mysql.createConnection({
    host: 'localhost',
    user: 'nodejs',
    password: 'nodejs',
    database: 'nodejs',
    port: 3306
});

var sqls = {
    'insertSQL': 'insert into t_user(name) values("conan"),("fens.me")',
    'selectSQL': 'select * from t_user limit 10',
    'deleteSQL': 'delete from t_user',
    'updateSQL': 'update t_user set name="conan update"  where name="conan"'
};

var tasks = ['deleteSQL', 'insertSQL', 'selectSQL', 'updateSQL', 'selectSQL'];
async.eachSeries(tasks, function (item, callback) {
    console.log(item + " ==> " + sqls[item]);
    conn.query(sqls[item], function (err, res) {
        console.log(res);
        callback(err, res);
    });
}, function (err) {
    console.log("err: " + err);
});

控制檯輸出

deleteSQL ==> delete from t_user
{ fieldCount: 0,
  affectedRows: 0,
  insertId: 0,
  serverStatus: 34,
  warningCount: 0,
  message: '',
  protocol41: true,
  changedRows: 0 }
insertSQL ==> insert into t_user(name) values("conan"),("fens.me")
{ fieldCount: 0,
  affectedRows: 2,
  insertId: 45,
  serverStatus: 2,
  warningCount: 0,
  message: '&Records: 2  Duplicates: 0  Warnings: 0',
  protocol41: true,
  changedRows: 0 }
selectSQL ==> select * from t_user limit 10
[ { id: 45,
    name: 'conan',
    create_date: Fri Sep 13 2013 12:24:51 GMT+0800 (中國標準時間) },
  { id: 46,
    name: 'fens.me',
    create_date: Fri Sep 13 2013 12:24:51 GMT+0800 (中國標準時間) } ]
updateSQL ==> update t_user set name="conan update"  where name="conan"
{ fieldCount: 0,
  affectedRows: 1,
  insertId: 0,
  serverStatus: 2,
  warningCount: 0,
  message: '(Rows matched: 1  Changed: 1  Warnings: 0',
  protocol41: true,
  changedRows: 1 }
selectSQL ==> select * from t_user limit 10
[ { id: 45,
    name: 'conan update',
    create_date: Fri Sep 13 2013 12:24:51 GMT+0800 (中國標準時間) },
  { id: 46,
    name: 'fens.me',
    create_date: Fri Sep 13 2013 12:24:51 GMT+0800 (中國標準時間) } ]
err: null

程式碼一下讀性就增強了許多倍,這就是高效的開發。

以上原文出自:

處理業務邏輯時,有這樣的需求:需要執行幾個非同步函式funcA, funcB funcC,全部執行完之後,再執行funcD,這裡對於funcA funcB funcC的執行順序沒有要求,它們執行完之後彙總再執行funcD。正好可以使用async. Parallel的函式

函式介面很簡潔,但是卻實現了這樣的功能,那麼問題來了,它是如何實現這樣的功能的呢?

看了下github上的實現,實現比較隱晦,大致上應該是這樣的流程:

Var Parallel = function(array, callback) {

         Varcompleted = 0;

         執行完一次非同步函式後,如果completed大於等於array.length了,那麼就直接呼叫callback返回;如果completed小於array.length了,那麼就將completed加一

}

總體上來講,我們雖然能夠自己在業務邏輯中實現的這樣的程式碼,但是卻嚴重的降低了程式碼可讀性和可維護性。

再有,像這樣的“回撥黑洞”出現,是不是很無語!!

doAsync1(function() {

  doAsync2(function () {

    doAsync3(function () {

      doAsync4(function () {

    })

  })

})

我們需要doAsync1 doAsync2 doAsync3 doAsync4這四個非同步函式能夠順序的執行。就像巢狀太多的程式碼,有時候也沒什麼問題。為了控制呼叫順序,非同步程式碼變得非常複雜,這就是黑洞。有個問題非常合適衡量黑洞到底有多深:如果doAsync2發生在doAsync1之前,你要忍受多少重構的痛苦?目標不單單是減少巢狀層數,而是要編寫模組化(可測試)的程式碼,便於理解和修改。

使用async.eachSeries,難道不是神清氣爽嗎。不過話說eachSeries最終也是轉為了回撥鏈實現的吧,只不過是對你遮蔽了實現的細節而已。