Java多執行緒及分散式爬蟲架構原理解析
這是 Java 爬蟲系列博文的第五篇,在上一篇Java 爬蟲伺服器被遮蔽的解決方案中,我們簡單的聊反爬蟲策略和反反爬蟲方法,主要針對的是 IP 被封及其對應辦法。前面幾篇文章我們把爬蟲相關的基本知識都講的差不多啦。這一篇我們來聊一聊爬蟲架構相關的內容。
前面幾章內容我們的爬蟲程式都是單執行緒,在我們除錯爬蟲程式的時候,單執行緒爬蟲沒什麼問題,但是當我們在線上環境使用單執行緒爬蟲程式去採集網頁時,單執行緒就暴露出了兩個致命的問題:
- 採集效率特別慢,單執行緒之間都是序列的,下一個執行動作需要等上一個執行完才能執行
- 對伺服器的CUP等利用率不高,想想我們的伺服器都是 8核16G,32G 的只跑一個執行緒會不會太浪費啦
線上環境不可能像我們本地測試一樣,不在乎採集效率,只要能正確提取結果就行。在這個時間就是金錢的年代,不可能給你時間去慢慢的採集,所以單執行緒爬蟲程式是行不通的,我們需要將單執行緒改成多執行緒的模式,來提升採集效率和提高計算機利用率。
多執行緒的爬蟲程式設計比單執行緒就要複雜很多,但是與其他業務在高併發下要保證資料安全又不同,多執行緒爬蟲在資料安全上到要求不是那麼的高,因為每個頁面都可以被看作是一個獨立體。要做好多執行緒爬蟲就必須做好兩點:第一點就是統一的待採集 URL 維護,第二點就是 URL 的去重, 下面我們簡單的來聊一聊這兩點。
維護待採集的 URL
多執行緒爬蟲程式就不能像單執行緒那樣,每個執行緒獨自維護這自己的待採集 URL,如果這樣的話,那麼每個執行緒採集的網頁將是一樣的,你這就不是多執行緒採集啦,你這是將一個頁面採集的多次。基於這個原因我們就需要將待採集的 URL 統一維護,每個執行緒從統一 URL 維護處領取採集 URL ,完成採集任務,如果在頁面上發現新的 URL 連結則新增到 統一 URL 維護的容器中。下面是幾種適合用作統一 URL 維護的容器:
- JDK 的安全佇列,例如 LinkedBlockingQueue
- 高效能的 NoSQL,比如 Redis、Mongodb
- MQ 訊息中介軟體
URL 的去重
URL 的去重也是多執行緒採集的關鍵一步,因為如果不去重的話,那麼我們將採集到大量重複的 URL,這樣並沒有提升我們的採集效率,比如一個分頁的新聞列表,我們在採集第一頁的時候可以得到 2、3、4、5 頁的連結,在採集第二頁的時候又會得到 1、3、4、5 頁的連結,待採集的 URL 佇列中將存在大量的列表頁連結,這樣就會重複採集甚至進入到一個死迴圈當中,所以就需要 URL 去重。URL 去重的方法就非常多啦,下面是幾種常用的 URL 去重方式:
- 將 URL 儲存到資料庫進行去重,比如 redis、MongoDB
- 將 URL 放到雜湊表中去重,例如 hashset
- 將 URL 經過 MD5 之後儲存到雜湊表中去重,相比於上面一種,能夠節約空間
- 使用 布隆過濾器(Bloom Filter)去重,這種方式能夠節約大量的空間,就是不那麼準確。
關於多執行緒爬蟲的兩個核心知識點我們都知道啦,下面我畫了一個簡單的多執行緒爬蟲架構圖,如下圖所示:
上面我們主要了解了多執行緒爬蟲的架構設計,接下來我們不妨來試試 Java 多執行緒爬蟲,我們以採集虎撲新聞為例來實戰一下 Java 多執行緒爬蟲,Java 多執行緒爬蟲中設計到了 待採集 URL 的維護和 URL 去重,由於我們這裡只是演示,所以我們就使用 JDK 內建的容器來完成,我們使用 LinkedBlockingQueue 作為待採集 URL 維護容器,HashSet 作為 URL 去重容器。下面是 Java 多執行緒爬蟲核心程式碼,詳細程式碼以上傳 GitHub,地址在文末:
/** * 多執行緒爬蟲 */ public class ThreadCrawler implements Runnable { // 採集的文章數 private final AtomicLong pageCount = new AtomicLong(0); // 列表頁連結正則表示式 public static final String URL_LIST = "https://voice.hupu.com/nba"; protected Logger logger = LoggerFactory.getLogger(getClass()); // 待採集的佇列 LinkedBlockingQueue<String> taskQueue; // 採集過的連結列表 HashSet<String> visited; // 執行緒池 CountableThreadPool threadPool; /** * * @param url 起始頁 * @param threadNum 執行緒數 * @throws InterruptedException */ public ThreadCrawler(String url,int threadNum) throws InterruptedException { this.taskQueue = new LinkedBlockingQueue<>(); this.threadPool = new CountableThreadPool(threadNum); this.visited = new HashSet<>(); // 將起始頁新增到待採集佇列中 this.taskQueue.put(url); } @Override public void run() { logger.info("Spider started!"); while (!Thread.currentThread().isInterrupted()) { // 從佇列中獲取待採集 URL final String request = taskQueue.poll(); // 如果獲取 request 為空,並且當前的執行緒採已經沒有執行緒在執行 if (request == null) { if (threadPool.getThreadAlive() == 0) { break; } } else { // 執行採集任務 threadPool.execute(new Runnable() { @Override public void run() { try { processRequest(request); } catch (Exception e) { logger.error("process request " + request + " error",e); } finally { // 採集頁面 +1 pageCount.incrementAndGet(); } } }); } } threadPool.shutdown(); logger.info("Spider closed! {} pages downloaded.",pageCount.get()); } /** * 處理採集請求 * @param url */ protected void processRequest(String url) { // 判斷是否為列表頁 if (url.matches(URL_LIST)) { // 列表頁解析出詳情頁連結新增到待採集URL佇列中 processTaskQueue(url); } else { // 解析網頁 processPage(url); } } /** * 處理連結採集 * 處理列表頁,將 url 新增到佇列中 * * @param url */ protected void processTaskQueue(String url) { try { Document doc = Jsoup.connect(url).get(); // 詳情頁連結 Elements elements = doc.select(" div.news-list > ul > li > div.list-hd > h4 > a"); elements.stream().forEach((element -> { String request = element.attr("href"); // 判斷該連結是否存在佇列或者已採集的 set 中,不存在則新增到佇列中 if (!visited.contains(request) && !taskQueue.contains(request)) { try { taskQueue.put(request); } catch (InterruptedException e) { e.printStackTrace(); } } })); // 列表頁連結 Elements list_urls = doc.select("div.voice-paging > a"); list_urls.stream().forEach((element -> { String request = element.absUrl("href"); // 判斷是否符合要提取的列表連結要求 if (request.matches(URL_LIST)) { // 判斷該連結是否存在佇列或者已採集的 set 中,不存在則新增到佇列中 if (!visited.contains(request) && !taskQueue.contains(request)) { try { taskQueue.put(request); } catch (InterruptedException e) { e.printStackTrace(); } } } })); } catch (Exception e) { e.printStackTrace(); } } /** * 解析頁面 * * @param url */ protected void processPage(String url) { try { Document doc = Jsoup.connect(url).get(); String title = doc.select("body > div.hp-wrap > div.voice-main > div.artical-title > h1").first().ownText(); System.out.println(Thread.currentThread().getName() + " 在 " + new Date() + " 採集了虎撲新聞 " + title); // 將採集完的 url 存入到已經採集的 set 中 visited.add(url); } catch (IOException e) { e.printStackTrace(); } } public static void main(String[] args) { try { new ThreadCrawler("https://voice.hupu.com/nba",5).run(); } catch (InterruptedException e) { e.printStackTrace(); } } }
我們用 5 個執行緒去採集虎撲新聞列表頁看看效果如果?執行該程式,得到如下結果:
多執行緒採集結果
結果中可以看出,我們啟動了 5 個執行緒採集了 61 頁頁面,一共耗時 2 秒鐘,可以說效果還是不錯的,我們來跟單執行緒對比一下,看看差距有多大?我們將執行緒數設定為 1 ,再次啟動程式,得到如下結果:
單執行緒執行結果
可以看出單執行緒採集虎撲 61 條新聞花費了 7 秒鐘,耗時差不多是多執行緒的 4 倍,你想想這可只是 61 個頁面,頁面更多的話,差距會越來越大,所以多執行緒爬蟲效率還是非常高的。
分散式爬蟲架構
分散式爬蟲架構是一個大型採集程式才需要使用的架構,一般情況下使用單機多執行緒就可以解決業務需求,反正我是沒有分散式爬蟲專案的經驗,所以這一塊我也沒什麼可以講的,但是我們作為技術人員,我們需要對技術儲存熱度,雖然不用,但是瞭解瞭解也無妨,我查閱了不少資料得出瞭如下結論:
分散式爬蟲架構跟我們多執行緒爬蟲架構在思路上來說是一樣的,我們只需要在多執行緒的基礎上稍加改進就可以變成一個簡單的分散式爬蟲架構。因為分散式爬蟲架構中爬蟲程式部署在不同的機器上,所以我們待採集的 URL 和 採集過的 URL 就不能存放在爬蟲程式機器的記憶體中啦,我們需要將它統一在某臺機器上維護啦,比如存放在 Redis 或者 MongoDB 中,每臺機器都從這上面獲取採集連結,而不是從 LinkedBlockingQueue 這樣的記憶體佇列中取連結啦,這樣一個簡單的分散式爬蟲架構就出現了,當然這裡面還會有很多細節問題,因為我沒有分散式架構的經驗,我也無從說起,如果你有興趣的話,歡迎交流。
原始碼:原始碼
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支援我們。