1. 程式人生 > >筆記-scrapy-請求-下載-結果處理流程

筆記-scrapy-請求-下載-結果處理流程

筆記-scrapy-請求-下載-結果處理流程

 

在使用時發現對scrapy下載過程中的處理邏輯還是不太明晰,寫個文件溫習一下。

1.      請求-下載-結果處理流程

從哪開始呢?

engine.py

   @defer.inlineCallbacks

    def open_spider(self, spider, start_requests=(), close_if_idle=True):

        """Prepare the engine to crawl ``spider`` and start the request loop.

        Builds the engine ``Slot`` (start requests, idle-close flag, the
        ``nextcall`` wrapper and the scheduler), opens the scheduler, the
        scraper and the stats collector, sends ``spider_opened``, and finally
        schedules the first ``_next_request`` pass and starts the heartbeat.

        This is an ``inlineCallbacks`` generator: each ``yield`` waits for the
        yielded result before the next statement runs.
        """

        assert self.has_capacity(), "No free spider slot when opening %r" % \

            spider.name

        logger.info("Spider opened", extra={'spider': spider})

        # Wrap self._next_request so it can be triggered via nextcall.schedule().
        # NOTE(review): CallLaterOnce is defined outside this excerpt.
        nextcall = CallLaterOnce(self._next_request, spider)

        scheduler = self.scheduler_cls.from_crawler(self.crawler)

        # Let the spider middlewares process the start requests first.
        start_requests = yield self.scraper.spidermw.process_start_requests(start_requests, spider)

        slot = Slot(start_requests, close_if_idle, nextcall, scheduler)

        self.slot = slot

        self.spider = spider

        yield scheduler.open(spider)

        yield self.scraper.open_spider(spider)

        self.crawler.stats.open_spider(spider)

        yield self.signals.send_catch_log_deferred(signals.spider_opened, spider=spider)

        slot.nextcall.schedule() # request the first self._next_request pass

        # NOTE(review): 5 is presumably the heartbeat interval in seconds;
        # the heartbeat object is created inside Slot, outside this excerpt.
        slot.heartbeat.start(5)

注意最後兩句

nextcall 是自己寫的一個reactor呼叫中間類,在這裡實際是把self._next_request加入了reactor task佇列。

slot.nextcall.schedule() 等於呼叫self._next_request

slot.heartbeat.start(5)宣告每5秒呼叫一次nextcall.schedule

reactor是跑起來了

 

看到self._next_request

控制迴圈

    def _next_request(self, spider):

        """Run one pass of the engine control loop for ``spider``.

        In order: start downloads for requests available from the scheduler,
        then feed at most one start request into ``self.crawl``, then check
        whether the spider has become idle.
        """

        slot = self.slot

        # No slot: there is nothing to drive.
        if not slot:

            return

 

        if self.paused:

            return

 

        # Keep starting scheduled requests until back-out is needed or
        # _next_request_from_scheduler returns a falsy value.
        while not self._needs_backout(spider):

            if not self._next_request_from_scheduler(spider):

                break

 

        # Consume at most one start request per pass.
        if slot.start_requests and not self._needs_backout(spider):

            try:

                request = next(slot.start_requests)

            except StopIteration:

                # Iterator exhausted: stop consulting it on later passes.
                slot.start_requests = None

            except Exception:

                # Any other error also drops the iterator, after logging it.
                slot.start_requests = None

                logger.error('Error while obtaining start requests',

                             exc_info=True, extra={'spider': spider})

            else:

                self.crawl(request, spider)

 

        if self.spider_is_idle(spider) and slot.close_if_idle:

            self._spider_idle(spider)

