1. 程式人生 > 實用技巧 >最神奇的分散式併發爬蟲方式,程式碼簡潔到極致。

最神奇的分散式併發爬蟲方式,程式碼簡潔到極致。

演示最神奇的分散式併發爬蟲方式,這是個單檔案,沒有依賴任何其他模組,只依賴了2個pip公有包,所以下面的程式碼可以直接複製執行。

這個檔案可以精確控制0.01到100之間的任意頻率。注意是100頻率,說的不是100併發,而是每秒真的下載完成100個網頁,100請求併發和100qps不一樣,前者控制不精確,後者能使任意響應時長的網站或介面,都能控制在1秒下載100個網頁。

除非是網頁介面的響應時間永遠固定為1秒。100請求併發才剛好等於每秒鐘下載完成100個網頁。

否則如果介面消耗時間如果是0.1秒到20秒之間任意不穩定的時間徘徊,那麼100qps的用法遠強於100請求併發。

 1 """
 2 沒有手寫如何併發,單檔案就能實現分散式的任意精確頻率併發的爬蟲,祕訣在於@task_deco裝飾器。
3 """ 4 5 import time 6 import re 7 from function_scheduling_distributed_framework import task_deco, BrokerEnum,LogManager # pip install function_scheduling_distributed_framework 8 from proxypool_framework.contrib.proxy_client import ProxyClient # pip isntall proxypool_framework 9 from parsel import
Selector # pip install parsel 10 11 12 logger = LogManager('aqdys_spider').get_logger_and_add_handlers() 13 14 15 @task_deco('aqdys_list_page', broker_kind=BrokerEnum.PERSISTQUEUE, qps=5, concurrent_num=300) 16 def crawl_list_page(vidio_type, page): 17 p = '' if page == 1 else page 18 url = f'
http://aqdyec.com/{vidio_type}/index{p}.html' 19 resp_text = ProxyClient(request_retry_times=6, is_use_proxy=False).request('get', url, timeout=30).content.decode('gbk') 20 sel = Selector(resp_text) 21 for li in sel.xpath('//*[@id="contents"]/li'): 22 try: 23 item = dict() 24 item['name'] = li.xpath('./h5/a/text()').extract_first() 25 # print(item['name']) 26 item['_id'] = re.search(r'(.*?\d+)', item['name']).group(1) 27 item['players'] = li.xpath('./p[1]/text()').extract()[-1] 28 item['picture'] = li.xpath('./a/img/@src').extract_first() 29 item['detail_url'] = 'http://aqdyec.com' + li.xpath('./h5/a/@href').extract_first() 30 # logger.debug(item) 31 crawl_detail_page.push(item) 32 print(item) 33 except Exception as e: 34 logger.error(e) 35 if page == 1: # 翻頁。 36 last_p = re.search(r'<a href="/.*?/index(\d+).html">最後一頁</a>', resp_text).group(1) 37 for px in range(2, int(last_p) + 1): 38 crawl_list_page.push(vidio_type, px) 39 40 41 @task_deco('aqdys_detail_page', broker_kind=BrokerEnum.PERSISTQUEUE, qps=10, concurrent_num=300) 42 def crawl_detail_page(item): 43 resp_text = ProxyClient(is_use_proxy=False).request('get', url=item['detail_url'], timeout=30).content.decode('gbk') 44 item['add_time'] = re.search('<span id="addtime">(.*?)</span>', resp_text).group(1) 45 item['crawel_time'] = time.strftime('%y-%m-%d %H:%M:%S') 46 item['player_addr'] = 'http://aqdyec.com' + re.search('''title='第1集' href='(.*?)' target="_blank">''', resp_text).group(1) 47 logger.info(f'用print模擬儲存到資料庫 {item}') 48 49 50 if __name__ == '__main__': 51 pass 52 # crawl_list_page('shebao', 1) 53 crawl_list_page.clear() # 清空佇列 54 crawl_list_page.push('shebao', 1) # 社保 任務推送,推送首頁 55 crawl_list_page.push('lusi', 1) # 擼絲 56 crawl_list_page.push('lunli', 1) # 倫理 57 crawl_list_page.consume() # 啟動其列表頁消費 58 crawl_detail_page.consume() # 啟動先i起來也消費