1. 程式人生 > >Twisted使用和scrapy源碼剖析

Twisted使用和scrapy源碼剖析

我們 系列 www 延遲 crawler b16 目的 下一個 tar

1.Twisted是用Python實現的基於事件驅動的網絡引擎框架。

事件驅動編程是一種編程範式,這裏程序的執行流由外部事件來決定。它的特點是包含一個事件循環,當外部事件發生時使用回調機制來觸發相應的處理。另外兩種常見的編程範式是(單線程)同步以及多線程編程。

from twisted.internet import reactor   # 事件循環(終止條件,所有的socket都已經移除)
from twisted.web.client import getPage # socket對象(如果下載完成,自動從時間循環中移除...)
from twisted.internet import defer # defer.Deferred 特殊的socket對象 (不會發請求,手動移除)
1.利用getPage創建socket
2.將socket添加到事件循環中
3.開始事件循環(無法自動結束)
def response(content):
    print(content)

@defer.inlineCallbacks
def task():
    url = "http://www.baidu.com"
    d = getPage(url.encode(utf-8))
    d.addCallback(response)
    yield d

def done(*args,**kwargs):
    reactor.stop()#在這裏加上也無法自動結束
task() reactor.run() ######################## 1.利用getPage創建socket 2.將socket添加到事件循環中 3.開始事件循環(自動結束) def response(content): print(content) @defer.inlineCallbacks def task(): url = "http://www.baidu.com" d = getPage(url.encode(utf-8)) d.addCallback(response) yield d def done(*args,**kwargs): reactor.stop() d
= task() dd = defer.DeferredList([d,]) dd.addBoth(done) reactor.run()

Twisted實現了設計模式中的反應堆(reactor)模式,這種模式在單線程環境中調度多個事件源產生的事件到它們各自的事件處理例程中去。

在異步版的URL獲取器中,reactor.run()啟動reactor事件循環。

Twisted的核心就是reactor事件循環。Reactor可以感知網絡、文件系統以及定時器事件。它等待然後處理這些事件,從特定於平臺的行為中抽象出來,並提供統一的接口,使得在網絡協議棧的任何位置對事件做出響應都變得簡單。

2.Deferred對象以抽象化的方式表達了一種思想,即結果還尚不存在。它同樣能夠幫助管理產生這個結果所需要的回調鏈。當從函數中返回時,Deferred對象承諾在某個時刻函數將產生一個結果。返回的Deferred對象中包含所有註冊到事件上的回調引用,因此在函數間只需要傳遞這一個對象即可,跟蹤這個對象比單獨管理所有的回調要簡單的多。

Deferred對象創建時包含兩個添加回調的階段。第一階段,addCallbacks將response添加到歸屬的回調鏈中。然後addBoth再將done同時添加到這兩個回調鏈上。

# 1.利用getPage創建socket
# 2.將socket添加到事件循環中
# 3.開始事件循環(自動結束)
def response(content):
    print(content)

@defer.inlineCallbacks
def task():
    url = "http://www.baidu.com"
    d = getPage(url.encode(utf-8))
    d.addCallback(response)
    yield d
    url = "http://www.baidu.com"
    d = getPage(url.encode(utf-8))
    d.addCallback(response)
    yield d

def done(*args,**kwargs):
    reactor.stop()

li = []
for i in range(10):
    d = task()
    li.append(d)
dd = defer.DeferredList(li)
dd.addBoth(done)
reactor.run()
#########################
# 1.利用getPage創建socket
# 2.將socket添加到事件循環中
# 3.開始事件循環(自動結束)
def response(content):
    print(content)

@defer.inlineCallbacks
def task():
    url = "http://www.baidu.com"
    d1 = getPage(url.encode(utf-8))
    d1.addCallback(response)

    url = "http://www.baidu.com"
    d2 = getPage(url.encode(utf-8))
    d2.addCallback(response)

    url = "http://www.baidu.com"
    d3 = getPage(url.encode(utf-8))
    d3.addCallback(response)
    yield defer.Deferred()

def done(*args,**kwargs):
    reactor.stop()

d=task()
dd = defer.DeferredList([d,])
dd.addBoth(done)
reactor.run()

3.自定義scrapy框架

