1. 程式人生 > >開源JAVA爬蟲crawler4j原始碼分析

開源JAVA爬蟲crawler4j原始碼分析

要做一個大的系統,得要聘請多個程式設計師一起工作,同樣,要爬取內容龐大的網際網路資料,如果只有一隻爬蟲,當然是不夠的。

使用crawler4j只需配置指定數量的執行緒,就會有指定數量的爬蟲執行緒一起抓取指定的網頁內容:

controller.start(BasicCrawler.class, numberOfCrawlers);//numberOfCrawlers:執行緒數

那麼,crawler4j到底是怎樣管理這些執行緒的呢?以下假定指定的是10個執行緒。

進入CrawlController的start方法:

首先,定義兩個List,分別用來儲存所有的爬蟲執行緒和實際的爬蟲邏輯類:

			final List<Thread> threads = new ArrayList<>();
			final List<T> crawlers = new ArrayList<>();

			for (int i = 1; i <= numberOfCrawlers; i++) {
				T crawler = _c.newInstance();
				Thread thread = new Thread(crawler, "Crawler " + i);
				crawler.setThread(thread);
				crawler.init(i, this);
				thread.start();
				crawlers.add(crawler);
				threads.add(thread);
				logger.info("Crawler " + i + " started.");
			}

迴圈10次,建立10個執行緒並啟動它們。這時爬蟲執行緒就已經開始工作了,不斷的迴圈爬取頁面,我們暫且不管具體單個爬蟲是怎麼工作的,本文只關注執行緒管理。

既然執行緒已經啟動並正在工作了,怎麼監控它們呢? 往下:

			Thread monitorThread = new Thread(new Runnable() {

				@Override
				public void run() {/*...此處省略N個字...*/}
			}

			monitorThread.start();

			if (isBlocking) {
				waitUntilFinish();
			}

建立一個監控的執行緒並啟動,然後主執行緒開始睡覺直到完成。監控執行緒monitor不斷的、每隔10秒檢查一次:

								sleep(10);
								boolean someoneIsWorking = false;
								for (int i = 0; i < threads.size(); i++) {
									Thread thread = threads.get(i);
									if (!thread.isAlive()) {
										if (!shuttingDown) {
											logger.info("Thread " + i + " was dead, I'll recreate it.");
											T crawler = _c.newInstance();
											thread = new Thread(crawler, "Crawler " + (i + 1));
											threads.remove(i);
											threads.add(i, thread);
											crawler.setThread(thread);
											crawler.init(i + 1, controller);
											thread.start();
											crawlers.remove(i);
											crawlers.add(i, crawler);
										}
									} else if (crawlers.get(i).isNotWaitingForNewURLs()) {
										someoneIsWorking = true;
									}
								}

迴圈遍歷每一個爬蟲執行緒,首先看是否還活著,如果執行緒已死但是爬取工作卻還沒有結束,則重新建立一個新的執行緒加入列表,並刪除原來已經die的執行緒,這樣可確保一直都有10個執行緒在工作,防止某個工人不小心掉井下掛了然後影響工期 :)

如果活著,接著檢查執行緒是不是正在幹活,即不在等新的URL。什麼意思呢?稍微有點繞,1個爬蟲執行緒開始工作後會先去URL地址庫領取50個URL,然後對個50個URL進行抓取,把抓取到的新的URL填入URL地址庫,完了再去領……10個爬蟲執行緒開始工作後就在URL地址庫視窗排隊領取,如果第X個領完就木有了,後面那10-X個就只能坐在那打牌了,等著前面X個找到了新URL。這時,只要X>0,則說明整個爬取工作還在進行。假設那X個執行緒沒找著新的URL就回來了,那隻能跟著排在後面押寶了。這時monitor執行緒發現,10個傢伙都在打牌押寶,好,說明工作可能完成了,該驗收了。

接下來:

								if (!someoneIsWorking) {
									// Make sure again that none of the threads
									// are
									// alive.
									logger.info("It looks like no thread is working, waiting for 10 seconds to make sure...");
									sleep(10);

									someoneIsWorking = false;
									for (int i = 0; i < threads.size(); i++) {
										Thread thread = threads.get(i);
										if (thread.isAlive() && crawlers.get(i).isNotWaitingForNewURLs()) {
											someoneIsWorking = true;
										}
									}
									if (!someoneIsWorking) {
										if (!shuttingDown) {
											long queueLength = frontier.getQueueLength();
											if (queueLength > 0) {
												continue;
											}
											logger.info("No thread is working and no more URLs are in queue waiting for another 10 seconds to make sure...");
											sleep(10);
											queueLength = frontier.getQueueLength();
											if (queueLength > 0) {
												continue;
											}
										}

為了保險起見,再檢查一下看看10個工人是不是都還活著並且在打牌。

如果是的,再檢查一下URL地址庫還有沒有URL,如果有,則繼續等10秒後再重新查。為什麼要這樣呢?這是為了防止低概率事件發生,比如第1個工人正在打牌,第10個剛好帶著新的URL回來了,也加入打牌隊伍,第1個工人正要去領URL時監工monitor來了,看到都在,以為可以收工了,實際上URL地址庫還有,所以此時monitor再查一下還有沒有剩餘的URL沒爬取。如果沒有了,這時再等10秒看下是否真的沒有了,防止剛剛第10個工人剛帶回新URL來而錯過了。

10秒後發現也沒有,收工:

										logger.info("All of the crawlers are stopped. Finishing the process...");
										// At this step, frontier notifies the
										// threads that were
										// waiting for new URLs and they should
										// stop
										frontier.finish();
										for (T crawler : crawlers) {
											crawler.onBeforeExit();
											crawlersLocalData.add(crawler.getMyLocalData());
										}

										logger.info("Waiting for 10 seconds before final clean up...");
										sleep(10);

										frontier.close();
										docIdServer.close();
										pageFetcher.shutDown();

										finished = true;
										waitingLock.notifyAll();

										return;

這些都是收尾工作了。 不過作者在這少收了一個,就是忘了把Environment close了,這導致程式不結束的話臨時檔案也刪不掉,把Environment env變成field然後在這加上env.close()就好了。

因主執行緒還一直在做夢,waitingLock.notifyAll();是叫醒主執行緒可以結束了,下面是主執行緒睡覺的位置:

	public void waitUntilFinish() {
		while (!finished) {
			synchronized (waitingLock) {
				if (finished) {
					return;
				}
				try {
					waitingLock.wait();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		}
	}

這就是一個JAVA爬蟲的執行緒管理機制,crawler4j以一種相對比較簡單但是又比較安全的方法實現了,值得借鑑。