1. 程式人生 > >webMagic 全面剖析(更新中。。。)

webMagic 全面剖析(更新中。。。)

WebMagic爬蟲主要由Downloader,PageProcessor,Pipelines,scheduler四個主要的部分構成。總排程類為Spider,主要負責請求任務分發,控制多個執行緒同時對多個網頁進行下載,解析,儲存。本文會針對這五個部分進行詳細的程式碼剖析。

1.Downloader

1.1 Downloader介面

public interface Downloader {
    /**
     * Downloads web pages and store in Page object.
     *
     * @param request request
     * @param
task task * @return page */
public Page download(Request request, Task task); /** * Tell the downloader how many threads the spider used. * @param threadNum number of threads */ public void setThread(int threadNum); }

Downloader介面聲明瞭兩個方法,最主要的方法是download,針對引數request,下載相應的頁面,將下載內容等用Page物件封裝,並返回,以供PageProcessor處理。download還有個task引數,這個引數用來提供下載頁面時的一些引數配置。setThread方法並不是必須的,但是WebMagic使用了Httpclient來實現Downloader,HttpClient可以實現一個客戶端控制多個HTTP請求,從而避免多次的建立HttpClient,節省記憶體空間。所以WebMagic宣告setThread方法。具體的實現我會在HttpClientDownloader中說明。

1.2 AbstractDownloader抽象類

繼承Downloader介面,但是新新增如下四個方法

 /**
     * A simple method to download a url.
     *
     * @param url url
     * @return html
     */
    public Html download(String url) {
        return download(url, null);
    }

    /**
     * A simple method to download a url.
     *
     * @param
url url * @param charset charset * @return html */
public Html download(String url, String charset) { Page page = download(new Request(url), Site.me().setCharset(charset).toTask()); return (Html) page.getHtml(); } protected void onSuccess(Request request) { } protected void onError(Request request) { }

這裡不多做介紹,這四種方法對於整個爬蟲的影響不大,尤其是第三,和第四個方法,暫時並沒有搞清楚其作用是什麼。

1.3 HttpClientDownloader類

這個類是實現了Downloader介面的類。它主要是藉助HttpClient第三方包來實現HTTP請求,和對HTTP響應的處理,從而將相應頁面的原始碼下載下來。當然HttpClientDownloader是有侷限的,其短板是不能處理動態頁面。該類中的httpClientGenerator物件負責HTTPClient的生成,httpClients物件儲存不同域的HTTPClient,ProxyProvider物件提供代理IP,httpUriRequestConverter物件負責將Request物件,和Task物件等轉換成HTTPClient可以處理的物件。

public class HttpClientDownloader extends AbstractDownloader {

    private Logger logger = LoggerFactory.getLogger(getClass());

    private final Map<String, CloseableHttpClient> httpClients = new HashMap<String, CloseableHttpClient>();

    private HttpClientGenerator httpClientGenerator = new HttpClientGenerator();

    private HttpUriRequestConverter httpUriRequestConverter = new HttpUriRequestConverter();

    private ProxyProvider proxyProvider;

    private boolean responseHeader = true;



    private CloseableHttpClient getHttpClient(Site site) {
        if (site == null) {
            return httpClientGenerator.getClient(null);
        }
        String domain = site.getDomain();
        CloseableHttpClient httpClient = httpClients.get(domain);
        if (httpClient == null) {
            synchronized (this) {
                httpClient = httpClients.get(domain);
                if (httpClient == null) {
                    httpClient = httpClientGenerator.getClient(site);
                    httpClients.put(domain, httpClient);
                }
            }
        }
        return httpClient;
    }

