1. 程式人生 > 實用技巧 >如何使用Nodejs進行批量下載

如何使用Nodejs進行批量下載

0x1 Nodejs登場

Nodejs是一款基於穀人希的V8引擎開發javascript執行環境。在高效能的V8引擎以及事件驅動的單執行緒非同步非阻塞執行模型的支援下,Nodejs實現的web服務可以在沒有Nginx的http伺服器做反向代理的情況下實現很高的業務併發量(當然了配合Nginx食用風味更佳)。

0x2 準備工作

現在我們假設你的爬蟲已經幫你爬到了一堆圖片的連結,然後你的nodejs指令碼以某種方式(接收post http請求,程序間通訊,讀寫檔案或資料庫等等。。。)獲得了這些連結,這裡我用某款大型角色扮演網路遊戲的官網上提供的桌布連結為例子(這裡似乎並沒有為一款運營10年經久不衰的遊戲打廣告的意思,僅僅只是情懷溢位。。。)

(function() {
  "use strict";
  const urlList = [
    "http://content.battlenet.com.cn/wow/media/wallpapers/patch/fall-of-the-lich-king/fall-of-the-lich-king-1920x1080.jpg",
    "http://content.battlenet.com.cn/wow/media/wallpapers/patch/black-temple/black-temple-1920x1200.jpg",
    "http://content.battlenet.com.cn/wow/media/wallpapers/patch/zandalari/zandalari-1920x1200.jpg",
    "http://content.battlenet.com.cn/wow/media/wallpapers/patch/rage-of-the-firelands/rage-of-the-firelands-1920x1200.jpg",
    "http://content.battlenet.com.cn/wow/media/wallpapers/patch/fury-of-hellfire/fury-of-hellfire-3840x2160.jpg",
  ];
})();

我們可以對urlList執行一個遍歷來依次下載這些圖片,確切的說是依次啟動下載這些連結的任務。

(function() {
  //略...

  var startDownloadTask = function(imgSrc, dirName, index) {
    //TODO: startDownloadTask
  }

  urlList.forEach(function(item, index, array) {
    startDownloadTask(item, './', index);
  })
})();

startDownloadTask這個函式就是用來下載這些圖片的。其中imgSrc

是圖片的連結,dirName是我們存放下載後的圖片的路徑,index是圖片連結在列表中的序號。我們在這個函式中,會呼叫Nodejs的系統Apihttp.request來完成下載工作,由於該Api和大多數Nodejs的Api一樣是非同步非阻塞模式,所以startDownloadTask函式呼叫該Api後不會等待下載完成,就會立即返回。在下載的過程中,以及完成之後,或者發生異常時,系統會呼叫http.request的回掉函式來做相應的處理。我們接下來會看到該Api的詳細宣告和用法,在瞭解了該Api的使用方法之後,就可以用它來實現startDownloadTask函式。

0x3 http.request的宣告和使用方法

我們在Nodejs的官方文件上可以找到http.request的完整宣告和各個引數的說明。它的宣告如下:

http.request(options[, callback])

其中options可以是帶有請求的目的地址的一條字串,亦可以是一系用於發起請求的列詳細引數,用於對請求進行更精確的控制。我們現在暫時不需要這些精確的引數控制,直接傳入圖片的連結就可以。

至於callback引數就是剛才說到的回撥函式,這是個非常重要的函式,圖片下載下來後能否存入我們指定的位置可全靠它。這個回撥函式會接受一個入參,文件中對這個入參沒有詳細說明,通過後面的例子我們發現,這個叫res的入參監聽了兩個事件,分別是dataend事件,並且還有一個setEncoding方法,並且還有statusCodeheaders兩個成員屬性。熟悉Nodejs Api的同學不難猜出,這個res其實是一個stream.Readable型別的子類的變數,那兩個事件監聽和setEncoding方法就是繼承自這個型別,而那兩個成員屬性是子類擴充套件的。這並沒有什麼意外的,在其他語言的類庫中,http請求Api返回一個可讀資料流是很常見的做法。仔細閱讀文件的其他部分後可以發現,這個res的真實型別是http.IncomingMessage。這裡不得不對這種不寫明每個引數的型別的文件提出批評,像javascript這種動態弱型別指令碼語言,開發者要想知道一個Api各個引數和返回值有可能是什麼型別,拿過來怎麼處理可全靠文件啊。

介紹完了入參,再來看看http.request會返回什麼。文件中說它會返回一個http.ClientRequest型別的變數,這個變數可以接受error事件,來對請求異常的情況進行處理。

