1. 程式人生 > 其它 >Python爬蟲之非同步講解

Python爬蟲之非同步講解

目錄

1 非同步爬蟲

1.1 非同步瞭解

使用高效能爬蟲可以縮短爬取用時,提供爬取效率
目的:在爬蟲中使用非同步實現高效能的資料爬取操作
非同步爬蟲的方式有:

  • 多執行緒和多程序
    好處:可以為相關阻塞的操作單獨開啟執行緒或者程序,阻塞操作就可以非同步執行
    壞處:無法無限制的開啟多執行緒或者多程序(如果不限制的開啟了,會嚴重消耗CPU
    資源,這樣會導致響應外界效率變慢)
  • 執行緒池和程序池
    好處:我們可以降低系統對進場或者執行緒建立和銷燬的一個頻率,從而很好的降低系統的開銷
    壞處:池中執行緒或者程序的數量是有上限的,倘若遠遠超過了上限,爬取效率就會下降

2 多執行緒

2.1 多執行緒講解

多執行緒類似於同時執行多個不同程式,多執行緒執行,使用執行緒可以把佔據長時間的程式中的任務放到後臺去處理。
每個執行緒都有他自己的一組CPU暫存器,稱為執行緒的上下文,該上下文反映了執行緒上次執行該執行緒的CPU暫存器的狀態。
指令指標和堆疊指標暫存器是執行緒上下文中兩個最重要的暫存器,執行緒總是在程序得到上下文中執行的,這些地址都用於標誌擁有執行緒的程序地址空間中的記憶體。
執行緒可以被搶佔(中斷)。
在其他執行緒正在執行時,執行緒可以暫時擱置(也稱為睡眠) -- 這就是執行緒的退讓。
執行緒可以分為:

  • 核心執行緒:由作業系統核心建立和撤銷。
  • 使用者執行緒:不需要核心支援而在使用者程式中實現的執行緒。

2.2 thread模組

thread模組已被廢棄。使用者可以使用threading模組代替。所以,在 Python3 中不能再使用thread模組。為了相容性,Python3thread 重新命名為 _thread
呼叫 _thread 模組中的start_new_thread()函式來產生新執行緒。語法如下:
_thread.start_new_thread ( function, args[, kwargs] )
引數說明:

  • function - 執行緒函式。
  • args - 傳遞給執行緒函式的引數,它必須是個tuple
    型別
  • kwargs - 可選引數

使用例子:

import _thread
import time

# 定義一個函式
def print_time(threadName,delay):
    count=0
    while count<5:
        time.sleep(delay)
        count+=1
        print ("%s: %s" % ( threadName, time.ctime(time.time()) ))

try:
    _thread.start_new_thread(print_time,("test_thread_1",2))
    _thread.start_new_thread(print_time,("test_thread_2",4))
except:
    print("error:無法啟動執行緒")

# 讓指令碼不要停下來
while 1:
   pass

2.3 threading

Python3 通過兩個標準庫 _threadthreading 提供對執行緒的支援
_thread 提供了低級別的、原始的執行緒以及一個簡單的鎖,它相比於 threading 模組的功能還是比較有限的。
threading 模組除了包含 _thread 模組中的所有方法外,還提供的其他方法:

  • threading.currentThread(): 返回當前的執行緒變數。
  • threading.enumerate(): 返回一個包含正在執行的執行緒的list。正在執行指執行緒啟動後、結束前,不包括啟動前和終止後的執行緒。
  • threading.activeCount(): 返回正在執行的執行緒數量,與len(threading.enumerate())有相同的結果。

除了使用方法外,執行緒模組同樣提供了Thread類來處理執行緒,Thread類提供了以下方法:

  • run(): 用以表示執行緒活動的方法
  • start():啟動執行緒活動
  • join([time]): 等待至執行緒中止
    join:讓主執行緒等待子執行緒結束之後才能繼續執行,比如如下程式,看著是thread2呼叫了join方法,其實是當前執行緒在執行,所以當前main執行緒要等待thread2執行完畢後,才能執行main執行緒
thread2 = myThread(2, "Thread-2", 2)
thread2.start()
thread2.join()
  • isAlive(): 返回執行緒是否活動的
  • getName(): 返回執行緒名
  • setName(): 設定執行緒名