    @Override
    public Page download(Request request, Task task) {
        if (task == null || task.getSite() == null) {
            throw new NullPointerException("task or site can not be null");
        }
        logger.debug("downloading page {}", request.getUrl());
        CloseableHttpResponse httpResponse = null;
        CloseableHttpClient httpClient = getHttpClient(task.getSite());
        Proxy proxy = proxyProvider != null ? proxyProvider.getProxy(task) : null;
        HttpClientRequestContext requestContext = httpUriRequestConverter.convert(request, task.getSite(), proxy);
        Page page = Page.fail();
        try {
            httpResponse = httpClient.execute(requestContext.getHttpUriRequest(), requestContext.getHttpClientContext());
            page = handleResponse(request, task.getSite().getCharset(), httpResponse, task);
            onSuccess(request);
            logger.debug("downloading page success {}", page);
            return page;
        } catch (IOException e) {
            logger.warn("download page {} error", request.getUrl(), e);
            onError(request);
            return page;
        } finally {
            if (httpResponse != null) {
                //ensure the connection is released back to pool
                EntityUtils.consumeQuietly(httpResponse.getEntity());
            }
            if (proxyProvider != null && proxy != null) {
                proxyProvider.returnProxy(proxy, page, task);
            }
        }
    }
    protected Page handleResponse(Request request, String charset, HttpResponse httpResponse, Task task) throws IOException {
        String content = getResponseContent(charset, httpResponse);
        Page page = new Page();
        page.setRawText(content);
        page.setUrl(new PlainText(request.getUrl()));
        page.setRequest(request);
        page.setStatusCode(httpResponse.getStatusLine().getStatusCode());
        page.setDownloadSuccess(true);
        if (responseHeader) {
            page.setHeaders(HttpClientUtils.convertHeaders(httpResponse.getAllHeaders()));
        }
        return page;
    }

    private String getResponseContent(String charset, HttpResponse httpResponse) throws IOException {
        if (charset == null) {
            byte[] contentBytes = IOUtils.toByteArray(httpResponse.getEntity().getContent());
            String htmlCharset = getHtmlCharset(httpResponse, contentBytes);
            if (htmlCharset != null) {
                return new String(contentBytes, htmlCharset);
            } else {
                logger.warn("Charset autodetect failed, use {} as charset. Please specify charset in Site.setCharset()", Charset.defaultCharset());
                return new String(contentBytes);
            }
        } else {
            return IOUtils.toString(httpResponse.getEntity().getContent(), charset);
        }
    }

    private String getHtmlCharset(HttpResponse httpResponse, byte[] contentBytes) throws IOException {
        return CharsetUtils.detectCharset(httpResponse.getEntity().getContentType().getValue(), contentBytes);
    }
}

1.3.1 download(Request request, Task task)方法

首先該方法要求,Task物件不為空,並且task物件中的Site物件不為空,其目的是為了獲得域名和其他相關資訊。然後定義CloseableHttpResponse物件 httpResponse,用來封裝HTTP響應報文。呼叫HttpClientDownloader類中的getHttpClient方法,得到響相應域的CloseableHttpClient物件 httpClient。getHttpClient主要是從httpClients中查詢是否有對應域的CloseableHttpClient物件,如果有,返回。如果沒有,呼叫HttpClientGenerator中的getClient,儲存在HttpClients中,然後再返回。使用ProxyProvider來為HTTP請求提供代理IP,當然這裡允許代理IP為空。
然後呼叫httpUriRequestConverter物件的convert方法,將request,task,proxy轉換成HttpClientRequestContext物件 requestContext。這裡面需要注意requestContext物件包含了HttpClient第三方包中的HttpUriRequest的一個物件和HttpclientContext的一個物件,convert方法中呼叫兩個方法convertHttpClientContext和convertHttpUriRequest,前者用於新增代理伺服器(提供代理IP)的認證方式,以及新增Cookies,後者用於實現一個Http請求。通過呼叫httpClient的execute方法,實現HTTP請求,並新增代理和請求頭資訊,返回HTTP響應httpResponse。然後呼叫handleResponse方法,將httpResponse轉變成Page物件。

1.4 HttpClientGenerator類


public class HttpClientGenerator {

    private transient Logger logger = LoggerFactory.getLogger(getClass());

    private PoolingHttpClientConnectionManager connectionManager;

