1. 程式人生 > >淺析Scrapy框架運行的基本流程

淺析Scrapy框架運行的基本流程

enqueue ffffff yield 圖片 而且 name RoCE 最大 調度器

本篇博客將從Twisted的下載任務基本流程開始介紹,然後再一步步過渡到Scrapy框架的基本運行流程,其中還會需要我們自定義一個Low版的Scrapy框架。但內容不會涉及太多具體細節,而且需要註意的是示例代碼的運行過程不會Scrapy一模一樣,但不影響你對整體的把握。希望可以幫助那些剛入門爬蟲或者剛學習Scrapy的同學理清思路,做到對Scrapy的運行流程有個大概把握,這樣以後在繼續深入Scrapy框架或者擴展其應用時更加得心應手。(PS:大佬可忽略:))

一、Twisted的下載任務基本過程

首先需要我們導入三個基本的模塊

1 from twisted.internet import reactor   
2 from twisted.web.client import getPage 3 from twisted.internet import defer

  • reactor的作用是開啟事件循環,可以簡單理解為select或者epoll,循環監聽socket對象,一旦連接成功或者響應完成,就移除掉對應的socket,不再監聽。(終止條件,所有的socket都已移除)

  • getPage的作用是自動幫我們創建socket對象,模擬瀏覽器給服務器發送請求,放到reactor裏面,一旦下載到需要的數據,socket將發生變化,就可以被reactor監聽到。(如果下載完成,自動從事件循環中移除)

  • defer裏面的defer.Deferred()的作用是創建一個特殊的socket對象,它不會發送請求,如果放在事件循環裏面,將永遠不會被移除掉,需要我們自己手動移除,作用就是使事件循環一直監聽,不會停止。(不會發請求,手動移除)

1、利用getPage創建socket

def response(content):
      print(content)


def task():
      url = "http://www.baidu.com"
      d = getPage(url)  # 創建socket對象,獲取返回的對象d,它其實也是一個Deferred對象
      d.addCallback(response)  #
當頁面下載完成,自動執行函數response,來處理返回數據

上面代碼的內部執行過程就是自動創建socket對象,並且發送請求,如果請求成功拿到返回的值,自動調用函數response來處理返回的內容。

但以上內容你可以理解只是創建了socket,並且給了運行結束後應該執行的函數。但還沒有加到事件循環。

2.將socket添加到事件循環中

def response(content):
      print(content)

@defer.inlineCallbacks  # 添加事件循環
def task():
      url = "http://www.baidu.com"
      d = getPage(url)
      d.addCallback(response)
      yield d

3、開始事件循環

def response(content):
      print(content)

@defer.inlineCallbacks
def task():
      url = "http://www.baidu.com"
      d = getPage(url.encode(utf-8))
      d.addCallback(response)
      yield d


  task()  # 執行函數
  reactor.run()  # 開啟事件循環。不先執行函數的話,在內部循環中該函數是不會執行的。

但是上面的代碼運行後,無法自動終止,需要有一個東西來監聽著,當我們發的所有url回來時終止事件循環

4、開始事件循環(自動結束)

def response(content):
      print(content)
        
        
@defer.inlineCallbacks
def task():
    url = "http://www.baidu.com"
    d = getPage(url.encode(utf-8))
    d.addCallback(response)
    yield d


  d = task()
  dd = defer.DeferredList([d,])
  dd.addBoth(lambda _:reactor.stop())  # 監聽d是否完成或者失敗,如果是,調用回調函數,終止事件循環

  reactor.run()

上面的代碼此時已經實現了基本的功能,但我們還需要再加最後一個功能,就是並發。讓函數task可以一次性發送多個請求,並且在所有請求都回來後再終止事件循環。這時就要用到defer.Deferred

5、並發操作

_closewait = None

def response(content):

      print(content)
      