使用例子:

import threading
import time

exitFlag = 0

class myThread (threading.Thread):
    def __init__(self, threadID, name, counter):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.counter = counter
    def run(self):
        print ("開始執行緒:" + self.name)
        print_time(self.name, self.counter, 5)
        print ("退出執行緒:" + self.name)

def print_time(threadName, delay, counter):
    while counter:
        if exitFlag:
            threadName.exit()
        time.sleep(delay)
        print ("%s: %s" % (threadName, time.ctime(time.time())))
        counter -= 1
# 建立新執行緒
thread1 = myThread(1, "Thread-1", 1)
thread2 = myThread(2, "Thread-2", 2)

# 開啟新執行緒
thread1.start()
thread2.start()
print("=========================")
thread1.join()
thread2.join()
print ("退出主執行緒")

3 執行緒池

3.1 單執行緒序列

單執行緒序列就是阻塞連續執行命令,假如有一個耗時時間長,就會一直等待到執行完畢,如下操作大概耗時8秒

import time

def get_page(str):
    print('正在下載:',str)
    time.sleep(2)
    print('下載成功:',str)

name_list=['xiaozi','aa','bb','cc']
start_time=time.time()
for i in range(len(name_list)):
    get_page(name_list[i])

end_time=time.time()

print(f'消耗時間secode:{end_time-start_time}')

3.2 使用執行緒池

匯入執行緒池使用:from multiprocessing.dummy import Pool
如下操作,就是使用執行緒池後大概2秒

import time
# 匯入執行緒池
from multiprocessing.dummy import Pool
start_time=time.time()
def get_page(str):
    print('正在下載:',str)
    time.sleep(2)
    print('下載成功:',str)

name_list=['xiaozi','aa','bb','cc']
# 例項化一個執行緒池
pool=Pool(4)
# 第一個引數是要阻塞的函式,第二個引數是可迭代物件
# 如果第一個引數即阻塞函式有返回值,那麼就會通過map返回回去
pool.map(get_page,name_list)

end_time=time.time()

print(f'消耗時間secode:{end_time-start_time}')

4 協程操作

最推薦的不是執行緒池,而是單執行緒和協程一起操作

4.1 協程基本概念

使用協程中的一般概念:

  • event_loop:事件迴圈,相當於一個無限迴圈,我們可以把一些函式註冊到這個事件迴圈上,當滿足某些條件的時候,函式就會被迴圈執行
  • coroutine:協程物件,我們可以將協程物件註冊到事件迴圈中,它會被事件迴圈呼叫。可以使用async關鍵字來定義一個方法,這個方法在呼叫時不會立即被執行,而是返回一個協程物件
  • task:任務,它是對協程物件的進一步封裝,包含了任務的各個狀態
  • future:代表將來執行或還沒有執行的任務,實際上和task沒有本質區別
  • async:定義一個協程,不會立即執行
  • await:用來掛起阻塞方法的執行

4.2 協程基本操作

4.2.1 協程物件

使用async定義一個協程物件,並建立一個事件迴圈物件

import asyncio
#定義協程物件
async def get_request(url):
    print("正在請求的url是:",url)
    print('請求成功的url:',url)
    return url
#得到協程物件
coroutine_obj=get_request('www.baidu.com')
#建立一個事件迴圈物件
loop=asyncio.get_event_loop()
#將協程物件註冊到loop中,並啟動loop
loop.run_until_complete(coroutine_obj)
loop.close()

4.2.2 task物件

task物件需要loop物件基礎上建立起來

import asyncio
#定義協程物件
async def get_request(url):
    print("正在請求的url是:",url)
    print('請求成功的url:',url)
    return url
#得到協程物件
coroutine_obj=get_request('www.baidu.com')

#建立一個事件迴圈物件
loop=asyncio.get_event_loop()
#基於loop建立了一個task物件
task=loop.create_task(coroutine_obj)
print(task)
#基於loop註冊任務
loop.run_until_complete(task)
print(task)
loop.close()

4.2.3 future物件

future物件與task物件不同的是建立基於asyncio空間來建立的