_needs_backout是用於判斷爬蟲狀態的函式

 

 

    def _next_request_from_scheduler(self, spider):

        """Take one request from the scheduler and start downloading it.

        Returns ``None`` when the scheduler yields no request; otherwise
        returns the download Deferred with three stages attached, in order:
        handle the downloader output, remove the request from the slot, and
        schedule the next ``_next_request`` pass. Each stage is followed by an
        errback that only logs a failure coming out of that stage.
        """

        slot = self.slot

        request = slot.scheduler.next_request()

        if not request:

            return

        d = self._download(request, spider)

        # Stage 1: hand the download result (success or failure) on.
        d.addBoth(self._handle_downloader_output, request, spider)

        d.addErrback(lambda f: logger.info('Error while handling downloader output',

                                           exc_info=failure_to_exc_info(f),

                                           extra={'spider': spider}))

        # Stage 2: the request is no longer in progress for this slot.
        d.addBoth(lambda _: slot.remove_request(request))

        d.addErrback(lambda f: logger.info('Error while removing request from slot',

                                           exc_info=failure_to_exc_info(f),

                                           extra={'spider': spider}))

        # Stage 3: ask for another _next_request pass.
        d.addBoth(lambda _: slot.nextcall.schedule())

        d.addErrback(lambda f: logger.info('Error while scheduling new request',

                                           exc_info=failure_to_exc_info(f),

                                           extra={'spider': spider}))

        return d

 

d 是一個Deferred物件,然後為它添加了一堆回撥函式,包括後續處理,從請求佇列中刪除,取下一個請求(執行nextcall.schedule(),也就是排程下一次self._next_request())

Slot是當前請求的狀態儲存類;

 

下面要分兩條線了,一條是下載_download(單列章節),一條是下載返回結果處理。

下載結果處理:

    def _handle_downloader_output(self, response, request, spider):

        """Route the downloader's result for ``request``.

        A ``Request`` result is fed back into ``self.crawl`` and ``None`` is
        returned. A ``Response`` or ``Failure`` is enqueued on the scraper,
        and the resulting Deferred (with a logging errback) is returned.
        """

        assert isinstance(response, (Request, Response, Failure)), response

        # downloader middleware can return requests (for example, redirects)

        if isinstance(response, Request):

            self.crawl(response, spider)

            return

        # response is a Response or Failure

        d = self.scraper.enqueue_scrape(response, request, spider)

        d.addErrback(lambda f: logger.error('Error while enqueuing downloader output',

                                            exc_info=failure_to_exc_info(f),

                                            extra={'spider': spider}))

        return d

 

繼續d = self.scraper.enqueue_scrape(response, request, spider)

    def enqueue_scrape(self, response, request, spider):

        """Queue ``response`` for scraping and return its Deferred.

        The response/request pair is added to the scraper slot, which hands
        back a Deferred. ``finish_scraping`` is attached to run on both
        success and failure, followed by an errback that logs failures.
        Queue processing is then started via ``_scrape_next``.
        """

        slot = self.slot

        dfd = slot.add_response_request(response, request)

        def finish_scraping(_):

            slot.finish_response(response, request) # update slot state

            self._check_if_closing(spider, slot)

            self._scrape_next(spider, slot) # move on to the next queued response

            # Pass the incoming result through unchanged.
            return _

        dfd.addBoth(finish_scraping)

        dfd.addErrback(

            lambda f: logger.error('Scraper bug processing %(request)s',

                                   {'request': request},

                                   exc_info=failure_to_exc_info(f),

                                   extra={'spider': spider}))

        self._scrape_next(spider, slot) # start processing what is queued

        return dfd

照例:slot用於儲存scraper當前任務佇列及狀態

 

繼續:

    def _scrape_next(self, spider, slot):

        """Drain ``slot.queue``, scraping each queued response.

        The result of each ``_scrape`` call is chained into the Deferred that
        was queued together with that response.
        """

        while slot.queue:

            response, request, deferred = slot.next_response_request_deferred()

            self._scrape(response, request, spider).chainDeferred(deferred)

 

    def _scrape(self, response, request, spider):

        """Handle the downloaded response or failure through the spider

        callback/errback"""

        assert isinstance(response, (Response, Failure))

 

        dfd = self._scrape2(response, request, spider) # returns spiders processed output

        # Errback is registered before the callback, so a failure from
        # _scrape2 is seen by handle_spider_error first.
        dfd.addErrback(self.handle_spider_error, request, response, spider)

        dfd.addCallback(self.handle_spider_output, request, response, spider)

        return dfd

 