剛才說過,這個Api是一個非同步介面,呼叫這個Api之後會立即返回一個http.ClientRequest型別變數,假設變數名為req。但這時候不會馬上發起請求。我們這時候可以設定reqerror事件的監聽回撥,如果是POST請求的話,還可以呼叫req.write方法來設定請求訊息體,然後呼叫req.end方法來結束此次請求的傳送過程。當收到響應時(嚴格的說是確認接收完響應頭時),就會呼叫callback回撥函式,在這個回撥函式中,可以通過讀取res.statusCoderes.headers獲取響應的返回狀態碼和頭部資訊,其中頭部資訊包含了重要的欄位content-length,表示響應訊息體的總長度。由於響應訊息體可能很長,服務端需要把訊息體拆分成多個tcp封包來發送,客戶端在接收到tcp封包後還要進行訊息體的重組,所以這裡採用一個數據流物件來對返回的訊息體做讀取操作,需要註冊dataend事件監聽,分別處理鏈路層緩衝區接收了若干位元組的訊息體封包並且拼接完成回撥上層協議處理和tcp連線拆線時的事務。

Api聲明後面附帶了一個例子,比較簡單不難看懂,這裡就不詳細說了。

0x4 實現startDownloadTask

瞭解了http.request的基本使用方法,以及看過例子之後,我們很快就能寫出一個簡單的下載過程了:

(function() {
  "use strict";
  const http = require("http");

  //略...

  function getHttpReqCallback(imgSrc, dirName, index) {
    var callback = function(res) {
      // TODO: callback回撥函式實現
    };

    return callback;
  }

  var startDownloadTask = function(imgSrc, dirName, index) {
    var req = http.request(imgSrc, getHttpReqCallback(imgSrc, dirName, index));
    req.on('error', function(e){});
    req.end();
  }

  //略
})();

我暫且先忽略了請求的錯誤處理。這裡需要講解的是函式getHttpReqCallback,這個函式本身不是回撥函式,在呼叫http.request時會先呼叫它,它返回了一個閉包callback,作為http.request的回撥函式。我很快會解釋為什麼需要這樣寫。

接下來我們來實現這個回撥函式:

(function() {
  "use strict";
  const http = require("http");
  const fs = require("fs");
  const path = require("path");
  //略...

  function getHttpReqCallback(imgSrc, dirName, index) {
    var fileName = index + "-" + path.basename(imgSrc);
    var callback = function(res) {      
      var fileBuff = [];
      res.on('data', function (chunk) {
        var buffer = new Buffer(chunk);
        fileBuff.push(buffer);
      });
      res.on('end', function() {
        var totalBuff = Buffer.concat(fileBuff);      
        fs.appendFile(dirName + "/" + fileName, totalBuff, function(err){});
      });        
    };

    return callback;
  }

  //略
})();

這裡的callback函式的邏輯目前為止還不是很複雜,resdata事件的回撥函式中,chunk引數是從可讀資料流中讀出的資料,將其轉換為Buffer物件後插入fillBuff陣列以待後用。

resend事件意味著鏈路層連結拆除,資料接收完畢,在該事件的回撥中,我們通過Buffer.concat函式,將fileBuff中的所有Buffer物件依次重組為一個新的Buffer物件totalBuff,該物件既是接收到的完整的資料。之後通過fs.appendFile函式將totalBuff存入磁碟,存放路徑為dirName + "/" + fileName

於是我們就有了一個完整的勉強可以工作的指令碼,完整的指令碼程式碼如下:

(function() {
  "use strict";
  const fs = require("fs");
  const http = require("http");
  const path = require("path");

  const urlList = [
    "http://content.battlenet.com.cn/wow/media/wallpapers/patch/fall-of-the-lich-king/fall-of-the-lich-king-1920x1080.jpg",
    "http://content.battlenet.com.cn/wow/media/wallpapers/patch/black-temple/black-temple-1920x1200.jpg",
    "http://content.battlenet.com.cn/wow/media/wallpapers/patch/zandalari/zandalari-1920x1200.jpg",
    "http://content.battlenet.com.cn/wow/media/wallpapers/patch/rage-of-the-firelands/rage-of-the-firelands-1920x1200.jpg",
    "http://content.battlenet.com.cn/wow/media/wallpapers/patch/fury-of-hellfire/fury-of-hellfire-3840x2160.jpg",
  ];

  function getHttpReqCallback(imgSrc, dirName, index) {
    var fileName = index + "-" + path.basename(imgSrc);
    var callback = function(res) {      
      var fileBuff = [];
      res.on('data', function (chunk) {
        var buffer = new Buffer(chunk);
        fileBuff.push(buffer);
      });
      res.on('end', function() {
        var totalBuff = Buffer.concat(fileBuff);
        fs.appendFile(dirName + "/" + fileName, totalBuff, function(err){});
      });
    };
    return callback;
  }

  var startDownloadTask = function(imgSrc, dirName, index) {
    var req = http.request(imgSrc, getHttpReqCallback(imgSrc, dirName, index));
    req.on('error', function(e){});
    req.end();
  }

  urlList.forEach(function(item, index, array) {
    startDownloadTask(item, './', index);
  })
})();