    public HttpClientGenerator() {
        Registry<ConnectionSocketFactory> reg = RegistryBuilder.<ConnectionSocketFactory>create()
                .register("http", PlainConnectionSocketFactory.INSTANCE)
                .register("https", buildSSLConnectionSocketFactory())
                .build();
        connectionManager = new PoolingHttpClientConnectionManager(reg);
        connectionManager.setDefaultMaxPerRoute(100);
    }

    private SSLConnectionSocketFactory buildSSLConnectionSocketFactory() {
        try {
            return new SSLConnectionSocketFactory(createIgnoreVerifySSL()); // 優先繞過安全證書
        } catch (KeyManagementException e) {
            logger.error("ssl connection fail", e);
        } catch (NoSuchAlgorithmException e) {
            logger.error("ssl connection fail", e);
        }
        return SSLConnectionSocketFactory.getSocketFactory();
    }

    private SSLContext createIgnoreVerifySSL() throws NoSuchAlgorithmException, KeyManagementException {
        // 實現一個X509TrustManager介面,用於繞過驗證,不用修改裡面的方法
        X509TrustManager trustManager = new X509TrustManager() {

            @Override
            public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {
            }

            @Override
            public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {
            }

            @Override
            public X509Certificate[] getAcceptedIssuers() {
                return null;
            }

        };

        SSLContext sc = SSLContext.getInstance("SSLv3");
        sc.init(null, new TrustManager[] { trustManager }, null);
        return sc;
    }

    public HttpClientGenerator setPoolSize(int poolSize) {
        connectionManager.setMaxTotal(poolSize);
        return this;
    }

    public CloseableHttpClient getClient(Site site) {
        return generateClient(site);
    }

    private CloseableHttpClient generateClient(Site site) {
        HttpClientBuilder httpClientBuilder = HttpClients.custom();

        httpClientBuilder.setConnectionManager(connectionManager);
        if (site.getUserAgent() != null) {
            httpClientBuilder.setUserAgent(site.getUserAgent());
        } else {
            httpClientBuilder.setUserAgent("");
        }
        if (site.isUseGzip()) {
            httpClientBuilder.addInterceptorFirst(new HttpRequestInterceptor() {

                public void process(
                        final HttpRequest request,
                        final HttpContext context) throws HttpException, IOException {
                    if (!request.containsHeader("Accept-Encoding")) {
                        request.addHeader("Accept-Encoding", "gzip");
                    }
                }
            });
        }

        //解決post/redirect/post 302跳轉問題
        httpClientBuilder.setRedirectStrategy(new CustomRedirectStrategy());

        SocketConfig.Builder socketConfigBuilder = SocketConfig.custom();
        socketConfigBuilder.setSoKeepAlive(true).setTcpNoDelay(true);
        socketConfigBuilder.setSoTimeout(site.getTimeOut());
        SocketConfig socketConfig = socketConfigBuilder.build();
        httpClientBuilder.setDefaultSocketConfig(socketConfig);
        connectionManager.setDefaultSocketConfig(socketConfig);
        httpClientBuilder.setRetryHandler(new DefaultHttpRequestRetryHandler(site.getRetryTimes(), true));
        generateCookie(httpClientBuilder, site);
        return httpClientBuilder.build();
    }

    private void generateCookie(HttpClientBuilder httpClientBuilder, Site site) {
        CookieStore cookieStore = new BasicCookieStore();
        for (Map.Entry<String, String> cookieEntry : site.getCookies().entrySet()) {
            BasicClientCookie cookie = new BasicClientCookie(cookieEntry.getKey(), cookieEntry.getValue());
            cookie.setDomain(site.getDomain());
            cookieStore.addCookie(cookie);
        }
        for (Map.Entry<String, Map<String, String>> domainEntry : site.getAllCookies().entrySet()) {
            for (Map.Entry<String, String> cookieEntry : domainEntry.getValue().entrySet()) {
                BasicClientCookie cookie = new BasicClientCookie(cookieEntry.getKey(), cookieEntry.getValue());
                cookie.setDomain(domainEntry.getKey());
                cookieStore.addCookie(cookie);
            }
        }
        httpClientBuilder.setDefaultCookieStore(cookieStore);
    }
}