添加回調函式dfd.addCallback(self.handle_spider_output, request, response, spider)

 

注意:dfd = self._scrape2(response, request, spider) # returns spiders processed output會完成新增spider解析的回撥函式。

    def call_spider(self, result, request, spider):

        """Run the spider callback (or errback) on a download result.

        ``result`` gets its ``request`` attribute set, is wrapped in a
        Deferred by ``defer_result``, and is then routed to
        ``request.callback`` (falling back to ``spider.parse``) on success or
        ``request.errback`` on failure. The callback output is passed through
        ``iterate_spider_output``.
        """

        result.request = request

        dfd = defer_result(result)

        # The request's own callback wins; spider.parse is the default.
        dfd.addCallbacks(request.callback or spider.parse, request.errback)

        return dfd.addCallback(iterate_spider_output)

關鍵句:dfd.addCallbacks(request.callback or spider.parse, request.errback)

 

繼續看self.handle_spider_output,

    def handle_spider_output(self, result, request, response, spider):

        """Process everything the spider callback produced for ``response``.

        A falsy ``result`` short-circuits to ``defer_succeed(None)``.
        Otherwise ``result`` is wrapped with ``iter_errback`` (using
        ``handle_spider_error`` as the error handler) and each element is
        passed to ``_process_spidermw_output`` through ``parallel``, with
        ``self.concurrent_items`` as its second argument.
        """

        if not result:

            return defer_succeed(None)

        it = iter_errback(result, self.handle_spider_error, request, response, spider)

        dfd = parallel(it, self.concurrent_items,

            self._process_spidermw_output, request, response, spider)

        return dfd

 

    def _process_spidermw_output(self, output, request, response, spider):

        """Process each Request/Item (given in the output parameter) returned

        from the given spider

        """

        if isinstance(output, Request):

            # New requests go back to the engine.
            self.crawler.engine.crawl(request=output, spider=spider)

        elif isinstance(output, (BaseItem, dict)):

            # Items go to the item processor; count the item as in progress.
            self.slot.itemproc_size += 1

            dfd = self.itemproc.process_item(output, spider)

            dfd.addBoth(self._itemproc_finished, output, response, spider)

            return dfd

        elif output is None:

            pass

        else:

            # Anything else is reported as an error; nothing is raised.
            typename = type(output).__name__

            logger.error('Spider must return Request, BaseItem, dict or None, '

                         'got %(typename)r in %(request)s',

                         {'request': request, 'typename': typename},

                         extra={'spider': spider})

如果返回的是請求,crawl()

如果是item或dict,self.itemproc.process_item(output, spider);也就是pipeline中寫的process_item方法了

 

1.1.    小結

程式碼中對功能函式的拆分,呼叫還是有一點複雜的,但這樣做的好處是程式碼塊之間的耦合性不高,可以非常方便的進行某一函式或功能塊的替換。

 

 

2.      下載器

下載流程解析

從engine.py中開始:

 

    def _download(self, request, spider):

        """Send ``request`` to the downloader and return the resulting Deferred.

        The request is registered on the engine slot, then fetched through
        ``self.downloader.fetch``. On success a ``Response`` is tied to its
        request, logged, and announced with the ``response_received`` signal.
        On success or failure another ``_next_request`` pass is scheduled.
        """

        slot = self.slot

        # update slot state

        slot.add_request(request)

        def _on_success(response):

            assert isinstance(response, (Response, Request))

            # A Request result is returned untouched; only Responses are
            # logged and signalled.
            if isinstance(response, Response):

                response.request = request # tie request to response received

                logkws = self.logformatter.crawled(request, response, spider)

                logger.log(*logformatter_adapter(logkws), extra={'spider': spider})

                self.signals.send_catch_log(signal=signals.response_received, \

                    response=response, request=request, spider=spider)

            return response

 

        def _on_complete(_):

            slot.nextcall.schedule()

            # Pass the result (or failure) through unchanged.
            return _

 

        dwld = self.downloader.fetch(request, spider)

        dwld.addCallbacks(_on_success)

        dwld.addBoth(_on_complete)

        return dwld

