Python爬蟲之非同步講解
1 非同步爬蟲
1.1 非同步瞭解
使用高效能爬蟲可以縮短爬取用時,提供爬取效率
目的:在爬蟲中使用非同步實現高效能的資料爬取操作
非同步爬蟲的方式有:
- 多執行緒和多程序
好處:可以為相關阻塞的操作單獨開啟執行緒或者程序,阻塞操作就可以非同步執行
壞處:無法無限制的開啟多執行緒或者多程序(如果不限制的開啟了,會嚴重消耗CPU
- 執行緒池和程序池
好處:我們可以降低系統對進場或者執行緒建立和銷燬的一個頻率,從而很好的降低系統的開銷
壞處:池中執行緒或者程序的數量是有上限的,倘若遠遠超過了上限,爬取效率就會下降
2 多執行緒
2.1 多執行緒講解
多執行緒類似於同時執行多個不同程式,多執行緒執行,使用執行緒可以把佔據長時間的程式中的任務放到後臺去處理。
每個執行緒都有他自己的一組CPU暫存器,稱為執行緒的上下文,該上下文反映了執行緒上次執行該執行緒的CPU暫存器的狀態。
指令指標和堆疊指標暫存器是執行緒上下文中兩個最重要的暫存器,執行緒總是在程序得到上下文中執行的,這些地址都用於標誌擁有執行緒的程序地址空間中的記憶體。
執行緒可以被搶佔(中斷)。
在其他執行緒正在執行時,執行緒可以暫時擱置(也稱為睡眠) -- 這就是執行緒的退讓。
執行緒可以分為:
- 核心執行緒:由作業系統核心建立和撤銷。
- 使用者執行緒:不需要核心支援而在使用者程式中實現的執行緒。
2.2 thread模組
thread
模組已被廢棄。使用者可以使用threading
模組代替。所以,在 Python3
中不能再使用thread
模組。為了相容性,Python3
將 thread
重新命名為 _thread
呼叫 _thread
模組中的start_new_thread()
函式來產生新執行緒。語法如下:
_thread.start_new_thread ( function, args[, kwargs] )
引數說明:
function
- 執行緒函式。args
- 傳遞給執行緒函式的引數,它必須是個tuple
kwargs
- 可選引數
使用例子:
import _thread
import time
# 定義一個函式
def print_time(threadName,delay):
count=0
while count<5:
time.sleep(delay)
count+=1
print ("%s: %s" % ( threadName, time.ctime(time.time()) ))
try:
_thread.start_new_thread(print_time,("test_thread_1",2))
_thread.start_new_thread(print_time,("test_thread_2",4))
except:
print("error:無法啟動執行緒")
# 讓指令碼不要停下來
while 1:
pass
2.3 threading
Python3
通過兩個標準庫 _thread
和 threading
提供對執行緒的支援
_thread
提供了低級別的、原始的執行緒以及一個簡單的鎖,它相比於 threading
模組的功能還是比較有限的。
threading
模組除了包含 _thread
模組中的所有方法外,還提供的其他方法:
threading.currentThread():
返回當前的執行緒變數。threading.enumerate():
返回一個包含正在執行的執行緒的list
。正在執行指執行緒啟動後、結束前,不包括啟動前和終止後的執行緒。threading.activeCount():
返回正在執行的執行緒數量,與len(threading.enumerate())
有相同的結果。
除了使用方法外,執行緒模組同樣提供了Thread類
來處理執行緒,Thread類
提供了以下方法:
run():
用以表示執行緒活動的方法start():
啟動執行緒活動join([time]):
等待至執行緒中止
join
:讓主執行緒
等待子執行緒
結束之後才能繼續執行,比如如下程式,看著是thread2
呼叫了join
方法,其實是當前執行緒在執行,所以當前main
執行緒要等待thread2
執行完畢後,才能執行main
執行緒
thread2 = myThread(2, "Thread-2", 2)
thread2.start()
thread2.join()
isAlive():
返回執行緒是否活動的getName():
返回執行緒名setName():
設定執行緒名
使用例子:
import threading
import time
exitFlag = 0
class myThread (threading.Thread):
def __init__(self, threadID, name, counter):
threading.Thread.__init__(self)
self.threadID = threadID
self.name = name
self.counter = counter
def run(self):
print ("開始執行緒:" + self.name)
print_time(self.name, self.counter, 5)
print ("退出執行緒:" + self.name)
def print_time(threadName, delay, counter):
while counter:
if exitFlag:
threadName.exit()
time.sleep(delay)
print ("%s: %s" % (threadName, time.ctime(time.time())))
counter -= 1
# 建立新執行緒
thread1 = myThread(1, "Thread-1", 1)
thread2 = myThread(2, "Thread-2", 2)
# 開啟新執行緒
thread1.start()
thread2.start()
print("=========================")
thread1.join()
thread2.join()
print ("退出主執行緒")
3 執行緒池
3.1 單執行緒序列
單執行緒序列就是阻塞連續執行命令,假如有一個耗時時間長,就會一直等待到執行完畢,如下操作大概耗時8秒
import time
def get_page(str):
print('正在下載:',str)
time.sleep(2)
print('下載成功:',str)
name_list=['xiaozi','aa','bb','cc']
start_time=time.time()
for i in range(len(name_list)):
get_page(name_list[i])
end_time=time.time()
print(f'消耗時間secode:{end_time-start_time}')
3.2 使用執行緒池
匯入執行緒池使用:from multiprocessing.dummy import Pool
如下操作,就是使用執行緒池後大概2秒
import time
# 匯入執行緒池
from multiprocessing.dummy import Pool
start_time=time.time()
def get_page(str):
print('正在下載:',str)
time.sleep(2)
print('下載成功:',str)
name_list=['xiaozi','aa','bb','cc']
# 例項化一個執行緒池
pool=Pool(4)
# 第一個引數是要阻塞的函式,第二個引數是可迭代物件
# 如果第一個引數即阻塞函式有返回值,那麼就會通過map返回回去
pool.map(get_page,name_list)
end_time=time.time()
print(f'消耗時間secode:{end_time-start_time}')
4 協程操作
最推薦的不是執行緒池,而是單執行緒和協程一起操作
4.1 協程基本概念
使用協程中的一般概念:
event_loop:
事件迴圈,相當於一個無限迴圈,我們可以把一些函式註冊到這個事件迴圈上,當滿足某些條件的時候,函式就會被迴圈執行coroutine:
協程物件,我們可以將協程物件註冊到事件迴圈中,它會被事件迴圈呼叫。可以使用async
關鍵字來定義一個方法,這個方法在呼叫時不會立即被執行,而是返回一個協程物件task:
任務,它是對協程物件的進一步封裝,包含了任務的各個狀態future:
代表將來執行或還沒有執行的任務,實際上和task
沒有本質區別async:
定義一個協程,不會立即執行
await:
用來掛起阻塞方法的執行
4.2 協程基本操作
4.2.1 協程物件
使用async
定義一個協程物件,並建立一個事件迴圈物件
import asyncio
#定義協程物件
async def get_request(url):
print("正在請求的url是:",url)
print('請求成功的url:',url)
return url
#得到協程物件
coroutine_obj=get_request('www.baidu.com')
#建立一個事件迴圈物件
loop=asyncio.get_event_loop()
#將協程物件註冊到loop中,並啟動loop
loop.run_until_complete(coroutine_obj)
loop.close()
4.2.2 task物件
task物件需要loop物件基礎上建立起來
import asyncio
#定義協程物件
async def get_request(url):
print("正在請求的url是:",url)
print('請求成功的url:',url)
return url
#得到協程物件
coroutine_obj=get_request('www.baidu.com')
#建立一個事件迴圈物件
loop=asyncio.get_event_loop()
#基於loop建立了一個task物件
task=loop.create_task(coroutine_obj)
print(task)
#基於loop註冊任務
loop.run_until_complete(task)
print(task)
loop.close()
4.2.3 future物件
future物件與task物件不同的是建立基於asyncio空間來建立的
import asyncio
#定義協程物件
async def get_request(url):
print("正在請求的url是:",url)
print('請求成功的url:',url)
return url
#得到協程物件
coroutine_obj=get_request('www.baidu.com')
#建立一個事件迴圈物件
loop=asyncio.get_event_loop()
#基於loop建立了一個task物件
future=asyncio.ensure_future(coroutine_obj)
print(future)
loop.run_until_complete(future)
print(future)
loop.close()
4.2.4 繫結回撥
在使用task
或者future
繫結回撥時,需要先定義回撥函式
4.2.4.1 定義回撥函式
回撥函式中返回的result
方法就是任務物件
中封裝的協程物件
對應的函式返回值
注意:
回撥函式必須有返回值,不然result
方法就沒有值
def callback_func(task):
print(task.result())
4.2.4.2 繫結回撥
在使用task
或者future
繫結回撥時,都可以使用方法繫結task.add_done_callback(callback_func)
import asyncio
#定義協程物件
async def get_request(url):
print("正在請求的url是:",url)
print('請求成功的url:',url)
return url
#得到協程物件
coroutine_obj=get_request('www.baidu.com')
loop=asyncio.get_event_loop()
future=asyncio.ensure_future(coroutine_obj)
#把回撥函式繫結到任務物件中
future.add_done_callback(callback_func)
loop.run_until_complete(future)
loop.close()
4.2.5 非同步多工
首先說明下async\await
的使用
正常的函式在執行時是不會中斷的,所以要寫一個能夠中斷的函式,就需要新增async
關鍵字
async
用來宣告一個函式為非同步函式
,非同步函式的特點是能在函式執行過程中掛起,去執行其他非同步函式,等到掛起條件(假設掛起條件是sleep(5))消失後,也就是5秒到了再回來執行。
await
用來用來宣告程式掛起
,比如非同步程式執行到某一步時需要等待的時間很長,就將此掛起,去執行其他的非同步程式。await
後面只能跟非同步程式或有__await__
屬性的物件,因為非同步程式與一般程式不同。假設有兩個非同步函式async a
,async b
,a
中的某一步有await
,當程式碰到關鍵字await b()
後,非同步程式掛起後去執行另一個非同步b
程式,就是從函式內部跳出去執行其他函式,當掛起條件消失後,不管b
是否執行完,要馬上從b
程式中跳出來,回到原程式執行原來的操作。
如果await
後面跟的b
函式不是非同步函式,那麼操作就只能等b執行
完再返回,無法在b
執行的過程中返回。如果要在b
執行完才返回,也就不需要用await
關鍵字了,直接呼叫b函式
就行。所以這就需要await``後面跟的是非同步函數了。 在一個非同步函式中,可以不止一次掛起,也就是可以用多個``await
另外多工時,對於run_until_complete
方法需要這樣用asyncio.wait()
方法處理:loop.run_until_complete(asyncio.wait(task_list))
程式碼示例:
import time
import asyncio
async def get_request(url):
print("正在請求的url是:",url)
#在非同步協程中如果出現了同步模組相關程式碼,那麼就無法實現非同步
# time.sleep(2)
#當在asyncio中遇到阻塞操作就必須進行手動掛起
await asyncio.sleep(2)
print('請求成功的url:',url)
start_time=time.time()
urls=['www.baidu.com','www.sogou.com','www.goubanjia.com']
#任務列表
task_list=[]
for url in urls:
coroutine_obj=get_request(url)
future=asyncio.ensure_future(coroutine_obj)
task_list.append(future)
loop=asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(task_list))
loop.close()
print(time.time()-start_time)
4.2.6 aiohttp模組
由於在使用非同步多工時,就不能用request.get()
,因為此方法是同步的,需要使用aiohttp
模組了
在使用aiohttp
模組先安裝環境:pip intall aiohttp
,使用該模組中的ClientSession
使用時需要用async
修飾為非同步,並用await
修飾耗時操作
async def get_page(url):
async with aiohttp.ClientSession() as session:
async with await session.get(url) as resp:
#此處是和同步獲取文字方法不一樣地方
#text()獲取響應資料,read()獲取二進位制響應資料,json()返回的是json物件
page_text=await resp.text()