調度 engine._next_request_from_scheduler() 取出request交給handler,結果是request,執行engine.crawl(),結果是resp/fail,下一步看scraper.enqueue_scrape()
0.def _next_request_from_scheduler(self, spider):
C:\Program Files\Anaconda2\Lib\site-packages\scrapy\core\engine.py
def _next_request_from_scheduler(self, spider):
    """Pop the next request from the scheduler and send it downloading.

    Returns ``None`` when the scheduler queue is empty, otherwise the
    download Deferred with the engine's post-download callbacks chained on.
    """
    slot = self.slot
    # First pop one request from the (priority) scheduler queue.
    request = slot.scheduler.next_request()
    if not request:
        return
    # Hand the request to the download handlers.
    d = self._download(request, spider)
    # Run the engine callback on the download result (Response/Request/Failure).
    d.addBoth(self._handle_downloader_output, request, spider)
    d.addErrback(lambda f: logger.info('Error while handling downloader output',
                                       exc_info=failure_to_exc_info(f),
                                       extra={'spider': spider}))
    d.addBoth(lambda _: slot.remove_request(request))
    d.addErrback(lambda f: logger.info('Error while removing request from slot',
                                       exc_info=failure_to_exc_info(f),
                                       extra={'spider': spider}))
    # Kick the engine loop again (same trigger as the heartbeat).
    d.addBoth(lambda _: slot.nextcall.schedule())
    d.addErrback(lambda f: logger.info('Error while scheduling new request',
                                       exc_info=failure_to_exc_info(f),
                                       extra={'spider': spider}))
    return d
1. request = slot.scheduler.next_request() #scheduler從優先級隊列[當前優先級]取出一個request返回給engine
C:\Program Files\Anaconda2\Lib\site-packages\scrapy\core\scheduler.py
class Scheduler(object):
    def next_request(self):
        """Return the next request to download, or None if the queues are empty.

        ``self.mqs`` is the priority queue (pqclass) that manages per-priority
        memory queues (mqclass), i.e. ``self.queues = {0: mqclass(), ...}``.
        When a dqclass is configured, requests previously persisted to disk
        (e.g. PickleLifoDiskQueue) are loaded from the configured directory.
        """
        request = self.mqs.pop()
        if request:
            self.stats.inc_value('scheduler/dequeued/memory', spider=self.spider)
        else:
            # Memory queue empty: fall back to the disk queue, if any.
            request = self._dqpop()
            if request:
                self.stats.inc_value('scheduler/dequeued/disk', spider=self.spider)
        if request:
            self.stats.inc_value('scheduler/dequeued', spider=self.spider)
        return request
C:\Program Files\Anaconda2\Lib\site-packages\queuelib\pqueue.py ########################後面補充入隊相關操作
class PriorityQueue(object):
    def push(self, obj, priority=0):
        """Push *obj* with the given priority (default 0).

        A numerically smaller priority is "better"; ``curprio`` tracks the
        current best so that, e.g., redirect requests pushed at a better
        priority get processed first.
        """
        # Lazily create the per-priority backing queue.
        if priority not in self.queues:
            self.queues[priority] = self.qfactory(priority)
        q = self.queues[priority]
        q.push(obj)  # this may fail (eg. serialization error)
        if self.curprio is None or priority < self.curprio:
            self.curprio = priority

    def pop(self):
        """Pop one object at the current best priority, or None if empty."""
        if self.curprio is None:
            return
        q = self.queues[self.curprio]
        m = q.pop()
        if len(q) == 0:
            # Current-priority queue drained: discard it and recompute
            # curprio as the smallest remaining non-empty priority.
            del self.queues[self.curprio]
            q.close()
            prios = [p for p, q in self.queues.items() if len(q) > 0]
            self.curprio = min(prios) if prios else None
        return m
2. d = self._download(request, spider) #request交給handler下載
C:\Program Files\Anaconda2\Lib\site-packages\scrapy\core\engine.py
def _download(self, request, spider):
    """Track *request* as in-progress and fetch it through the downloader."""
    slot = self.slot
    # Slot keeps self.inprogress = set(); add_request() adds to it.
    slot.add_request(request)

    def _on_success(response):
        # The downloader may return a Response or a Request (e.g. a redirect).
        assert isinstance(response, (Response, Request))
        if isinstance(response, Response):
            response.request = request  # tie request to response received
            logkws = self.logformatter.crawled(request, response, spider)
            logger.log(*logformatter_adapter(logkws), extra={'spider': spider})
            # A Response result fires the response_received signal.
            self.signals.send_catch_log(signal=signals.response_received,
                                        response=response, request=request,
                                        spider=spider)
        return response

    def _on_complete(_):
        # Kick the engine loop again (same trigger as the heartbeat).
        slot.nextcall.schedule()
        return _

    # DOWNLOADER = 'scrapy.core.downloader.Downloader'
    dwld = self.downloader.fetch(request, spider)
    dwld.addCallbacks(_on_success)
    dwld.addBoth(_on_complete)
    return dwld