技術分享圖片
from twisted.internet import reactor   # 事件循環(終止條件,所有的socket都已經移除)
from twisted.web.client import getPage # socket對象(如果下載完成,自動從時間循環中移除...)
from twisted.internet import defer     # defer.Deferred 特殊的socket對象 (不會發請求,手動移除)

class Request(object):
    def __init__(self,url,callback):
        self.url = url
        self.callback = callback

class HttpResponse(object):
    def __init__(self,content,request):
        self.content = content
        self.request = request
        self.url = request.url
        self.text = str(content,encoding=utf-8)

class ChoutiSpider(object):
    name = chouti
    def start_requests(self):
        start_url = [http://www.baidu.com,http://www.bing.com,]
        for url in start_url:
            yield Request(url,self.parse)

    def parse(self,response):
        print(-----response----->,response) #response是下載的頁面
        yield Request(http://www.cnblogs.com,callback=self.parse)
        #1 crawling移除
        #2 獲取parse yield值
        #3 再次去隊列中獲取

import queue
Q = queue.Queue()

class Engine(object):
    def __init__(self):
        self._close = None
        self.max = 5
        self.crawlling = []

    def get_response_callback(self,content,request):
        self.crawlling.remove(request)
        rep = HttpResponse(content,request)
        result = request.callback(rep)#content和request
        # print(result)#<generator object ChoutiSpider.parse at 0x000001F694A2C9E8>
        import types
        if isinstance(result,types.GeneratorType):
            for req in result:
                # print(‘-------------->‘,req)
                Q.put(req)

    def _next_request(self):
        """
        去取request對象,並發送請求
        最大並發數限制
        :return:
        """
        print(---->request,self.crawlling,Q.qsize())
        if Q.qsize() == 0 and len(self.crawlling) == 0:
            self._close.callback(None)
            return

        if len(self.crawlling) >= self.max:
            return
        while len(self.crawlling) < self.max:
            try:
                req = Q.get(block=False)
                self.crawlling.append(req)
                d = getPage(req.url.encode(utf-8))
                # 頁面下載完成,get_response_callback,調用用戶spider中定義的parse方法,並且將新請求添加到調度器
                d.addCallback(self.get_response_callback,req)
                # 未達到最大並發數,可以再去調度器中獲取Request
                d.addCallback(lambda _:reactor.callLater(0, self._next_request))
            except Exception as e:
                # print(e)
                return

    @defer.inlineCallbacks
    def crawl(self,spider):
        # 將初始Request對象添加到調度器
        start_requests = iter(spider.start_requests())
        while True:
            try:
                request = next(start_requests)
                Q.put(request)
            except StopIteration as e:
                break

        # 去調度器中取request,並發送請求
        # self._next_request()
        reactor.callLater(0, self._next_request)
        self._close = defer.Deferred()
        yield self._close

spider = ChoutiSpider()
_active = set()
engine = Engine()
d = engine.crawl(spider)
_active.add(d)

dd = defer.DeferredList(_active)
dd.addBoth(lambda _:reactor.stop())
reactor.run()
View Code

4. 根據源碼重寫engine

from twisted.internet import reactor   # 事件循環(終止條件,所有的socket都已經移除)
from twisted.web.client import getPage # socket對象(如果下載完成,自動從時間循環中移除...)
from twisted.internet import defer     # defer.Deferred 特殊的socket對象 (不會發請求,手動移除)
from queue import Queue

class Request(object):
    """
    用於封裝用戶請求相關信息
    """
    def __init__(self,url,callback):
        self.url = url
        self.callback = callback

class HttpResponse(object):
    def __init__(self,content,request):
        self.content = content
        self.request = request

class Scheduler(object):
    """
    任務調度器
    """
    def __init__(self):
        self.q = Queue()

    def open(self):
        pass

    def next_request(self):
        try:
            req = self.q.get(block=False)
        except Exception as e:
            req = None
        return req

    def enqueue_request(self,req):
        self.q.put(req)

    def size(self):
        return self.q.qsize()

class ExecutionEngine(object):
    """
    引擎:所有調度
    """
    def __init__(self):
        self._close = None
        self.scheduler = None
        self.max = 5
        self.crawlling = []

    def get_response_callback(self,content,request):
        self.crawlling.remove(request)
        response = HttpResponse(content,request)
        result = request.callback(response)
        import types
        if isinstance(result,types.GeneratorType):
            for req in result:
                self.scheduler.enqueue_request(req)

    def _next_request(self):
        if self.scheduler.size() == 0 and len(self.crawlling) == 0:
            self._close.callback(None)
            return

        while len(self.crawlling) < self.max:
            req = self.scheduler.next_request()
            if not req:
                return
            self.crawlling.append(req)
            d = getPage(req.url.encode(utf-8))
            d.addCallback(self.get_response_callback,req)
            d.addCallback(lambda _:reactor.callLater(0,self._next_request))

    @defer.inlineCallbacks
    def open_spider(self,start_requests):
        self.scheduler = Scheduler()
        yield self.scheduler.open()
        while True:
            try:
                req = next(start_requests)
            except StopIteration as e:
                break
            self.scheduler.enqueue_request(req)
        reactor.callLater(0,self._next_request)

    @defer.inlineCallbacks
    def start(self):
        self._close = defer.Deferred()
        yield self._close

class Crawler(object):
    """
    用戶封裝調度器以及引擎的...
    """
    def _create_engine(self):
        return ExecutionEngine()

    def _create_spider(self,spider_cls_path):
        """
        :param spider_cls_path:  spider.chouti.ChoutiSpider
        :return:
        """
        module_path,cls_name = spider_cls_path.rsplit(.,maxsplit=1)
        import importlib
        m = importlib.import_module(module_path)
        cls = getattr(m,cls_name)
        return cls()

    @defer.inlineCallbacks
    def crawl(self,spider_cls_path):
        engine = self._create_engine()
        spider = self._create_spider(spider_cls_path)
        start_requests = iter(spider.start_requests())
        yield engine.open_spider(start_requests)
        yield engine.start()

class CrawlerProcess(object):
    """
    開啟事件循環
    """
    def __init__(self):
        self._active = set()

    def crawl(self,spider_cls_path):
        """
        :param spider_cls_path:
        :return:
        """
        crawler = Crawler()
        d = crawler.crawl(spider_cls_path)
        self._active.add(d)

    def start(self):
        dd = defer.DeferredList(self._active)
        dd.addBoth(lambda _:reactor.stop())
        reactor.run()

class Commond(object):
    def run(self):
        crawl_process = CrawlerProcess()
        spider_cls_path_list = [spider.chouti.ChoutiSpider,spider.cnblogs.CnblogsSpider,]
        for spider_cls_path in spider_cls_path_list:
            crawl_process.crawl(spider_cls_path)
        crawl_process.start()

if __name__ == __main__:
    cmd = Commond()
    cmd.run()

@defer.inlineCallbacks是一個裝飾器並用來裝飾生成器函數.inlineCallbacks 的主要的目的就是把一個生成器變成一系列的異步的callbacks.

當我們調用一個用inlineCallbacks 修飾的函數的時候,我們不需要調用下一個或者發送或者拋出我們自己.這個裝飾器會幫我們完成這些並會確保我們的生成器會一直運行到底(假設它並沒有拋出異常).

一個被inlineCallbacks修飾的函數會返回deferred.因為我們不知道生成器什麽時候會停止運行,這個被修飾過的函數是一個異步的函數,最適合返回的是deferred.註意這個返回的deferred 不是yield 語句返回的deferred,它是這個生成器全部運行完畢之後才觸發的deferred.

使用了callLater 在一段時間之後去觸發deferred.這是一個很方便的把非阻塞的延遲放入callback 鏈的方法,一般來說,在我們的生成器中我們會不斷的返回一個已經被觸發過的deferred.

以上代碼執行以下連個spider。

技術分享圖片
from engine import Request
class ChoutiSpider(object):

    name = chouti

    def start_requests(self):
        start_url = [http://www.baidu.com,http://www.bing.com,]
        for url in start_url:
            yield Request(url,self.parse)

    def parse(self,response):
        print(response) #response是下載的頁面
        yield Request(http://www.cnblogs.com,callback=self.parse)
spider1 技術分享圖片
from engine import Request
class CnblogsSpider(object):

    name = cnblogs

    def start_requests(self):
        start_url = [http://www.cnblogs.com,]
        for url in start_url:
            yield Request(url,self.parse)

    def parse(self,response):
        print(response) #response是下載的頁面
        yield Request(http://www.cnblogs.com,callback=self.parse)
spider2

Twisted使用和scrapy源碼剖析