A Brief Analysis of the Basic Workflow of the Scrapy Framework
I. The Basic Flow of a Twisted Download Task
First we need to import three basic modules:
from twisted.internet import reactor
from twisted.web.client import getPage
from twisted.internet import defer
- getPage automatically creates a socket object for us, simulates a browser sending a request to the server, and places it in the reactor. Once the data we need has been downloaded, the socket's state changes and the reactor can detect it. (When the download completes, it is removed from the event loop automatically.)
- defer.Deferred() in the defer module creates a special socket-like object that sends no request. If it is placed in the event loop, it will never be removed automatically; we must remove it by hand. Its purpose is to keep the event loop listening indefinitely instead of stopping. (Sends no request; removed manually.)
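As a minimal illustration of the second point (a sketch, not one of the tutorial's own examples): a bare Deferred does nothing until someone fires it by hand.

from twisted.internet import defer

d = defer.Deferred()                         # a "socket" that never sends anything
d.addCallback(lambda result: print(result))  # registered, but not called yet
d.callback('fired manually')                 # firing it by hand prints 'fired manually'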
1. Using getPage to create a socket
def response(content):
    print(content)

def task():
    url = "http://www.baidu.com"
    d = getPage(url.encode('utf-8'))  # create the socket object; the returned d is itself a Deferred object
    d.addCallback(response)           # when the page finishes downloading, response runs automatically to handle the returned data
But you should understand that the above merely creates the socket and registers the function to run once the download finishes; nothing has been added to the event loop yet.
2. Adding the socket to the event loop
def response(content):
    print(content)

@defer.inlineCallbacks  # add to the event loop
def task():
    url = "http://www.baidu.com"
    d = getPage(url.encode('utf-8'))
    d.addCallback(response)
    yield d
3. Starting the event loop
def response(content):
    print(content)

@defer.inlineCallbacks
def task():
    url = "http://www.baidu.com"
    d = getPage(url.encode('utf-8'))
    d.addCallback(response)
    yield d

task()         # run the function; if it is not called first, it will never execute inside the loop
reactor.run()  # start the event loop
4. Starting the event loop (stopping automatically)
def response(content):
    print(content)

@defer.inlineCallbacks
def task():
    url = "http://www.baidu.com"
    d = getPage(url.encode('utf-8'))
    d.addCallback(response)
    yield d

d = task()
dd = defer.DeferredList([d, ])
dd.addBoth(lambda _: reactor.stop())  # watch whether d completes or fails; either way, run the callback and end the event loop
reactor.run()
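The addBoth used above registers one function as both the success callback and the error callback, which is why it fires whether d completes or fails. A small standalone sketch (not from the tutorial) to make that concrete:

from twisted.internet import defer

d = defer.Deferred()
d.addBoth(lambda result: print('runs on success or failure:', result))
d.callback('done')  # fires the chain; an error result would trigger the same function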
5. Concurrent requests
_closewait = None

def response(content):
    print(content)

@defer.inlineCallbacks
def task():
    """
    The start of each spider: start_requests
    :return:
    """
    url = "http://www.baidu.com"
    d1 = getPage(url.encode('utf-8'))
    d1.addCallback(response)

    url = "http://www.cnblogs.com"
    d2 = getPage(url.encode('utf-8'))
    d2.addCallback(response)

    url = "http://www.bing.com"
    d3 = getPage(url.encode('utf-8'))
    d3.addCallback(response)

    global _closewait
    _closewait = defer.Deferred()
    yield _closewait  # never fires by itself, so the event loop keeps listening (step 6 closes it manually)

spider1 = task()
dd = defer.DeferredList([spider1, ])
dd.addBoth(lambda _: reactor.stop())
reactor.run()
6. Closing manually
_closewait = None
count = 0

def response(content):
    print(content)
    global count
    count += 1
    if count == 3:  # when count reaches 3, responses for all the urls we sent have arrived
        _closewait.callback(None)

@defer.inlineCallbacks
def task():
    """
    The start of each spider: start_requests
    :return:
    """
    url = "http://www.baidu.com"
    d1 = getPage(url.encode('utf-8'))
    d1.addCallback(response)

    url = "http://www.cnblogs.com"
    d2 = getPage(url.encode('utf-8'))
    d2.addCallback(response)

    url = "http://www.bing.com"
    d3 = getPage(url.encode('utf-8'))
    d3.addCallback(response)

    global _closewait
    _closewait = defer.Deferred()
    yield _closewait  # waits here until response() fires _closewait by hand

spider = task()
dd = defer.DeferredList([spider, ])
dd.addBoth(lambda _: reactor.stop())
reactor.run()
To recap, a Twisted download task boils down to three steps: (1) use getPage to create a socket object; (2) add the socket to the event loop; (3) start the event loop (internally it sends the requests and receives the responses; when all socket requests have completed, the event loop is terminated).
II. A Simplified Scrapy

Having understood the material above, the next part should feel easy; the basic idea is much the same.
1. First we create a spider, ChoutiSpider, as shown in the code below.
Some readers may not know what start_requests is. It is the same method you get in a spider created with Scrapy: it lives in scrapy.Spider, the class we would normally inherit from, and its job is to take each url from the start_url list and hand the url, together with the designated callback parse, to the Request class. Note that our Request here simply stores the values passed in from start_requests; it does no real work such as actually sending the request.
from twisted.internet import defer
from twisted.web.client import getPage
from twisted.internet import reactor

class Request(object):
    def __init__(self, url, callback):
        self.url = url
        self.callback = callback

class ChoutiSpider(object):
    name = 'chouti'

    def start_requests(self):
        start_url = ['http://www.baidu.com', 'http://www.bing.com', ]
        for url in start_url:
            yield Request(url, self.parse)

    def parse(self, response):
        print(response)  # response is the downloaded page
        yield Request('http://www.cnblogs.com', callback=self.parse)
2. Create the engine, Engine, which receives the spider object to be run
from twisted.internet import defer
from twisted.web.client import getPage
from twisted.internet import reactor

class Request(object):
    def __init__(self, url, callback):
        self.url = url
        self.callback = callback

class ChoutiSpider(object):
    name = 'chouti'

    def start_requests(self):
        start_url = ['http://www.baidu.com', 'http://www.bing.com', ]
        for url in start_url:
            yield Request(url, self.parse)

    def parse(self, response):
        print(response)  # response is the downloaded page
        yield Request('http://www.cnblogs.com', callback=self.parse)

import queue
Q = queue.Queue()  # the scheduler

class Engine(object):
    def __init__(self):
        self._closewait = None

    @defer.inlineCallbacks
    def crawl(self, spider):
        start_requests = iter(spider.start_requests())  # turn the generator of Request objects into an iterator
        while True:
            try:
                request = next(start_requests)  # put each Request object into the scheduler
                Q.put(request)
            except StopIteration as e:
                break
        self._closewait = defer.Deferred()
        yield self._closewait
3. Have the engine take requests from the scheduler, send them under a concurrency limit, and stop the event loop once the scheduler is empty and nothing is in flight:

from twisted.internet import defer
from twisted.web.client import getPage
from twisted.internet import reactor

class Request(object):
    def __init__(self, url, callback):
        self.url = url
        self.callback = callback

class ChoutiSpider(object):
    name = 'chouti'

    def start_requests(self):
        start_url = ['http://www.baidu.com', 'http://www.bing.com', ]
        for url in start_url:
            yield Request(url, self.parse)

    def parse(self, response):
        print(response)  # response is the downloaded page
        yield Request('http://www.cnblogs.com', callback=self.parse)

import queue
Q = queue.Queue()

class Engine(object):
    def __init__(self):
        self._closewait = None
        self.max = 5         # maximum concurrency
        self.crawlling = []  # requests currently being crawled

    def get_response_callback(self, content, request):
        # content is the returned response; request is the Request object taken from the iterator at the start
        self.crawlling.remove(request)
        result = request.callback(content)  # run the parse method
        import types
        if isinstance(result, types.GeneratorType):  # if the return value is a generator (parse used yield),
            for req in result:                       # put each new request into the queue
                Q.put(req)

    def _next_request(self):
        """
        Take request objects from the scheduler and send the requests,
        subject to the maximum concurrency limit.
        :return:
        """
        # when to stop
        if Q.qsize() == 0 and len(self.crawlling) == 0:
            self._closewait.callback(None)  # finish the defer.Deferred
            return
        if len(self.crawlling) >= self.max:
            return
        while len(self.crawlling) < self.max:
            try:
                req = Q.get(block=False)  # non-blocking get
                self.crawlling.append(req)
                d = getPage(req.url.encode('utf-8'))  # send the request
                # when the page finishes downloading, run get_response_callback, passing the original Request object req
                d.addCallback(self.get_response_callback, req)
                # still below the maximum concurrency, so go back to the scheduler for more Requests
                d.addCallback(lambda _: reactor.callLater(0, self._next_request))  # callLater specifies how long to wait before calling
            except Exception as e:
                print(e)
                return

    @defer.inlineCallbacks
    def crawl(self, spider):
        # put the initial Request objects into the scheduler
        start_requests = iter(spider.start_requests())
        while True:
            try:
                request = next(start_requests)
                Q.put(request)
            except StopIteration as e:
                break
        # take requests from the scheduler and send them
        # self._next_request()
        reactor.callLater(0, self._next_request)
        self._closewait = defer.Deferred()
        yield self._closewait

spider = ChoutiSpider()
_active = set()
engine = Engine()
d = engine.crawl(spider)
_active.add(d)
dd = defer.DeferredList(_active)
dd.addBoth(lambda _: reactor.stop())
reactor.run()
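One detail worth isolating is reactor.callLater(0, f): a delay of 0 does not call f immediately; it queues f to run on an upcoming reactor iteration. A standalone sketch (not part of the tutorial code):

from twisted.internet import reactor

def hello():
    print('runs inside a later reactor iteration, not at the call site')
    reactor.stop()

reactor.callLater(0, hello)  # schedule hello with zero delay
reactor.run()                # hello actually runs here, inside the loop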
Everything up to this point is essentially the basic implementation process of the Scrapy framework. To understand it one step further, we need to imitate Scrapy's source code and encapsulate each function's responsibilities independently, as follows.
III. Encapsulation
from twisted.internet import reactor
from twisted.web.client import getPage
from twisted.internet import defer
from queue import Queue

class Request(object):
    """
    Encapsulates the information of a user request
    """
    def __init__(self, url, callback):
        self.url = url
        self.callback = callback

class HttpResponse(object):
    # Wraps the returned content and request into a single response object, so the
    # callback (parse) can use response.xxx to reach different features. Details omitted.
    def __init__(self, content, request):
        self.content = content
        self.request = request

class Scheduler(object):
    """
    The task scheduler
    """
    def __init__(self):
        self.q = Queue()

    def open(self):
        pass

    def next_request(self):
        # take a value out of the scheduler so it can be sent
        try:
            req = self.q.get(block=False)
        except Exception as e:
            req = None
        return req

    def enqueue_request(self, req):
        self.q.put(req)

    def size(self):
        return self.q.qsize()

class ExecutionEngine(object):
    """
    The engine: all scheduling
    """
    def __init__(self):
        self._closewait = None
        self.scheduler = None
        self.max = 5
        self.crawlling = []

    def get_response_callback(self, content, request):
        self.crawlling.remove(request)
        response = HttpResponse(content, request)
        result = request.callback(response)  # run the callback stored in the request (parse)
        import types
        if isinstance(result, types.GeneratorType):
            for req in result:
                self.scheduler.enqueue_request(req)

    def _next_request(self):
        if self.scheduler.size() == 0 and len(self.crawlling) == 0:
            self._closewait.callback(None)
            return
        while len(self.crawlling) < self.max:
            req = self.scheduler.next_request()
            if not req:
                return
            self.crawlling.append(req)
            d = getPage(req.url.encode('utf-8'))
            d.addCallback(self.get_response_callback, req)
            d.addCallback(lambda _: reactor.callLater(0, self._next_request))

    @defer.inlineCallbacks
    def open_spider(self, start_requests):
        self.scheduler = Scheduler()  # create the scheduler object
        while True:
            try:
                req = next(start_requests)
            except StopIteration as e:
                break
            self.scheduler.enqueue_request(req)  # put it into the scheduler
        yield self.scheduler.open()  # with that decorator there must be a yield; open() returns None, so this is equivalent to yield None
        reactor.callLater(0, self._next_request)

    @defer.inlineCallbacks
    def start(self):
        self._closewait = defer.Deferred()
        yield self._closewait

class Crawler(object):
    """
    Encapsulates the scheduler and the engine for the user...
    """
    def _create_engine(self):
        # create the engine object
        return ExecutionEngine()

    def _create_spider(self, spider_cls_path):
        """
        :param spider_cls_path: spider.chouti.ChoutiSpider
        :return:
        """
        module_path, cls_name = spider_cls_path.rsplit('.', maxsplit=1)
        import importlib
        m = importlib.import_module(module_path)
        cls = getattr(m, cls_name)
        return cls()

    @defer.inlineCallbacks
    def crawl(self, spider_cls_path):
        engine = self._create_engine()  # the work has to be handed to the engine
        spider = self._create_spider(spider_cls_path)  # load the spider from its path and instantiate it
        start_requests = iter(spider.start_requests())
        yield engine.open_spider(start_requests)  # put the iterator into the scheduler
        yield engine.start()  # equivalent to waiting on self._closewait

class CrawlerProcess(object):
    """
    Starts the event loop
    """
    def __init__(self):
        self._active = set()

    def crawl(self, spider_cls_path):
        # each incoming spider is only responsible for creating one Crawler object
        """
        :param spider_cls_path:
        :return:
        """
        crawler = Crawler()
        d = crawler.crawl(spider_cls_path)
        self._active.add(d)

    def start(self):
        # start the event loop
        dd = defer.DeferredList(self._active)
        dd.addBoth(lambda _: reactor.stop())
        reactor.run()  # for run() to work this way, the decorators and the yields are required

class Commond(object):
    def run(self):
        crawl_process = CrawlerProcess()
        spider_cls_path_list = ['spider.chouti.ChoutiSpider', 'spider.cnblogs.CnblogsSpider', ]
        for spider_cls_path in spider_cls_path_list:
            crawl_process.crawl(spider_cls_path)  # create several defer objects, i.e. add each d to _active
        crawl_process.start()  # called once for everything; equivalent to reactor.run()

if __name__ == '__main__':
    cmd = Commond()
    cmd.run()
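For the importlib lookup above to succeed, there must be a spider package on the import path containing chouti and cnblogs modules. The tutorial does not show them, but a minimal sketch of what spider/chouti.py might look like follows; the import of Request is an assumption (point it at wherever the framework code above is saved):

# spider/chouti.py -- hypothetical layout implied by the path 'spider.chouti.ChoutiSpider'
from engine import Request  # assumption: the framework code above lives in engine.py

class ChoutiSpider(object):
    name = 'chouti'

    def start_requests(self):
        for url in ['http://dig.chouti.com', ]:
            yield Request(url, callback=self.parse)

    def parse(self, response):
        print(response.content)  # response is an HttpResponse wrapping content and request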
The code above essentially executes from the bottom up, with each class responsible for its own piece of functionality.
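In rough outline, the call chain when the encapsulated version runs is (a summary of the code above, not additional code):

# Commond.run()
#   -> CrawlerProcess.crawl(path)   creates one Crawler per spider, collects its Deferred in _active
#        -> Crawler.crawl(path)      builds the ExecutionEngine and the spider
#             -> ExecutionEngine.open_spider(...)  enqueues the start requests, schedules _next_request
#             -> ExecutionEngine.start()           yields _closewait until the scheduler drains
#   -> CrawlerProcess.start()       wraps _active in a DeferredList and calls reactor.run()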
The mind map below should also help with understanding.
[Mind map: A Brief Analysis of the Basic Workflow of the Scrapy Framework]