@defer.inlineCallbacks
def task():
      """
      每個爬蟲的開始:stats_request
      :return:
      """
      url = "http://www.baidu.com"
      d1 = getPage(url.encode(utf-8))
      d1.addCallback(response)

      url = "http://www.cnblogs.com"
      d2 = getPage(url.encode(utf-8))
      d2.addCallback(response)

      url = "http://www.bing.com"
      d3 = getPage(url.encode(utf-8))
      d3.addCallback(response)

      global _closewait
      _close = defer.Deferred()
      yield _closewait


spider1 = task()
dd = defer.DeferredList([spider1,])
dd.addBoth(lambda _:reactor.stop())

reactor.run()

調用defer.Deferred,使事件循環一直運行,直到發出去的每個請求都返回。但是上面的代碼,在請求都回來之後,事件循環還是不會停止,需要我們手動關閉defer.Deferred

6、手動關閉defer.Deferred

_closewait = None
count = 0
def response(content):

      print(content)
      global count
      count += 1
      if count == 3:  # 當等於3時,說明發出去的url的響應都拿到了
          _closewait.callback(None)

@defer.inlineCallbacks
def task():
      """
      每個爬蟲的開始:stats_request
      :return:
      """
      url = "http://www.baidu.com"
      d1 = getPage(url.encode(utf-8))
      d1.addCallback(response)

      url = "http://www.cnblogs.com"
      d2 = getPage(url.encode(utf-8))
      d2.addCallback(response)

      url = "http://www.bing.com"
      d3 = getPage(url.encode(utf-8))
      d3.addCallback(response)

      global _closewait
      _close = defer.Deferred()
      yield _closewait


spider = task()
dd = defer.DeferredList([spider, ])
dd.addBoth(lambda _:reactor.stop())

reactor.run()

總結上面的流程:

(1)利用getPage創建socket

(2)將socket添加到事件循環中

(3)開始事件循環 (內部發送請求,並接受響應;當所有的socekt請求完成後,終止事件循環)

二、low版Scrapy框架

在理解了上面的內容之後,那麽接下來的部分將會變得輕松,基本的思想和上面差不多。

1、首先我們創建一個爬蟲ChoutiSpider,如下代碼所示

有些人可能不知道start_requests是什麽,其實這個跟我們用Scrapy創建的一樣,這個函數在我們繼承的scrapy.Spider裏面,作用就是取start_url列表裏面的url,並把url和規定好的回調函數parse傳給Request類處理。需要註意的是這裏的Request我們只是簡單的接收來自start_requests傳來的值,不做真正的處理,例如發送url請求什麽的

from twisted.internet import defer
from twisted.web.client import getPage
from twisted.internet import reactor


        
class Request(object):

    def __init__(self,url,callback):
        self.url = url
        self.callback = callback
        

