開源JAVA爬蟲crawler4j原始碼分析
要做一個大的系統,得要聘請多個程式設計師一起工作,同樣,要爬取內容龐大的網際網路資料,如果只有一隻爬蟲,當然是不夠的。
使用crawler4j只需配置指定數量的執行緒,就會有指定數量的爬蟲執行緒一起抓取指定的網頁內容:
controller.start(BasicCrawler.class, numberOfCrawlers);//numberOfCrawlers:執行緒數
那麼,crawler4j到底是怎樣管理這些執行緒的呢?以下假定指定的是10個執行緒。
進入CrawlController的start方法:
首先,定義兩個List,分別用來儲存所有的爬蟲執行緒和實際的爬蟲邏輯類:
final List<Thread> threads = new ArrayList<>(); final List<T> crawlers = new ArrayList<>(); for (int i = 1; i <= numberOfCrawlers; i++) { T crawler = _c.newInstance(); Thread thread = new Thread(crawler, "Crawler " + i); crawler.setThread(thread); crawler.init(i, this); thread.start(); crawlers.add(crawler); threads.add(thread); logger.info("Crawler " + i + " started."); }
迴圈10次,建立10個執行緒並啟動它們。這時爬蟲執行緒就已經開始工作了,不斷的迴圈爬取頁面,我們暫且不管具體單個爬蟲是怎麼工作的,本文只關注執行緒管理。
既然執行緒已經啟動並正在工作了,怎麼監控它們呢? 往下:
Thread monitorThread = new Thread(new Runnable() {
@Override
public void run() {/*...此處省略N個字...*/}
}
monitorThread.start();
if (isBlocking) {
waitUntilFinish();
}
建立一個監控的執行緒並啟動,然後主執行緒開始睡覺直到完成。監控執行緒monitor不斷的、每隔10秒檢查一次:
sleep(10); boolean someoneIsWorking = false; for (int i = 0; i < threads.size(); i++) { Thread thread = threads.get(i); if (!thread.isAlive()) { if (!shuttingDown) { logger.info("Thread " + i + " was dead, I'll recreate it."); T crawler = _c.newInstance(); thread = new Thread(crawler, "Crawler " + (i + 1)); threads.remove(i); threads.add(i, thread); crawler.setThread(thread); crawler.init(i + 1, controller); thread.start(); crawlers.remove(i); crawlers.add(i, crawler); } } else if (crawlers.get(i).isNotWaitingForNewURLs()) { someoneIsWorking = true; } }
迴圈遍歷每一個爬蟲執行緒,首先看是否還活著,如果執行緒已死但是爬取工作卻還沒有結束,則重新建立一個新的執行緒加入列表,並刪除原來已經die的執行緒,這樣可確保一直都有10個執行緒在工作,防止某個工人不小心掉井下掛了然後影響工期 :)
如果活著,接著檢查執行緒是不是正在幹活,即不在等新的URL。什麼意思呢?稍微有點繞,1個爬蟲執行緒開始工作後會先去URL地址庫領取50個URL,然後對個50個URL進行抓取,把抓取到的新的URL填入URL地址庫,完了再去領……10個爬蟲執行緒開始工作後就在URL地址庫視窗排隊領取,如果第X個領完就木有了,後面那10-X個就只能坐在那打牌了,等著前面X個找到了新URL。這時,只要X>0,則說明整個爬取工作還在進行。假設那X個執行緒沒找著新的URL就回來了,那隻能跟著排在後面押寶了。這時monitor執行緒發現,10個傢伙都在打牌押寶,好,說明工作可能完成了,該驗收了。
接下來:
if (!someoneIsWorking) {
// Make sure again that none of the threads
// are
// alive.
logger.info("It looks like no thread is working, waiting for 10 seconds to make sure...");
sleep(10);
someoneIsWorking = false;
for (int i = 0; i < threads.size(); i++) {
Thread thread = threads.get(i);
if (thread.isAlive() && crawlers.get(i).isNotWaitingForNewURLs()) {
someoneIsWorking = true;
}
}
if (!someoneIsWorking) {
if (!shuttingDown) {
long queueLength = frontier.getQueueLength();
if (queueLength > 0) {
continue;
}
logger.info("No thread is working and no more URLs are in queue waiting for another 10 seconds to make sure...");
sleep(10);
queueLength = frontier.getQueueLength();
if (queueLength > 0) {
continue;
}
}
為了保險起見,再檢查一下看看10個工人是不是都還活著並且在打牌。
如果是的,再檢查一下URL地址庫還有沒有URL,如果有,則繼續等10秒後再重新查。為什麼要這樣呢?這是為了防止低概率事件發生,比如第1個工人正在打牌,第10個剛好帶著新的URL回來了,也加入打牌隊伍,第1個工人正要去領URL時監工monitor來了,看到都在,以為可以收工了,實際上URL地址庫還有,所以此時monitor再查一下還有沒有剩餘的URL沒爬取。如果沒有了,這時再等10秒看下是否真的沒有了,防止剛剛第10個工人剛帶回新URL來而錯過了。
10秒後發現也沒有,收工:
logger.info("All of the crawlers are stopped. Finishing the process...");
// At this step, frontier notifies the
// threads that were
// waiting for new URLs and they should
// stop
frontier.finish();
for (T crawler : crawlers) {
crawler.onBeforeExit();
crawlersLocalData.add(crawler.getMyLocalData());
}
logger.info("Waiting for 10 seconds before final clean up...");
sleep(10);
frontier.close();
docIdServer.close();
pageFetcher.shutDown();
finished = true;
waitingLock.notifyAll();
return;
這些都是收尾工作了。 不過作者在這少收了一個,就是忘了把Environment close了,這導致程式不結束的話臨時檔案也刪不掉,把Environment env變成field然後在這加上env.close()就好了。
因主執行緒還一直在做夢,waitingLock.notifyAll();是叫醒主執行緒可以結束了,下面是主執行緒睡覺的位置:
public void waitUntilFinish() {
while (!finished) {
synchronized (waitingLock) {
if (finished) {
return;
}
try {
waitingLock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
這就是一個JAVA爬蟲的執行緒管理機制,crawler4j以一種相對比較簡單但是又比較安全的方法實現了,值得借鑑。