主要是聲明了一個Deferred並添加了一些回撥函式用於狀態更新,訊號傳遞。

dwld = self.downloader.fetch(request, spider)

 

進入downloader.fetch()

    def fetch(self, request, spider):

        """Download ``request`` through the middleware chain.

        The request is tracked in ``self.active`` for the duration of the
        download. Returns the Deferred from ``self.middleware.download``,
        with ``self._enqueue_request`` as the download function.
        """

        def _deactivate(response):

            # Runs on success and failure (attached with addBoth below).
            self.active.remove(request)

            return response

 

        self.active.add(request)

        dfd = self.middleware.download(self._enqueue_request, request, spider)

        return dfd.addBoth(_deactivate)

active用於儲存當前下載的任務

新增_deactivate回撥,用於在下載完成後刪除active中對應記錄

 

繼續,middleware.download()

    def download(self, download_func, request, spider):

        """Run ``request`` through the downloader middlewares around ``download_func``.

        Three inner coroutines implement the ``process_request``,
        ``process_response`` and ``process_exception`` hooks stored in
        ``self.methods``. They are chained so that a failure from the request
        phase goes to ``process_exception`` first, and whatever non-failure
        result comes out then goes through ``process_response``.
        """

        @defer.inlineCallbacks

        def process_request(request):

            # The first middleware returning a truthy Response/Request
            # short-circuits the chain; download_func is then not called.
            for method in self.methods['process_request']:

                response = yield method(request=request, spider=spider)

                assert response is None or isinstance(response, (Response, Request)), \

                        'Middleware %s.process_request must return None, Response or Request, got %s' % \

                        (six.get_method_self(method).__class__.__name__, response.__class__.__name__)

                if response:

                    defer.returnValue(response)

            # No middleware produced a result: perform the actual download.
            defer.returnValue((yield download_func(request=request,spider=spider)))

 

        @defer.inlineCallbacks

        def process_response(response):

            assert response is not None, 'Received None in process_response'

            # A Request bypasses the process_response hooks entirely.
            if isinstance(response, Request):

                defer.returnValue(response)

 

            # Each hook receives the previous hook's output; a hook that
            # returns a Request ends the chain early.
            for method in self.methods['process_response']:

                response = yield method(request=request, response=response,

                                        spider=spider)

                assert isinstance(response, (Response, Request)), \

                    'Middleware %s.process_response must return Response or Request, got %s' % \

                    (six.get_method_self(method).__class__.__name__, type(response))

                if isinstance(response, Request):

                    defer.returnValue(response)

            defer.returnValue(response)

 

        @defer.inlineCallbacks

        def process_exception(_failure):

            exception = _failure.value

            # The first hook returning a truthy Response/Request replaces the
            # failure; otherwise the original failure is returned.
            for method in self.methods['process_exception']:

                response = yield method(request=request, exception=exception,

                                        spider=spider)

                assert response is None or isinstance(response, (Response, Request)), \

                    'Middleware %s.process_exception must return None, Response or Request, got %s' % \

                    (six.get_method_self(method).__class__.__name__, type(response))

                if response:

                    defer.returnValue(response)

            defer.returnValue(_failure)

 

        deferred = mustbe_deferred(process_request, request)

        # Order matters: the errback is registered before the callback, so a
        # result produced by process_exception still goes through process_response.
        deferred.addErrback(process_exception)

        deferred.addCallback(process_response)

        return deferred

 

這裡完成了下載中介軟體的處理

 然後呼叫defer.returnValue((yield download_func(request=request,spider=spider)))

實際上就是downloader的self._enqueue_request

 