之所以說它勉強可工作,是因為它完全沒有做錯誤處理,程式的健壯性幾乎為0,甚至連列印日誌都沒有了,下載過程中一旦出現任何意外情況,那就自求多福吧。

但即使這樣一個漏洞百出的程式碼,也還是有幾點需要特殊說明。

為什麼要採用閉包?

因為實際上作為http.request的回撥函式callback,它的宣告原型決定的它只可以接受唯一一個引數res,但是在callback函式中我們需要明確知道下載下來的資料在硬碟上存放的路徑,這個路徑取決於startDownloadTask的入參dirNameindex。所以函式getHttpReqCallback就是用於建立一個閉包,將dirNameindex的值寫入這個閉包中。
其實我們原本並不需要getHttpReqCallback這個函式來顯示的返回一個閉包,而是可以直接使用內聯匿名函式的方法實現http.requestcallback,程式碼大概會寫成這樣:

var startDownloadTask = function(imgSrc, dirName, index) {
  var req = http.request(imgSrc, function(res) {
    var fileName = index + "-" + path.basename(imgSrc);
    var fileBuff = [];
    res.on('data', function (chunk) {
      var buffer = new Buffer(chunk);
      fileBuff.push(buffer);
    });
    res.on('end', function() {
      var totalBuff = Buffer.concat(fileBuff);
      fs.appendFile(dirName + "/" + fileName, totalBuff, function(err){});
    });
  });
  req.on('error', function(e){});
  req.end();
}

這樣也可以工作,http.requestcallback直接訪問外層作用域的變數,即函式startDownloadTask的入參dirNameindex,這也是一個閉包。這樣寫的問題在於,一段非同步程式碼強行插入原本連貫的同步程式碼中,也許現在你覺得這也沒什麼,這是因為目前callback裡還沒有處理任何的異常情況,所以邏輯比較簡單,這樣看起來也不算很混亂,但是我需要說的是,一旦後面加入了異常處理的程式碼,這一塊看起來就會非常糟糕了。

為什麼在data事件中要使用一個列表快取接收到的所有資料,然後在end中一次性寫入硬碟?

首先要說的是,這裡並不是出於通過減少寫磁碟次數達到提高效能或者延長磁碟壽命的目的,雖然可能確實有這樣的效果。根本原因在於,如果不採用一次性寫入,在nodejs的非同步非阻塞執行機制下,這樣存入磁碟的資料會混亂,導致不堪入目的後果,比較直觀的情況見附錄。

在同步阻塞執行模型的語言中(java, c, python),確實存在將遠端連線傳輸過來的資料先快取在記憶體裡,待接收完整或或快取了一定長度的資料之後再一次性寫入硬碟的做法,以達到減少寫磁碟操作次數的目的。但是如果在每一次從遠端連線接中讀取到資料之後立即將資料寫入硬碟,也不會有什麼問題(tcp協議已經幫我們將資料包排好序),這是因為在同步阻塞執行模型中,讀tcp連線和寫磁碟這兩個動作必然不可能同時執行,而是讀tcp -> 寫磁碟 -> 讀tcp -> 寫磁碟...這樣的序列執行,在上一個操作完成之後,下一個操作才會開始。這樣的執行方式也許效率會比較低,但是寫入的磁碟的資料並不會混亂。

現在回到我們的非同步非阻塞世界中來,在這個世界中,遠端讀取的操作是通過事件回撥的方式發生的,resdata事件任何一個時間片內都可能觸發,你無法預知,無法控制,甚至觸發頻率都和你無關,那取決於本次連線的頻寬。而我們的寫磁碟操作fs.appendFile和Nodejs的大部分Api一樣是一個非同步非阻塞的呼叫,它會非常快的返回,但是它所執行的寫檔案操作,則會慢的多,而程序不會阻塞在那裡等待這個操作完成。在常識裡,遠端連線的下載速度比本地硬碟的寫入速度要慢,但這並不是絕對的,隨著網速的提高,現在一塊高速網絡卡,在高速的網路中帶來的下載速度超過一塊老舊的機械硬碟的寫入速度並非不可能發生。除此之外,即使在較長的一段時間內,網路的平均連線速度並沒有快的那麼誇張,但是我們知道在tcp/ip協議棧中,鏈路層下層的網路層中前後兩個ip報文的到達時間間隔也是完全無法確定的,有可能它們會在很短的時間間隔內到達,被tcp協議重組之後上拋給應用層協議,在我們的執行環境中以很短的間隔兩次觸發data事件,而這個間隔並不足夠磁碟將前一段資料寫入。