import asyncio
#定義協程物件
async def get_request(url):
    print("正在請求的url是:",url)
    print('請求成功的url:',url)
    return url
#得到協程物件
coroutine_obj=get_request('www.baidu.com')

#建立一個事件迴圈物件
loop=asyncio.get_event_loop()
#基於loop建立了一個task物件
future=asyncio.ensure_future(coroutine_obj)
print(future)
loop.run_until_complete(future)
print(future)
loop.close()

4.2.4 繫結回撥

在使用task或者future繫結回撥時,需要先定義回撥函式

4.2.4.1 定義回撥函式

回撥函式中返回的result方法就是任務物件中封裝的協程物件對應的函式返回值
注意:回撥函式必須有返回值,不然result方法就沒有值

def callback_func(task):
    print(task.result())

4.2.4.2 繫結回撥

在使用task或者future繫結回撥時,都可以使用方法繫結task.add_done_callback(callback_func)

import asyncio
#定義協程物件
async def get_request(url):
    print("正在請求的url是:",url)
    print('請求成功的url:',url)
    return url
#得到協程物件
coroutine_obj=get_request('www.baidu.com')
loop=asyncio.get_event_loop()
future=asyncio.ensure_future(coroutine_obj)
#把回撥函式繫結到任務物件中
future.add_done_callback(callback_func)
loop.run_until_complete(future)
loop.close()

4.2.5 非同步多工

首先說明下async\await的使用
正常的函式在執行時是不會中斷的,所以要寫一個能夠中斷的函式,就需要新增async關鍵字
async用來宣告一個函式為非同步函式,非同步函式的特點是能在函式執行過程中掛起,去執行其他非同步函式,等到掛起條件(假設掛起條件是sleep(5))消失後,也就是5秒到了再回來執行。
await用來用來宣告程式掛起,比如非同步程式執行到某一步時需要等待的時間很長,就將此掛起,去執行其他的非同步程式。await後面只能跟非同步程式或有__await__屬性的物件,因為非同步程式與一般程式不同。假設有兩個非同步函式async aasync ba中的某一步有await,當程式碰到關鍵字await b()後,非同步程式掛起後去執行另一個非同步b程式,就是從函式內部跳出去執行其他函式,當掛起條件消失後,不管b是否執行完,要馬上從b程式中跳出來,回到原程式執行原來的操作。
如果await後面跟的b函式不是非同步函式,那麼操作就只能等b執行完再返回,無法在b執行的過程中返回。如果要在b執行完才返回,也就不需要用await關鍵字了,直接呼叫b函式就行。所以這就需要await``後面跟的是非同步函數了。 在一個非同步函式中,可以不止一次掛起,也就是可以用多個``await

另外多工時,對於run_until_complete方法需要這樣用asyncio.wait()方法處理:loop.run_until_complete(asyncio.wait(task_list))
程式碼示例:

import time
import asyncio
async def get_request(url):
    print("正在請求的url是:",url)
    #在非同步協程中如果出現了同步模組相關程式碼,那麼就無法實現非同步
    # time.sleep(2)
    #當在asyncio中遇到阻塞操作就必須進行手動掛起
    await asyncio.sleep(2)
    print('請求成功的url:',url)    
start_time=time.time()
urls=['www.baidu.com','www.sogou.com','www.goubanjia.com']

#任務列表
task_list=[]
for url in urls:
    coroutine_obj=get_request(url)
    future=asyncio.ensure_future(coroutine_obj)
    task_list.append(future)
loop=asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(task_list))
loop.close()
print(time.time()-start_time)

4.2.6 aiohttp模組

由於在使用非同步多工時,就不能用request.get(),因為此方法是同步的,需要使用aiohttp模組了
在使用aiohttp模組先安裝環境:pip intall aiohttp,使用該模組中的ClientSession
使用時需要用async修飾為非同步,並用await修飾耗時操作

async def get_page(url):
	async with aiohttp.ClientSession() as session:
	async with await session.get(url) as resp:
	#此處是和同步獲取文字方法不一樣地方
	#text()獲取響應資料,read()獲取二進位制響應資料,json()返回的是json物件
	page_text=await resp.text()