class ChoutiSpider(object):
    name = chouti

    def start_requests(self):
        start_url = [http://www.baidu.com,http://www.bing.com,]
        for url in start_url:
            yield Request(url,self.parse)

    def parse(self,response):
        print(response) #response是下載的頁面
      
        yield Request(http://www.cnblogs.com,callback=self.parse)

2、創建引擎Engine,用來接收需要執行的spider對象

from twisted.internet import defer
from twisted.web.client import getPage
from twisted.internet import reactor


class Request(object):

    def __init__(self,url,callback):
        self.url = url
        self.callback = callback
        

class ChoutiSpider(object):
    name = chouti

    def start_requests(self):
        start_url = [http://www.baidu.com,http://www.bing.com,]
        for url in start_url:
            yield Request(url,self.parse)

    def parse(self,response):
        print(response) #response是下載的頁面
      
        yield Request(http://www.cnblogs.com,callback=self.parse)
        
        
import queue
Q = queue.Queue()  # 調度器

class Engine(object):

    def __init__(self):
        self._closewait = None
    
    @defer.inlineCallbacks
    def crawl(self,spider):
       
        start_requests = iter(spider.start_requests())  # 把生成器Request對象轉換為叠代器
        while True:
            try:
                request = next(start_requests)  # 把Request對象放到調度器裏面
                Q.put(request)
            except StopIteration as e:
                break
                

        self._closewait = defer.Deferred()
        yield self._closewait
       

3、更加具體的實現過程

讓引擎實現把Request對象加到調度器Q(隊列)裏面,然後再從調度器取值進行發送、接收和數據處理。

from twisted.internet import defer
from twisted.web.client import getPage
from twisted.internet import reactor

        
class Request(object):

    def __init__(self,url,callback):
        self.url = url
        self.callback = callback


class ChoutiSpider(object):
    name = chouti

    def start_requests(self):
        start_url = [http://www.baidu.com,http://www.bing.com,]
        for url in start_url:
            yield Request(url,self.parse)

    def parse(self,response):
        print(response) #response是下載的頁面
        
        yield Request(http://www.cnblogs.com,callback=self.parse)
        
        
import queue
Q = queue.Queue()


class Engine(object):

    def __init__(self):
        self._closewait = None
        self.max = 5  # 最大並發數
        self.crawlling = []  # 正在爬取的爬蟲個數
    
    def get_response_callback(self,content,request):
        # content就是返回的response, request就是一開始返回的叠代對象
        self.crawlling.remove(request)
        result = request.callback(content)  # 執行parse方法
        import types
        if isinstance(result,types.GeneratorType):  # 判斷返回值是否是yield,如
            for req in result:                      # 果是則加入到隊列中
                Q.put(req)
    
    def _next_request(self):
        """
        去調度器中取request對象,並發送請求
        最大並發數限制
        :return:
        """
        # 什麽時候終止
        if Q.qsize() == 0 and len(self.crawlling) == 0:
            self._closewait.callback(None)  # 結束defer.Deferred 
            return

        if len(self.crawlling) >= self.max:
            return
        while len(self.crawlling) < self.max:
            try:
                req = Q.get(block=False)  # 設置為非堵塞
                self.crawlling.append(req)
                d = getPage(req.url.encode(utf-8))  # 發送請求
                # 頁面下載完成,執行get_response_callback,並且將一開始的叠代對象req作為參數傳過去
                d.addCallback(self.get_response_callback,req)
                # 未達到最大並發數,可以再去調度器中獲取Request
                d.addCallback(lambda _:reactor.callLater(0, self._next_request))  # callLater表示多久後調用
                
            except Exception as e:
                print(e)
                return
    
    @defer.inlineCallbacks
    def crawl(self,spider):
        # 將初始Request對象添加到調度器
        start_requests = iter(spider.start_requests())
        while True:
            try:
                request = next(start_requests)
                Q.put(request)
            except StopIteration as e:
                break
                
        # 去調度器中取request,並發送請求
        # self._next_request()
        reactor.callLater(0, self._next_request)

        self._close = defer.Deferred()
        yield self._closewait
     
    
spider = ChoutiSpider()
_active = set()
engine = Engine()

d = engine.crawl(spider)  
_active.add(d)
dd = defer.DeferredList(_active)
dd.addBoth(lambda _:reactor.stop())

reactor.run()

上面其實就是Scrapy框架的基本實現過程,不過要想更進一步的了解,就需要模仿Scrapy的源碼,把各個函數的功能獨立封裝起來,如下。

三、封裝

from twisted.internet import reactor   
from twisted.web.client import getPage 
from twisted.internet import defer     
from queue import Queue

class Request(object):
    """
    用於封裝用戶請求相關信息
    """
    def __init__(self,url,callback):
        self.url = url
        self.callback = callback

class HttpResponse(object):  # 對返回的content和request做進一步封裝,集合到response裏面。然後通過回調函數parse調用response.xxx可以調用不同的功能。具體過程略

    def __init__(self,content,request):
        self.content = content
        self.request = request

class Scheduler(object): 
    """
    任務調度器
    """
       def __init__(self):
        self.q = Queue()

    def open(self):
        pass

    def next_request(self):  # 取調度器裏面的值並發送
        try:
            req = self.q.get(block=False)
        except Exception as e:
            req = None
        return req

    def enqueue_request(self,req):
        self.q.put(req)

    def size(self):
        return self.q.qsize()

class ExecutionEngine(object):
    """
    引擎:所有調度
    """
    def __init__(self):
        self._closewait = None
        self.scheduler = None
        self.max = 5
        self.crawlling = []
    
    def get_response_callback(self,content,request):
        self.crawlling.remove(request) 
        response = HttpResponse(content,request)
        result = request.callback(response)  # 執行request裏面的回調函數(parse)
        import types
        if isinstance(result,types.GeneratorType):
            for req in result:
                self.scheduler.enqueue_request(req)
    
    def _next_request(self):
        if self.scheduler.size() == 0 and len(self.crawlling) == 0: 
            self._closewait.callback(None)
            return
        
        while len(self.crawlling) < self.max:
            req = self.scheduler.next_request()
            if not req:  
                return
            self.crawlling.append(req)
            d = getPage(req.url.encode(utf-8))  
            d.addCallback(self.get_response_callback,req)
            d.addCallback(lambda _:reactor.callLater(0,self._next_request))
    
    @defer.inlineCallbacks
    def open_spider(self,start_requests):
        self.scheduler = Scheduler()  # 創建調度器對象
        
        while True:
            try:
                req = next(start_requests)
            except StopIteration as e:
                break
            self.scheduler.enqueue_request(req)  # 放進調度器裏面
        yield self.scheduler.open()  # 使用那個裝飾器後一定要有yield,因此傳個   None 相當於yield None
        reactor.callLater(0,self._next_request)   

    @defer.inlineCallbacks
    def start(self):  
        self._closewait = defer.Deferred()
        yield self._closewait

class Crawler(object):
    """
    用戶封裝調度器以及引擎的...
    """
    def _create_engine(self):  # 創建引擎對象 
        return ExecutionEngine()

    def _create_spider(self,spider_cls_path):  
        """

        :param spider_cls_path:  spider.chouti.ChoutiSpider
        :return:
        """
        module_path,cls_name = spider_cls_path.rsplit(.,maxsplit=1)
        import importlib
        m = importlib.import_module(module_path)
        cls = getattr(m,cls_name)
        return cls()

    @defer.inlineCallbacks
    def crawl(self,spider_cls_path): 
        engine = self._create_engine()  # 需要把任務交給引擎去做
        spider = self._create_spider(spider_cls_path)  # 通過路徑獲取spider,創建對象
        start_requests = iter(spider.start_requests()) 
        yield engine.open_spider(start_requests)  # 把叠代器往調度器裏面放
        yield engine.start()  # 相當於self._closewait方法  

class CrawlerProcess(object):
    """
    開啟事件循環
    """
    def __init__(self):
        self._active = set()
        
    def crawl(self,spider_cls_path):  # 每個爬蟲過來只負責創建一個crawler對象
        """                          
        :param spider_cls_path:
        :return:
        """
        crawler = Crawler()
        d = crawler.crawl(spider_cls_path)
        self._active.add(d)
    
    def start(self):  # 開啟事件循環
        dd = defer.DeferredList(self._active)
        dd.addBoth(lambda _:reactor.stop())

        reactor.run()  # 想run()起來,則要有裝飾器和yield

class Commond(object):

        def run(self):
            crawl_process = CrawlerProcess() 
            spider_cls_path_list =  [spider.chouti.ChoutiSpider,spider.cnblogs.CnblogsSpider,]  
            for spider_cls_path in spider_cls_path_list:
                crawl_process.crawl(spider_cls_path)  # 創建多個defer對象,也就是把d添加到_active裏面
            crawl_process.start()  # 統一調用一次,相當於reactor.run()


if __name__ == __main__:
    cmd = Commond()
    cmd.run()

其實上面的代碼就是從下往上依次執行,每個類之間各個完成不同的功能。

配合下面的思維導圖,希望可以幫助理解。

技術分享圖片

淺析Scrapy框架運行的基本流程