HttpClientGenerator 這個類主要負責生成HttpClient。 WebMagic選擇使用PoolingHttpClientConnectionManager,這樣可以使用一個Client來生成多個HTTP連線。同時註冊:如果是http連線,使用PlainConnectionSocketFactory.INSTANCE生成套接字連線,如果是https連線,繞過安全證書驗證(雖然不安全,但是對於爬蟲來說比較方便)。

        SocketConfig.Builder socketConfigBuilder = SocketConfig.custom();
        socketConfigBuilder.setSoKeepAlive(true).setTcpNoDelay(true);
        socketConfigBuilder.setSoTimeout(site.getTimeOut());
        SocketConfig socketConfig = socketConfigBuilder.build();
        httpClientBuilder.setDefaultSocketConfig(socketConfig);
        connectionManager.setDefaultSocketConfig(socketConfig);

這部分主要是完成套接字連線的一些選項設定。setSoKeepAlive:如果啟用So_keepAlive,客戶端會偶爾通過空閒連線傳送一個數據包,以確保伺服器未崩潰。
如果伺服器沒能相應此包。客戶端會持續嘗試11分鐘,知道接受到響應為止。如果在12分鐘內未收到響應,客戶端關閉socket。沒有So_keepalive,不活動的客戶端可能永久存在下去。而不會注意到伺服器已經崩潰。setTcpNoDelay,設定TCP_NODELAY為true,可以確保會盡可能快地傳送,而無論包的大小正常情況下,小的包在傳送之前會組合為大一點的包。在傳送另一個包之前。本地主機要等待遠端系統對前一個包的迴應。如果遠端系統沒有儘可能的將回應發回本地系統,那麼依賴於小資料量資訊穩定傳輸的應用程式就會變得很慢。SO_timeout 正常情況下,當嘗試讀取socket,會阻塞儘可能長的時間,以得到足夠的位元組。設定SO_TIMEOUT可以確保此次呼叫阻塞的時間不會大於某個固定的毫秒數。

2.PageProcessor

3.Pipelines

4.scheduler

5.Spider類

5.1 run()方法

第一步:checkRunningStat() . 目的之一:防止多次呼叫run()方法。因為Spider是主控制類,用來完成整個爬蟲的排程任務。如果多次呼叫run,將會導致整個爬蟲的排程工作混亂。

第二步:initComponent().主要包括新增downloader元件,新增持久化資料元件,新增執行緒元件,新增種子url(使用startRequests(型別為ListRequest)域來新增,在新增之前需要檢查Spider是否已經開始工作,也就是執行checkIfRunning()),設定開始時間。

第三步:從scheduler中抽出一個request(其實就是將需要爬取的Url包裝成的請求),如果scheduler中沒有request物件,需要判斷執行緒池中的執行緒是否都處於啟用狀態,webmagic重新構造CountableThreadPool類,呼叫CountableThreadPoo類中的getThreadAlive()方法 來獲得當前活躍的執行緒數。如果webmagic沒有啟用狀態的執行緒時,表示爬取任務結束。

    public void stop() {
        if (stat.compareAndSet(STAT_RUNNING, STAT_STOPPED)) {
            logger.info("Spider " + getUUID() + " stop success!");
        } else {
            logger.info("Spider " + getUUID() + " stop fail!");
        }
    }