###dwld = self.downloader.fetch(request, spider)
C:\Program Files\Anaconda2\Lib\site-packages\scrapy\core\downloader\__init__.py
from .middleware import DownloaderMiddlewareManager
from .handlers import DownloadHandlers


class Downloader(object):
    # NOTE(excerpt): the real __init__ also initializes other attributes
    # (e.g. self.active, self.slots) — only part of it is quoted here.
    def __init__(self, crawler):
        self.handlers = DownloadHandlers(crawler)
        self.middleware = DownloaderMiddlewareManager.from_crawler(crawler)

    def fetch(self, request, spider):
        """Run *request* through the downloader middlewares, then download it."""
        def _deactivate(response):
            # Drop the request from the active set once the download settles.
            self.active.remove(request)
            return response

        self.active.add(request)
        # The middleware chain ends in download_func = self._enqueue_request.
        dfd = self.middleware.download(self._enqueue_request, request, spider)
        return dfd.addBoth(_deactivate)
####先在 DownloaderMiddlewareManager 加工 request
C:\Program Files\Anaconda2\Lib\site-packages\scrapy\core\downloader\middleware.py
class DownloaderMiddlewareManager(MiddlewareManager):
    def download(self, download_func, request, spider):
        """Pass *request* through every middleware, then call *download_func*."""
        @defer.inlineCallbacks
        def process_request(request):
            # Each middleware's process_request may short-circuit by returning
            # a Response or Request; otherwise the (possibly modified) request
            # falls through to download_func (i.e. Downloader._enqueue_request).
            for method in self.methods['process_request']:
                response = yield method(request=request, spider=spider)
                assert response is None or isinstance(response, (Response, Request)), \
                    'Middleware %s.process_request must return None, Response or Request, got %s' % \
                    (six.get_method_self(method).__class__.__name__,
                     response.__class__.__name__)
                if response:
                    defer.returnValue(response)
            defer.returnValue((yield download_func(request=request, spider=spider)))

        # Normal flow runs process_request; on completion the chain continues
        # through process_exception / process_response (their definitions are
        # omitted from this excerpt).
        deferred = mustbe_deferred(process_request, request)
        deferred.addErrback(process_exception)
        deferred.addCallback(process_response)
        return deferred
####加工後的request存入downloader維護的 self.slots{hostname:slot},順便從當前slot queue取出request交給handler下載,直到填滿當前域名最大並行數
C:\Program Files\Anaconda2\Lib\site-packages\scrapy\core\downloader\__init__.py
class Downloader(object):
    # NOTE(excerpt): only the slot-related part of __init__ is quoted here.
    def __init__(self, crawler):
        # {slot key (hostname by default): Slot(conc, delay, randomize_delay)}
        self.slots = {}

    def _enqueue_request(self, request, spider):
        """Queue the middleware-processed request on its per-host slot."""
        # Pick (or lazily create) the slot for this request; the key is the
        # hostname by default.
        key, slot = self._get_slot(request, spider)
        request.meta['download_slot'] = key

        def _deactivate(response):
            slot.active.remove(request)
            return response

        slot.active.add(request)
        deferred = defer.Deferred().addBoth(_deactivate)
        # slot.queue grows without bound; active/transferring are sets.
        slot.queue.append((request, deferred))
        # If this host's slot has free concurrency, pop a request (not
        # necessarily the one just queued) and hand it to the handlers.
        self._process_queue(spider, slot)
        return deferred

    def _process_queue(self, spider, slot):
        if slot.latercall and slot.latercall.active():
            return

        # Delay queue processing if a download_delay is configured
        now = time()
        delay = slot.download_delay()  # DOWNLOAD_DELAY defaults to 0
        if delay:
            penalty = delay - now + slot.lastseen
            if penalty > 0:
                slot.latercall = reactor.callLater(penalty, self._process_queue,
                                                   spider, slot)
                return

        # Process enqueued requests if there are free slots to transfer for
        # this slot. free_transfer_slots() is the per-domain concurrency
        # (CONCURRENT_REQUESTS_PER_DOMAIN, default 8) minus
        # len(slot.transferring); _download below adds to transferring.
        while slot.queue and slot.free_transfer_slots() > 0:
            slot.lastseen = now
            request, deferred = slot.queue.popleft()
            dfd = self._download(slot, request, spider)
            dfd.chainDeferred(deferred)
            # prevent burst if inter-request delays were configured
            if delay:
                self._process_queue(spider, slot)
                break

    def _download(self, slot, request, spider):
        # The order is very important for the following deferreds. Do not change!

        # 1. Create the download deferred (hand the request to the handlers).
        dfd = mustbe_deferred(self.handlers.download_request, request, spider)

        # 2. Notify response_downloaded listeners about the recent download
        # before querying queue for next request
        def _downloaded(response):
            self.signals.send_catch_log(signal=signals.response_downloaded,
                                        response=response, request=request,
                                        spider=spider)
            return response
        dfd.addCallback(_downloaded)

        # 3. After response arrives, remove the request from transferring
        # state to free up the transferring slot so it can be used by the
        # following requests (perhaps those which came from the downloader
        # middleware itself)
        slot.transferring.add(request)

        def finish_transferring(_):
            slot.transferring.remove(request)
            self._process_queue(spider, slot)
            return _

        return dfd.addBoth(finish_transferring)
