1. 程式人生 > 實用技巧 ><scrapy>scrapy原始碼剖析

<scrapy>scrapy原始碼剖析

  • 前戲Twisted使用
    • 1.簡單實現版本1.0
      • # 事件迴圈,監聽socket變化(終止條件,所有socket都已經移除)
        from twisted.internet import reactor
        # 建立socket物件,如果下載完成,自動從事件迴圈中移除
        from twisted.web.client import getPage
        # defer.Deferred 特殊的socket的物件,不給任何地址發請求,不會自動移除,需要手動移除
        from twisted.internet import defer
        
        
        # 1.利用getPage建立socket
        # 2.將socket新增到事件迴圈
        # 3.開始事件迴圈(內部發送請求,並接受響應,當所有socket請求完成,終止事件迴圈)
        
        # 1.利用getPage建立socket
        def response(content):
        	print(content)
        
        # 2.將socket新增到事件迴圈
        # 這個裝飾器和yield d表示將socket已經新增到事件迴圈
        @defer.inlineCallbacks
        def task():
        	url = "http://www.baidu.com"
        	# 根據域名找到你的ip,並建立socket,返回值d和defer.Deferred類似,不會自動終止的特殊socket物件
        	d = getPage(url.encode('utf-8'))
        	# 利用socket發請求,請求完成拿到值,執行response函式
        	d.addCallback(response)
        
        	yield d
        
        # 執行task函式
        task()
        # 3.開始事件迴圈
        reactor.run()
    • 版本2.0
      • # 事件迴圈,監聽socket變化(終止條件,所有socket都已經移除)
        from twisted.internet import reactor
        # 建立socket物件,如果下載完成,自動從事件迴圈中移除
        from twisted.web.client import getPage
        # defer.Deferred 特殊的socket的物件,不給任何地址發請求,不會自動移除,需要手動移除
        from twisted.internet import defer
        
        # 版本1.0
        # 1.1.利用getPage建立socket
        # 1.2.將socket新增到事件迴圈
        # 1.3.開始事件迴圈(內部發送請求,並接受響應,當所有socket請求完成,終止事件迴圈)
        
        # 版本2.0
        # 2.1.解決不能自動終止的問題
        
        # 1.1. 利用getPage建立socket
        def response(content):
        	print(content)
        
        # 1.2.將socket新增到事件迴圈
        # 這個裝飾器和yield d表示將socket已經新增到事件迴圈
        @defer.inlineCallbacks
        def task():
        	url = "http://www.baidu.com"
        	# 根據域名找到你的ip,並建立socket,返回值d和defer.Deferred類似,不會自動終止的特殊socket物件
        	d = getPage(url.encode('utf-8'))
        	# 利用socket發請求,請求完成拿到值,執行response函式
        	d.addCallback(response)
        
        	yield d
        
        def done(*args,**kwargs):
        	# 終止事件迴圈
        	reactor.stop()
        
        # 執行task函式
        d = task()
        # 監聽d是否完成,需要用列表[d,]加入
        dd = defer.DeferredList([d,])
        # 監聽d是否完成,如果完成就會呼叫addBoth的回撥函式
        # 2.1:利用回撥函式done終止事件迴圈
        dd.addBoth(done)
        
        
        # 1.3.開始事件迴圈
        reactor.run()
    • 版本3.0
        • # 事件迴圈,監聽socket變化(終止條件,所有socket都已經移除)
          from twisted.internet import reactor
          # 建立socket物件,如果下載完成,自動從事件迴圈中移除
          from twisted.web.client import getPage
          # defer.Deferred 特殊的socket的物件,不給任何地址發請求,不會自動移除,需要手動移除
          from twisted.internet import defer
          
          
          # 版本1.0
          # 1.1.利用getPage建立socket
          # 1.2.將socket新增到事件迴圈
          # 1.3.開始事件迴圈(內部發送請求,並接受響應,當所有socket請求完成,終止事件迴圈)
          
          # 版本2.0
          # 2.1.解決不能自動終止的問題
          
          # 版本3.0
          # 3.1.1.解決併發,非同步IO的問題--利用多個socket
          
          # 1.1. 利用getPage建立socket
          def response(content):
          	print(content)
          
          
          # 1.2.將socket新增到事件迴圈
          # 這個裝飾器和yield d表示將socket已經新增到事件迴圈
          @defer.inlineCallbacks
          def task():
          	url = "http://www.baidu.com"
          	# 根據域名找到你的ip,並建立socket,返回值d和defer.Deferred類似,不會自動終止的特殊socket物件
          	d = getPage(url.encode('utf-8'))
          	# 利用socket發請求,請求完成拿到值,執行response函式
          	d.addCallback(response)
          
          	yield d
          
          
          def done(*args, **kwargs):
          	# 終止事件迴圈
          	reactor.stop()
          
          
          # 執行task函式
          # d = task()
          
          # 3.1.1.同時監聽多個d,利用多個socket,解決併發的問題,非同步IO的問題,全部發出去了,等請求回來
          li = []
          for i in range(10):
          	d = task()
          	li.append(d)
          
          dd = defer.DeferredList(li)
          
          # 監聽d是否完成,需要用列表[d,]加入
          # dd = defer.DeferredList([d,])
          
          # 監聽d是否完成,如果完成就會呼叫addBoth的回撥函式
          # 2.1:利用回撥函式done終止事件迴圈
          dd.addBoth(done)
          
          # 1.3.開始事件迴圈
          reactor.run()

        版本3.1-另一種方法解決併發問題-以及多爬蟲同時爬取的併發問題
        • 有bug是因為_close只有一個,後面會進行封裝,不用多關注
        • # 事件迴圈,監聽socket變化(終止條件,所有socket都已經移除)
          from twisted.internet import reactor
          # 建立socket物件,如果下載完成,自動從事件迴圈中移除
          from twisted.web.client import getPage
          # defer.Deferred 特殊的socket的物件,不給任何地址發請求,不會自動移除,需要手動移除
          from twisted.internet import defer
          
          # 版本1.0
          # 1.1.利用getPage建立socket
          # 1.2.將socket新增到事件迴圈
          # 1.3.開始事件迴圈(內部發送請求,並接受響應,當所有socket請求完成,終止事件迴圈)
          
          # 版本2.0
          # 2.1.解決不能自動終止的問題
          
          # 版本3.1
          # 3.1.解決併發,非同步IO的問題--利用task中加入特殊socket物件
          # 3.2 加入多個爬蟲同時執行的功能--類似scrapy crawl all
          
          
          _close = None
          count = 0
          
          
          # 1.1. 利用getPage建立socket
          def response(content):
          	print(content)
          	global count
          	count += 1
          	if count == 3:
          		# 使特殊socket物件終止
          		_close.callback(None)
          
          
          # 1.2.將socket新增到事件迴圈
          # 這個裝飾器和yield d表示將socket已經新增到事件迴圈
          @defer.inlineCallbacks
          def task():
          	# 3.1:建立多個socket,因為defer.Deferred()特殊物件,不會自動停止
          	# 設定_close 全域性變數,以便能請求全部返還能夠手動終止
          	# 利用全域性變數count的計數,去控制特殊物件的終止,只有全部終止才會結束
          	global _close
          
          	# 這個相當於scrapy中的start_url
          	url = "http://www.baidu.com"
          	d1 = getPage(url.encode('utf-8'))
          	d1.addCallback(response)
          
          	url = "http://www.cnblogs.com"
          	d2 = getPage(url.encode('utf-8'))
          	d2.addCallback(response)
          
          	url = "http://www.bing.com"
          	d3 = getPage(url.encode('utf-8'))
          	d3.addCallback(response)
          
          	_close = defer.Deferred()
          
          	yield _close
          
          
          def done(*args, **kwargs):
          	# 終止事件迴圈
          	reactor.stop()
          
          
          # 3.2:同時建立多個task即可實現,scrapy爬蟲同時執行,2個爬蟲有各自的start_url是併發的
          # 執行task函式
          spider1 = task()
          spider2 = task()
          
          # 監聽d是否完成,需要用列表[d,]加入
          dd = defer.DeferredList([spider1,spider2])
          
          # 監聽d是否完成,如果完成就會呼叫addBoth的回撥函式
          # 2.1:利用回撥函式done終止事件迴圈
          dd.addBoth(done)
          
          # 1.3.開始事件迴圈
          reactor.run()
                        
  • scrapy經驗 + Twisted功能
    • Low
      • # 事件迴圈,監聽socket變化(終止條件,所有socket都已經移除)
        from twisted.internet import reactor
        # 建立socket物件,如果下載完成,自動從事件迴圈中移除
        from twisted.web.client import getPage
        # defer.Deferred 特殊的socket的物件,不給任何地址發請求,不會自動移除,需要手動移除
        from twisted.internet import defer
        import queue
        
        Q = queue.Queue()
        
        
        class Request(object):
        	# 這裡的callback = parse
        	def __init__(self, url, callback):
        		self.url = url
        		self.callback = callback
        
        
        class HttpResponse(object):
        
        	def __init__(self, content, request):
        		self.content = content
        		self.request = request
        		self.url = request.url
        		self.text = str(content, encoding='utf-8')
        
        
        class ChoutiSpider(object):
        	name = 'chouti'
        
        	def start_requests(self):
        		start_url = ['http://www.baidu.com', 'http://www.bing.com', ]
        		for url in start_url:
        			# 執行Request函式
        			yield Request(url, self.parse)
        
        	def parse(self, response):
        		# 1.crawling移除
        		# 2.獲取parse yield返回值
        		# 3.再次去佇列中獲取
        		print(response.text)  # 執行HttpResponse()中的方法
        		yield Request('http://www.cnblogs.com', callback=self.parse)
        
        
        class Engine(object):
        	def __init__(self):
        		self._close = None
        		self.spider = None
        		self.max = 5  # 最大併發數
        		self.crawling = []  # 表示正在爬取的爬蟲
        
        	def get_response_callback(self, content, request):
        		# getPage的返回值response,傳入的req ---url 和 callback
        		self.crawling.remove(request)  # 刪除已經下載完成的url callback
        		rep = HttpResponse(content, request)  # 將返回值傳遞過去
        		# 生成器或空
        		result = request.callback(rep)  # 呼叫spider中的parse方法 = parse(rep)
        		import types
        		# 判斷返回值是不是生成器
        		if isinstance(result, types.GeneratorType):
        			for req in result:
        				Q.put(req)  # 將新請求加入佇列
        
        	def _next_request(self):
        		# 判斷終止條件
        		if Q.qsize() == 0 and len(self.crawling) == 0:
        			self._close.callback(None)  # 手動停止
        			return
        
        		# 傳送過程中,會有最大併發數的限制,迴圈取url並下載
        		if len(self.crawling) >= self.max:  # 超過最大併發數,直接返回
        			return
        		while len(self.crawling) < self.max:  # 低於最大併發數
        			try:
        				req = Q.get(block=False)  # 取資料,如果為空會報錯,加入block不會等佇列中的資料
        				self.crawling.append(req)  # 將取到的url加入記錄正在爬取數量的列表crawling
        				d = getPage(req.url.encode('utf-8'))  # getPage建立socket物件,傳送請求進行下載
        
        				# 5.等頁面下載完成執行使用者自己定義的回撥函式,處理response
        				d.addCallback(self.get_response_callback, req)  # d為請求的結果
        				# 未達到最大併發數,可以再去排程器中獲取Request
        				# d.addCallback(self._next_request)  # 上一個方法執行玩,進行遞迴呼叫,繼續取url
        				d.addCallback(lambda _: reactor.callLater(0, self._next_request))  # 多久後呼叫
        			except Exception as e:  # 如果佇列為空,直接返回,不再迴圈取
        				return
        
        	# 這個裝飾器和yield self._close表示將socket已經新增到事件迴圈
        	@defer.inlineCallbacks
        	def crawl(self, spider):
        		# 3.將初始Request物件新增到排程器---將初始urL加入佇列
        		start_requests = iter(spider.start_requests())  # 迭代器---執行spider中start_request函式
        		while True:
        			try:
        				request = next(start_requests)  # 取迭代器中的下一個值 url和callback
        				Q.put(request)  # 將取到的值放入佇列
        			except StopIteration as e:  # 如果佇列取完,就跳出迴圈
        				break
        
        		# 4.反覆去排程器中取request併發送請求進行下載,下載完成後執行回撥函式
        		# self._next_request()
        		reactor.callLater(0, self._next_request)  # scrapy內部的寫法
        
        		self._close = defer.Deferred()  # 特殊socket不會自動結束,只能手動結束
        		yield self._close
        
        
        # 爬蟲物件
        spider = ChoutiSpider()
        
        _active = set()
        # 1.建立引擎
        engine = Engine()
        # 2.將爬蟲放入引擎進行處理,執行引擎中crawl函式
        d = engine.crawl(spider)
        
        _active.add(d)
        # 監聽爬蟲d是否完成,如果完成執行addBoth終止socket
        dd = defer.DeferredList(_active)
        # 終止socket
        dd.addBoth(lambda _: reactor.stop())
        
        reactor.run()
          
    • High
  • Scrapy原始碼剖析