同時webmagic使用exitWhenComplete(預設值為True)使得整個爬蟲任務在完成之後自動終止(所謂的終止是要跳出while迴圈,任務完成並不一定跳出while迴圈)。否則當爬蟲任務結束時,必須呼叫spider中的stop方法,手動終止任務。使用手動終止任務,可以通過設定stat來實現在爬蟲執行過程中中止爬蟲的功效。
如果scheduler中有Request,呼叫Spider類中的processRequest方法,這個方法有三個作用,首先是呼叫downloader物件中的download方法,下載相應的頁面,如果下載成功,呼叫onDownloaderSuccess方法,進行頁面的內容解析,以及資料的可持久化。具體過程如下:首先呼叫Spider類中的onSuccess方法,然後判斷由download方法返回的Page物件中的相應狀態碼是否合法,相應的方法呼叫為site.getAcceptStatCode().contains(page.getStatusCode()),site中首先會預設設定200狀態碼為唯一的合法狀態碼。如果Page物件中的狀態碼合法,那麼呼叫PageProcessor中的Process(page)方法,對之前下載的頁面進行解析,解析過的頁面內容仍然儲存在page物件中。然後呼叫Spider類中的extractAndAddRequests(Page page,boolean spawnUrl)方法,作用就是將之前下載的頁面中的有效URL新增到Scheduler中,這裡spawnUrl用來判斷是否將這些有效的URL新增到Scheduler中,因為有時候我們只需要解析種子URl中的內容,不需要進一步的進行解析。而獲取這些有效的URL的方法是page.getTargetRequests。在使用Webmagic爬取資料時,作者要求我們要實現PageProcessor介面,在實現這個介面的process方法時,我們需要做的一件事就是將當前解析的網頁中的某些URL抽取出來新增到Page物件中,這便是有效URL的來源。需要做的另一件事就是抽取我們想要的頁面內容,通過Page物件中的putField(String key,Object field)方法將希望抽取的內容放入Page物件的ResultItems物件中,這裡的resultItems本質上就是一個鍵值對對映集。然後判斷這個頁面的內容是否需要儲存,因為有可能當前處理的頁面中並沒有我們感興趣的內容,這樣,我們可以通過page.getResultItems().isSkip()來判斷是否需要儲存,webmagic中的這個isSkip是封裝在ResultItems類中的,但是個人覺得封裝在Page類中更加的合理。如果這個頁面需要儲存,那麼就呼叫pipeline的process方法,在Spider中是有一組的pipeline,也就是說我們可以一次進行多種方式的資料儲存,預設的是在控制檯輸出結果,也可以輸出到資料庫,檔案等等。同時webmagic設定了SleepTime,用來確定在處理完一個頁面(下載,解析,儲存)之後,需要爬蟲暫停一段時間,然後再進行下一個URl的爬取,防止爬取的頻率過高。如果下載相應的頁面失敗,就不會再進行頁面的解析,而是呼叫spider中的onDownloaderFail方法,該方法主要是用來判斷是否需要重新的將當前的URl加入到Scheduler中,從而再次對該URL進行爬取。(這個和HttpClient中的重複請求略有不同),具體的過程如下:首先判斷Site物件中的CycleRetryTiems是否為零,如果為零,表示不允許重新的將URL加入到Scheduler中,如果不為零,表示可以對該URL重新放入Scheduler中最大次數。這裡便可以看出將URL封裝成Request的好處,就是可以追蹤該URL到底重試了多少次,使用Request.CYCLE_TRIED_TIMES便可以記錄。在這裡webmagic對Request進行了深度複製之後,將複製之後的request放入到Scheduler中,個人感覺其目的是為不破壞原有的Request,以防在後面的程式中再次用到原來的Request。

第四步,在處理完相應的頁面之後,pageCount自增,用來計數。同時呼叫signalNewUrl()方法,其目的是:可能因為總排程程式因為Scheduler中沒有Request,從而處於阻塞狀態,在處理完一個頁面之後,Scheduler中將會新增一些Request,從而令總排程程式解除阻塞狀態。

注:呼叫Spider類中的onSuccess,Spider類中的onSuccess或者是onError的作用是呼叫SpiderListener類中的onSuccess(Request)方法或者是onError方法,但是具體呼叫有什麼作用目前並不知道,作者也沒有做出相關的例子。同時在downloader 中的download方法也呼叫了抽象類AbstractDownloader中onSuccess和onError方法。但是同樣作者在這裡只是做了一個介面,並沒有具體的實現。

Spider類中的其他重要方法已經在1.1中敘述,不再做詳細分析。