我畫個草圖來解釋到底發生什麼事情:

|data1
|       |data2
|-----------------------------|  //<- write data1
|       |-----------------------------|  //<- write data2
|       |
|----------------------------------------------------------> time

此時要想寫入的資料保持有序不混亂,只能寄希望於機械硬碟的一面只有一個磁頭來從物理層面保證原子操作了。但是很可惜我們知道現代機械硬碟每一面至少都有兩個磁頭。

有著很多java或者c++程式設計經驗的你也許會想在這裡加一個同步鎖,不過Nodejs作為一個表面宣稱的單執行緒環境(底層的V8引擎肯定還是有多執行緒甚至多程序排程機制實現的),在語法和Api層面並沒有鎖這個概念。

所以為了保證最終寫入磁碟的資料不混亂,在data事件的回撥中不可以再用非同步的方式處理資料了,於是有了現在這種先寫入快取列表中,在資料接收完整後再一次性寫檔案的做法。由於new Buffer(chunk)fileBuff.push(buffer)都是同步操作,並且執行的速度非常快;即使下一個data事件到來的比這兩個操作還要快,由於單執行緒執行模型的限制,也必須等待這兩個操作完成後才會開始第二次回撥。所以能保證資料有序的快取到記憶體中,再有序的寫入硬碟。

0x5 異常處理

剛才說到,目前為止我們的指令碼雖然能夠正常工作,但是沒有異常處理,程式非常脆弱。由於異常處理是一個程式非常重要的部分,所以在這裡我有義務要完成這部分程式碼。

首先我們從最簡單的做起,列印一些日誌來幫助除錯程式。

(function() {
  //略。。
  function getHttpReqCallback(imgSrc, dirName, index) {
    var callback = function(res) {
      console.log("request: " + imgSrc + " return status: " + res.statusCode);
      //略。。
      res.on('end', function() {
        console.log("end downloading " + imgSrc);
        //略。。
      });
    };
    return callback;
  }

  var startDownloadTask = function(imgSrc, dirName, index) {
    console.log("start downloading " + imgSrc);
    //略。。。
  }
})

接下來我們在reqerror事件中,進行重新下載嘗試的操作:

  var startDownloadTask = function(imgSrc, dirName, index) {
    //略。。
    req.on('error', function(e){
      console.log("request " + imgSrc + " error, try again");
      startDownloadTask(imgSrc, dirName, index);
    });
  }

這樣一旦在請求階段出現異常,會自動重新發起請求。你也可以在這裡自行新增重試次數上限。

下面的程式碼給請求設定了一個一分鐘的超時時間:

  var startDownloadTask = function(imgSrc, dirName, index) {
    //略。。
    req.setTimeout(60 * 1000, function() {
      console.log("reqeust " + imgSrc " timeout, abort this reqeust");
      req.abort();
    })
  }

一旦在一分鐘之內下載還沒有完成,則會強制終止此次請求,這會立即觸發resend事件。

req的異常處理大致就是這些,接下來是對res的異常處理。

我們首先需要獲取包體的總長度,該值在響應頭的content-length欄位中:

function getHttpReqCallback(imgSrc, dirName, index) {
  var callback = function(res) {
    var contentLength = parseInt(res.headers['content-length']);
    //略。。
  }
}

end事件的回撥中,用接收到的資料總長度和響應頭中的包體長度進行比較,驗證響應資訊是否接收完全:

res.on('end', function() {
  console.log("end downloading " + imgSrc);
  if (isNaN(contentLength)) {
    console.log(imgSrc + " content length error");
    return;
  }
  var totalBuff = Buffer.concat(fileBuff);
  console.log("totalBuff.length = " + totalBuff.length + " " + "contentLength = " + contentLength);
  if (totalBuff.length < contentLength) {
    console.log(imgSrc + " download error, try again");
    startDownloadTask(imgSrc, dirName, index);
    return;
  }
  fs.appendFile(dirName + "/" + fileName, totalBuff, function(err){});
}

0x6 結束

本人在Nodejs方面也是完全的新手,沒有太深入的研究Nodejs內部的執行機制,只是網上讀過幾篇文章,用Nodejs寫過一些簡短的指令碼,在這個過程中掉過一些坑,本文就是一次印象深刻的爬坑過程的整理和總結。總的來說,Nodejs是一個非常強大且有趣的工具,但是由於其獨特的執行模型,以及javascript自身也有不少的歷史遺留問題需要解決,所以對於長期以來習慣了java, c/c++, python一類思維方式的猿們剛剛接觸它的時候產生不少疑惑,希望本文能幫助大家理解Nodejs中的一些不同於其他語言的和執行環境的地方。