1. 程式人生 > 其它 >第八章 高效能非同步爬蟲

第八章 高效能非同步爬蟲

高效能非同步爬蟲
目的:在爬蟲中使用非同步實現高效能的資料爬取操作。

同步爬蟲:(阻塞)
import requests
headers = {
    'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36'
}
urls = [
    'http://xmdx.sc.chinaz.net/Files/DownLoad/jianli/201904/jianli10231.rar',
    'http://zjlt.sc.chinaz.net/Files/DownLoad/jianli/201904/jianli10229.rar
', 'http://xmdx.sc.chinaz.net/Files/DownLoad/jianli/201904/jianli10231.rar' ] def get_content(url): print('正在爬取:',url) #get方法是一個阻塞的方法 response = requests.get(url=url,headers=headers) if response.status_code == 200 : return response.content def parse_content(content): print('響應資料的長度為:
',len(content)) for url in urls: content = get_content(url) parse_content(content)

非同步爬蟲的方式:
- 1.多執行緒,多程序(不建議):
好處:可以為相關阻塞的操作單獨開啟執行緒或者程序,阻塞操作就可以非同步執行。
弊端:無法無限制的開啟多執行緒或者多程序。(無法對大量的url開啟程序或執行緒 會佔用大量cpu資源 影響其他使用)
- 2.執行緒池、程序池(適當的使用):
好處:我們可以降低系統對程序或者執行緒建立和銷燬的一個頻率,從而很好的降低系統的開銷。
弊端:池中執行緒或程序的數量是有上限。
# import time
# #使用單執行緒序列方式執行
#
# def get_page(str):
#     print("正在下載 :",str)
#     time.sleep(2)
#     print('下載成功:',str)
#
# name_list =['xiaozi','aa','bb','cc']
#
# start_time = time.time()
#
# for i in range(len(name_list)):
#     get_page(name_list[i])
#
# end_time = time.time()
# print('%d second'% (end_time-start_time))


import time
#匯入執行緒池模組對應的類
from multiprocessing.dummy import Pool


#使用執行緒池方式執行(開始計時)
start_time = time.time()
def get_page(str): # 模擬網路請求
    print("正在下載 :",str)
    time.sleep(2)
    print('下載成功:',str)

name_list =['xiaozi','aa','bb','cc'] # 模擬四個url

#例項化一個執行緒池物件
pool = Pool(4)
#將列表中每一個列表元素傳遞給get_page進行處理。
# 將會發生阻塞的函式傳遞到第一個引數中
pool.map(get_page,name_list)
pool.close()
pool.join()
end_time = time.time()
print(end_time-start_time)
# 非同步爬蟲之執行緒池案例應用
import
requests from lxml import etree import re from multiprocessing.dummy import Pool #需求:爬取梨視訊的視訊資料 headers = { 'User-Agent':'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36' } #原則:執行緒池處理的是阻塞且較為耗時的操作(不是處理所有的操作) #對下述url發起請求解析出視訊詳情頁的url和視訊的名稱 url = 'https://www.pearvideo.com/category_5' page_text = requests.get(url=url,headers=headers).text # 資料解析 解析詳情頁url和視訊名稱 tree = etree.HTML(page_text) li_list = tree.xpath('//ul[@id="listvideoListUl"]/li') urls = [] #儲存所有視訊的連結and名字 # 在li標籤中進行區域性資料解析 for li in li_list: detail_url = 'https://www.pearvideo.com/'+li.xpath('./div/a/@href')[0] # 詳情頁url name = li.xpath('./div/a/div[2]/text()')[0]+'.mp4' # 視訊名稱 #對詳情頁的url發起請求(視訊儲存在詳情頁中的) detail_page_text = requests.get(url=detail_url,headers=headers).text #從詳情頁中解析出視訊的地址(url)通過正則進行解析 因為是ajax請求的 ex = 'srcUrl="(.*?)",vdoUrl' video_url = re.findall(ex,detail_page_text)[0] # 視訊連結 dic = { 'name':name, 'url':video_url } urls.append(dic) #對視訊連結發起請求獲取視訊的二進位制資料,然後將視訊資料進行返回 def get_video_data(dic): url = dic['url'] print(dic['name'],'正在下載......') data = requests.get(url=url,headers=headers).content #持久化儲存操作 with open(dic['name'],'wb') as fp: fp.write(data) print(dic['name'],'下載成功!') #使用執行緒池對視訊資料進行請求(較為耗時的阻塞操作) pool = Pool(4) pool.map(get_video_data,urls) # 將urls傳入到get_video_data函式 pool.close() pool.join()

- 3.單執行緒+非同步協程(推薦):
event_loop:事件迴圈,相當於一個無限迴圈,我們可以把一些函式註冊到這個事件迴圈上,
當滿足某些條件的時候,函式就會被迴圈執行。

coroutine:協程物件,我們可以將協程物件註冊到事件迴圈中,它會被事件迴圈呼叫。
我們可以使用 async 關鍵字來定義一個方法,這個方法在呼叫時不會立即被執行,而是返回
一個協程物件。

task:任務,它是對協程物件的進一步封裝,包含了任務的各個狀態。

future:代表將來執行或還沒有執行的任務,實際上和 task 沒有本質區別。

async 定義一個協程.

await 用來掛起阻塞方法的執行。
單任務協程相關操作:
import asyncio

async def request(url):
    print('正在請求的url是',url)
    print('請求成功,',url)
    return url
#async修飾的函式,呼叫之後返回的一個協程物件c
c = request('www.baidu.com')

# 一 event_loop使用:事件迴圈,相當於一個無限迴圈,我們可以把一些函式註冊到這個事件迴圈上,
# 當滿足某些條件的時候,函式就會被迴圈執行。
# #建立一個事件迴圈物件
# loop = asyncio.get_event_loop()
#
# #將協程物件註冊到loop中,然後啟動loop
# loop.run_until_complete(c)

# 二 task的使用
# 建立一個事件迴圈物件
# loop = asyncio.get_event_loop()
# #基於loop建立了一個task物件
# task = loop.create_task(c)
# print(task)# 此處的任務物件還沒被執行
#
# loop.run_until_complete(task) # 開啟任務
#
# print(task) # 此處的任務物件已經被執行

# 三 future的使用
# 建立一個事件迴圈物件
# loop = asyncio.get_event_loop()
# task = asyncio.ensure_future(c)
# print(task) # 此處的任務物件還沒被執行
# loop.run_until_complete(task)
# print(task)# 此處的任務物件已經被執行

def callback_func(task): # 回撥函式
    #result返回的就是任務物件中封裝的協程物件對應函式的返回值
    print(task.result()) # 接收result物件的返回值url

#繫結回撥
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(c)
#將回調函式繫結到任務物件中 當任務函式執行後會執行回撥函式
task.add_done_callback(callback_func)
loop.run_until_complete(task) # 任務函式

# 多工非同步協程

import
asyncio import time async def request(url): print('正在下載',url) #在非同步協程中如果出現了同步模組相關的程式碼,那麼就無法實現非同步,變成了同步。 # time.sleep(2) #基於非同步模組 當在asyncio中遇到阻塞操作必須進行手動掛起 await asyncio.sleep(2) print('下載完畢',url) start = time.time() # 多個協程物件 urls = [ 'www.baidu.com', 'www.sogou.com', 'www.goubanjia.com' ] #任務列表:存放多個任務物件 stasks = [] for url in urls: c = request(url) # 協程物件 task = asyncio.ensure_future(c) # 任務物件 stasks.append(task) # 任務列表存放多個任務物件 # 註冊到多工迴圈物件 loop = asyncio.get_event_loop() #需要將任務列表封裝到wait中 固定的語法格式 loop.run_until_complete(asyncio.wait(stasks)) print(time.time()-start) # 多工執行耗時

# aiohttp模組 基於網路非同步請求模組 requests是基於同步的

import
requests import asyncio import time start = time.time() urls = [ 'http://127.0.0.1:5000/bobo','http://127.0.0.1:5000/jay','http://127.0.0.1:5000/tom' ] async def get_page(url): print('正在下載',url) #requests.get是基於同步,必須使用基於非同步的網路請求模組進行指定url的請求傳送
   response = requests.get(url=url) # 這段程式碼是基於同步模組的程式碼 所以此程式沒有實現非同步操作

#aiohttp:基於非同步網路請求的模組 完成多工非同步操作
    
    
    print('下載完畢:',response.text)

tasks = []

for url in urls:
    c = get_page(url)
    task = asyncio.ensure_future(c)
    tasks.append(task)

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

end = time.time()

print('總耗時:',end-start) # 總耗時6秒
#環境安裝:pip install aiohttp
#使用該模組中的ClientSession
import requests
import asyncio
import time
import aiohttp

start = time.time()
# urls = [
#     'http://127.0.0.1:5000/bobo','http://127.0.0.1:5000/jay','http://127.0.0.1:5000/tom',
#     'http://127.0.0.1:5000/bobo', 'http://127.0.0.1:5000/jay', 'http://127.0.0.1:5000/tom',
#     'http://127.0.0.1:5000/bobo', 'http://127.0.0.1:5000/jay', 'http://127.0.0.1:5000/tom',
#     'http://127.0.0.1:5000/bobo', 'http://127.0.0.1:5000/jay', 'http://127.0.0.1:5000/tom',
#
# ]
from multiprocessing.dummy import Pool
pool = Pool(2)

urls = []
for i in range(10):
    urls.append('http://127.0.0.1:5000/bobo')
print(urls)
async def get_page(url):
    async with aiohttp.ClientSession() as session: # 返回session物件
        #get()、post(): 可以使用post get請求
        #新增引數  :ua偽裝和請求引數:headers,params/data,proxy='http://ip:port'
        async with await session.get(url) as response: # 使用session物件傳送請求 返回一個響應物件response
            #text()返回字串形式的響應資料
            #read()返回的二進位制形式的響應資料
            #json()返回的就是json物件
            #注意:獲取響應資料操作之前一定要使用await進行手動掛起
            page_text = await response.text()
            print(page_text)

tasks = []

for url in urls:
    c = get_page(url)
    task = asyncio.ensure_future(c)
    tasks.append(task)

loop = asyncio.get_event_loop()

loop.run_until_complete(asyncio.wait(tasks))

end = time.time()

print('總耗時:',end-start) # 總耗時2秒