####直到填滿當前slot並行數:從slot queue取出的request,根據 scheme 選擇相應 handler ,比如 http 選擇 HTTPDownloadHandler 實際對應 \core\downloader\handlers\http11.py HTTP11DownloadHandler
C:\Program Files\Anaconda2\Lib\site-packages\scrapy\core\downloader\handlers\__init__.py
class DownloadHandlers(object):
    def __init__(self, crawler):
        self._schemes = {}  # stores acceptable schemes on instancing
        handlers = without_none_values(
            crawler.settings.getwithbase('DOWNLOAD_HANDLERS'))
        for scheme, clspath in six.iteritems(handlers):
            # map each URL scheme to its handler class path
            self._schemes[scheme] = clspath

    def download_request(self, request, spider):
        """Dispatch *request* to the handler registered for its URL scheme.

        E.g. 'http' selects HTTPDownloadHandler, which corresponds to
        HTTP11DownloadHandler in core/downloader/handlers/http11.py.
        """
        scheme = urlparse_cached(request).scheme
        handler = self._get_handler(scheme)
        if not handler:
            raise NotSupported("Unsupported URL scheme '%s': %s" %
                               (scheme, self._notconfigured[scheme]))
        return handler.download_request(request, spider)
C:\Program Files\Anaconda2\Lib\site-packages\scrapy\core\downloader\handlers\http11.py
最後跟蹤到如下,
其中 agent 是 from twisted.web.client import Agent
L{Agent} is a very basic HTTP client. It supports I{HTTP} and I{HTTPS} scheme URIs
d = agent.request( method, to_bytes(url, encoding=‘ascii‘), headers, bodyproducer) # set download latency d.addCallback(self._cb_latency, request, start_time) # response body is ready to be consumed d.addCallback(self._cb_bodyready, request) d.addCallback(self._cb_bodydone, request, url) # check download timeout self._timeout_cl = reactor.callLater(timeout, d.cancel) d.addBoth(self._cb_timeout, request, url, timeout) return d
正常則最終返回 response
return respcls(url=url, status=status, headers=headers, body=body, flags=flags)
停。
###fetch 將request交給handler拿到結果之後的回調 dwld.addCallbacks(_on_success) 和 dwld.addBoth(_on_complete)
C:\Program Files\Anaconda2\Lib\site-packages\scrapy\core\engine.py
class ExecutionEngine(object):
    def _download(self, request, spider):
        """Track *request* as in-progress and fetch it through the downloader."""
        slot = self.slot
        slot.add_request(request)

        def _on_success(response):
            # The downloader may return a Response or a Request (redirect).
            assert isinstance(response, (Response, Request))
            if isinstance(response, Response):
                response.request = request  # tie request to response received
                logkws = self.logformatter.crawled(request, response, spider)
                logger.log(*logformatter_adapter(logkws), extra={'spider': spider})
                # A Response result fires the response_received signal.
                self.signals.send_catch_log(signal=signals.response_received,
                                            response=response, request=request,
                                            spider=spider)
            return response

        def _on_complete(_):
            # Same trigger as the engine heartbeat.
            slot.nextcall.schedule()
            return _

        dwld = self.downloader.fetch(request, spider)
        dwld.addCallbacks(_on_success)
        dwld.addBoth(_on_complete)
        return dwld  # the download result Deferred
3. d.addBoth(self._handle_downloader_output, request, spider) #拿到下載結果執行回調
###回調 d.addBoth(self._handle_downloader_output, request, spider)
C:\Program Files\Anaconda2\Lib\site-packages\scrapy\core\engine.py
class ExecutionEngine(object):
    def _handle_downloader_output(self, response, request, spider):
        """Route the download result: Request -> re-crawl, Response/Failure -> scraper."""
        # The download result can only be a Request, Response or Failure.
        assert isinstance(response, (Request, Response, Failure)), response
        # downloader middleware can return requests (for example, redirects)
        if isinstance(response, Request):
            self.crawl(response, spider)
            return
        # response is a Response or Failure: hand it to the scraper.
        d = self.scraper.enqueue_scrape(response, request, spider)
        d.addErrback(lambda f: logger.error('Error while enqueuing downloader output',
                                            exc_info=failure_to_exc_info(f),
                                            extra={'spider': spider}))
        return d
###回調 d.addBoth(lambda _: slot.remove_request(request))
###回調 d.addBoth(lambda _: slot.nextcall.schedule())
調度 engine._next_request_from_scheduler() 取出request交給handler,結果是request,執行engine.crawl(),結果是resp/fail,下一步看scraper.enqueue_scrape()