向下走

    def _enqueue_request(self, request, spider):

        """Queue ``request`` on its download slot and return a Deferred for it.

        The slot key is recorded in ``request.meta['download_slot']``, the
        request is marked active on the slot, the
        ``request_reached_downloader`` signal is sent, and the
        (request, deferred) pair is appended to the slot queue before
        ``_process_queue`` is invoked.
        """

        key, slot = self._get_slot(request, spider)

        request.meta['download_slot'] = key

 

        def _deactivate(response):

            # Runs on success and failure (attached with addBoth below).
            slot.active.remove(request)

            return response

 

        slot.active.add(request)

        self.signals.send_catch_log(signal=signals.request_reached_downloader,

                                    request=request,

                                    spider=spider)

        # This Deferred is fired later by the download chained to it in
        # _process_queue.
        deferred = defer.Deferred().addBoth(_deactivate)

        slot.queue.append((request, deferred))

        self._process_queue(spider, slot)

        return deferred

老規矩,slot儲存任務資訊

呼叫self._process_queue

下載延遲是在這裡進行的。

    def _process_queue(self, spider, slot):

        """Start queued downloads for ``slot``, honouring its download delay.

        Does nothing while a delayed call for this slot is still pending. If
        a delay is configured and has not yet elapsed since ``slot.lastseen``,
        the method re-schedules itself and returns. Otherwise it starts
        downloads while the queue is non-empty and the slot has free
        transfer slots.
        """

        # A delayed re-run is already pending; let it do the work.
        if slot.latercall and slot.latercall.active():

            return

 

        # Delay queue processing if a download_delay is configured

        # NOTE(review): time() is presumably time.time (imported outside this excerpt).
        now = time()

        delay = slot.download_delay()

        if delay:

            # Time still to wait so that `delay` has passed since lastseen.
            penalty = delay - now + slot.lastseen

            if penalty > 0:

                slot.latercall = reactor.callLater(penalty, self._process_queue, spider, slot)

                return

 

        # Process enqueued requests if there are free slots to transfer for this slot

        while slot.queue and slot.free_transfer_slots() > 0:

            slot.lastseen = now

            request, deferred = slot.queue.popleft()

            dfd = self._download(slot, request, spider)

            # Deliver the download result to the Deferred queued with the request.
            dfd.chainDeferred(deferred)

            # prevent burst if inter-request delays were configured

            if delay:

                # Re-enter so the delay check above runs before the next request.
                self._process_queue(spider, slot)

                break

 

下載語句_download()

    def _download(self, slot, request, spider):

        """Start the actual download of ``request`` via the download handlers.

        Returns a Deferred that sends ``response_downloaded`` on success and,
        on success or failure, removes the request from
        ``slot.transferring`` and re-runs ``_process_queue`` for the slot.
        """

        # The order is very important for the following deferreds. Do not change!

 

        # 1. Create the download deferred

        dfd = mustbe_deferred(self.handlers.download_request, request, spider)

 

        # 2. Notify response_downloaded listeners about the recent download

        # before querying queue for next request

        def _downloaded(response):

            self.signals.send_catch_log(signal=signals.response_downloaded,

                                        response=response,

                                        request=request,

                                        spider=spider)

            return response

        dfd.addCallback(_downloaded)

 

        # 3. After response arrives,  remove the request from transferring

        # state to free up the transferring slot so it can be used by the

        # following requests (perhaps those which came from the downloader

        # middleware itself)

        slot.transferring.add(request)

 

        def finish_transferring(_):

            slot.transferring.remove(request)

            self._process_queue(spider, slot)

            # Pass the result (or failure) through unchanged.
            return _

 

        return dfd.addBoth(finish_transferring)

在這裡,也維護了一個下載佇列,可根據配置達到延遲下載的要求。真正發起下載請求的是呼叫了self.handlers.download_request:

 

後面的水有點深了,涉及的知識點比較多,以後有機會再寫。

每個下載handler可以理解為requests包,輸入url輸出response就可以了。