1. 程式人生 > 實用技巧 >5分鐘完全掌握Python協程

5分鐘完全掌握Python協程

本文的文字及圖片來源於網路,僅供學習、交流使用,不具有任何商業用途,如有問題請及時聯絡我們以作處理

1. 協程相關的概念

1.1 程序和執行緒

程序(Process)是應用程式啟動的例項,擁有程式碼、資料和檔案和獨立的記憶體空間,是作業系統最小資源管理單元。每個程序下面有一個或者多個執行緒(Thread),來負責執行程式的計算,是最小的執行單元。

重點是:作業系統會負責程序的資源的分配;控制權主要在作業系統。另一方面,執行緒做為任務的執行單元,有新建、可執行runnable(呼叫start方法,進入排程池,等待獲取cpu使用權)、執行running(得到cpu使用權開始執行程式) 阻塞blocked(放棄了cpu 使用權,再次等待) 死亡dead5中不同的狀態。執行緒的轉態也是由作業系統進行控制。執行緒如果存在資源共享的情況下,就需要加鎖,比如生產者和消費者模式,生產者生產資料多共享佇列,消費者從共享佇列中消費資料。

執行緒和程序在得到和放棄cpu使用權時,cpu使用權的切換都需損耗效能,因為某個執行緒為了能夠在再次獲得cpu使用權時能繼續執行任務,必須記住上一次執行的所有狀態。另外執行緒還有鎖的問題。

1.2 並行和併發

並行和併發,聽起來都像是同時執行不同的任務。但是這個同時的含義是不一樣的。

  • 並行:多核CPU才有可能真正的同時執行,就是獨立的資源來完成不同的任務,沒有先後順序。
  • 併發(concurrent):是看上去的同時執行,實際微觀層面是順序執行,是作業系統對程序的排程以及cpu的快速上下文切換,每個程序執行一會然後停下來,cpu資源切換到另一個程序,只是切換的時間很短,看起來是多個任務同時在執行。要實現大併發,需要把任務切成小的任務。

上面說的多核cpu可能同時執行,這裡的可能是和作業系統排程有關,如果作業系統排程到同一個cpu,那就需要cpu進行上下文切換。當然多核情況下,作業系統排程會盡可能考慮不同cpu。

這裡的上下文切換可以理解為需要保留不同執行任務的狀態和資料。所有的併發處理都有排隊等候,喚醒,執行至少三個這樣的步驟

1.3 協程

我們知道執行緒的提出是為了能夠在多核cpu的情況下,達到並行的目的。而且執行緒的執行完全是作業系統控制的。而協程(Coroutine)是執行緒下的,控制權在於使用者,本質是為了能讓多組過程能不獨自佔用完所有資源,在一個執行緒內交叉執行,達到高併發的目的。

協程的優勢:

  • 協程最大的優勢就是協程極高的執行效率。因為子程式切換不是執行緒切換,而是由程式自身控制,因此,沒有執行緒切換的開銷,和多執行緒比,執行緒數量越多,協程的效能優勢就越明顯
  • 第二大優勢就是不需要多執行緒的鎖機制,因為只有一個執行緒,也不存在同時寫變數衝突,在協程中控制共享資源不加鎖,只需要判斷狀態就好了,所以執行效率比多執行緒高很多。

協程和執行緒區別:

  • 協程都沒參與多核CPU並行處理,協程是不併行
  • 執行緒在多核處理器上是並行在單核處理器是受作業系統排程的
  • 協程需要保留上一次呼叫的狀態
  • 執行緒的狀態有作業系統來控制

我們姑且也過一遍這些文字上的概念,show your code的時候再聯絡起來,就會更清晰的。

2. python中的執行緒

python中的執行緒由於歷史原因,即使在多核cpu的情況下並不能達真正的並行。這個原因就是全域性直譯器鎖GIL(global interpreter lock),準確的說GIL不是python的特性,而是cpython引入的一個概念。cpython直譯器在解析多執行緒時,會上GIL鎖,保證同一時刻只有一個執行緒獲取CPU使用權。

  • 為什麼需要GIL python中一切都是物件,Cpython中物件的回收,是通過物件的引用計數來判斷,當物件的引用計數為0時,就會進行垃圾回收,自動釋放記憶體。但是如果多執行緒的情況,引用計數就變成了一個共享的變數 Cpython是當下最流行的Python的直譯器,使用引用計數來管理記憶體,在Python中,一切都是物件,引用計數就是指向物件的指標數,當這個數字變成0,則會進行垃圾回收,自動釋放記憶體。但是問題是Cpython是執行緒不安全的。

考慮下如果有兩個執行緒A和B同時引用一個物件obj,這個時候obj的引用計數為2;A打算撤銷對obj的引用,完成第一步時引用計數減去1時,這時發生了執行緒切換,A掛起等待,還沒執行銷燬物件操作。B進入執行狀態,這個時候B也對obj撤銷引用,並完成引用計數減1,銷燬物件,這個時候obj的引用數為0,釋放記憶體。如果此時A重新喚醒,要繼續銷燬物件,可是這個時候已經沒有物件了。所以為了保證不出現資料汙染,才引入GIL。

每個執行緒使用前都會去獲取GIL許可權,使用完釋放GIL許可權。釋放執行緒的時機由python的另一個機制check_interval來決定。

在多核cpu時,因為需要獲取和釋放GIL鎖,會存在效能上額外的損耗。特別是由於排程控制的原因,比如一個執行緒釋放了鎖,排程接著又分配cpu資源給同一個執行緒,該執行緒發起申請時,又重新獲得GIL,而其他執行緒實際上都在等待,白白浪費了申請和釋放鎖的操作耗時。

python中的執行緒比較適合I/O密集型的操作(磁碟IO或者網路IO)。

  • 執行緒的使用
importos
importtime
importsys
fromconcurrentimportfutures
defto_do(info):
foriinrange(100000000):
pass
returninfo[0]
MAX_WORKERS=10
param_list=[]
foriinrange(5):
param_list.append(('text%s'%i,'info%s'%i))
workers=min(MAX_WORKERS,len(param_list))
#with預設會等所有任務都完成才返回,所以這裡會阻塞
withfutures.ThreadPoolExecutor(workers)asexecutor:
results=executor.map(to_do,sorted(param_list))
#列印所有
forresultinresults:
print(result)
#非阻塞的方式,適合不需要返回結果的情況
workers=min(MAX_WORKERS,len(param_list))
executor=futures.ThreadPoolExecutor(workers)
results=[]
foridx,paraminenumerate(param_list):
result=executor.submit(to_do,param)
results.append(result)
print('result%s'%idx)
#手動等待所有任務完成
executor.shutdown()
print('='*10)
forresultinresults:
print(result.result())

3. python中的程序

python提供的multiprocessing包來規避GIL的缺點,實現在多核cpu上並行的目的。multiprocessing還提供程序之間資料和記憶體共享的機制。這裡介紹的concurrent.futures的實現。用法和執行緒基本一樣,ThreadPoolExecutor改成ProcessPoolExecutor

importos
importtime
importsys
fromconcurrentimportfutures
defto_do(info):
foriinrange(10000000):
pass
returninfo[0]
start_time=time.time()
MAX_WORKERS=10
param_list=[]
foriinrange(5):
param_list.append(('text%s'%i,'info%s'%i))
workers=min(MAX_WORKERS,len(param_list))
#with預設會等所有任務都完成才返回,所以這裡會阻塞
withfutures.ProcessPoolExecutor(workers)asexecutor:
results=executor.map(to_do,sorted(param_list))
#列印所有
forresultinresults:
print(result)
print(time.time()-start_time)
#耗時0.3704512119293213s,而執行緒版本需要14.935384511947632s

4. python中的協程

4.1 簡單協程

我們先來看下python是怎麼實現協程的。答案是yield。以下例子的功能是實現計算移動平均數

fromcollectionsimportnamedtuple
Result=namedtuple('Result','countaverage')
#協程函式
defaverager():
total=0.0
count=0
average=None
whileTrue:
term=yieldNone#暫停,等待主程式傳入資料喚醒
iftermisNone:
break#決定是否退出
total+=term
count+=1
average=total/count#累計狀態,包括上一次的狀態
returnResult(count,average)
#協程的觸發
coro_avg=averager()
#預啟用協程
next(coro_avg)
#呼叫者給協程提供資料
coro_avg.send(10)
coro_avg.send(30)
coro_avg.send(6.5)
try:
coro_avg.send(None)
exceptStopIterationasexc:#執行完成,會丟擲StopIteration異常,返回值包含在異常的屬性value裡
result=exc.value
print(result)

yield關鍵字有兩個含義:產出和讓步; 把yield的右邊的值產出給呼叫方,同時做出讓步,暫停執行,讓程式繼續執行。

上面的例子可知

  • 協程用yield來控制流程,接收和產出資料
  • next():預啟用協程
  • send:協程從呼叫方接收資料
  • StopIteration:控制協程結束, 同時獲取返回值

我們來回顧下1.3中協程的概念:本質是為了能讓多組過程能不獨自佔用完所有資源,在一個執行緒內交叉執行,達到高併發的目的。。上面的例子怎麼解釋呢?

  • 可以把一個協程單次一個任務,即移動平均
  • 每個任務可以拆分成小步驟(也可以說是子程式), 即每次算一個數的平均
  • 如果多個任務需要執行呢?怎麼呼叫控制器在呼叫方
  • 如果有10個,可以想象,呼叫在控制的時候隨機的給每個任務send的一個數據化,就會是多個任務在交叉執行,達到併發的目的。

4.2 asyncio協程應用包

asyncio即非同步I/O, 如在高併發(如百萬併發)網路請求。非同步I/O即你發起一個I/O操作不必等待執行結束,可以做其他事情。asyncio底層是協程的方式來實現的。我們先來看一個例子,瞭解下asyncio的五臟六腑。

importtime
importasyncio
now=lambda:time.time()
#async定義協程
asyncdefdo_some_work(x):
print("waiting:",x)
#await掛起阻塞,相當於yield,通常是耗時操作
awaitasyncio.sleep(x)
return"Doneafter{}s".format(x)
#回撥函式,和yield產出類似功能
defcallback(future):
print("callback:",future.result())
start=now()
tasks=[]
foriinrange(1,4):
#定義多個協程,同時預啟用
coroutine=do_some_work(i)
task=asyncio.ensure_future(coroutine)
task.add_done_callback(callback)
tasks.append(task)
#定一個迴圈事件列表,把任務協程放在裡面,
loop=asyncio.get_event_loop()
try:
#非同步執行協程,直到所有操作都完成,也可以通過asyncio.gather來收集多個任務
loop.run_until_complete(asyncio.wait(tasks))
fortaskintasks:
print("Taskret:",task.result())
exceptKeyboardInterruptase:#協程任務的狀態控制
print(asyncio.Task.all_tasks())
fortaskinasyncio.Task.all_tasks():
print(task.cancel())
loop.stop()
loop.run_forever()
finally:
loop.close()
print("Time:",now()-start)

上面涉及到的幾個概念:

  • event_loop 事件迴圈:程式開啟一個無限迴圈,把一些函式註冊到事件迴圈上,當滿足事件發生的時候,呼叫相應的協程函式
  • coroutine 協程:協程物件,指一個使用async關鍵字定義的函式,它的呼叫不會立即執行函式,而是會返回一個協程物件。協程物件需要註冊到事件迴圈,由事件迴圈呼叫。
  • task任務:一個協程物件就是一個原生可以掛起的函式,任務則是對協程進一步封裝,其中包含了任務的各種狀態
  • future: 代表將來執行或沒有執行的任務的結果。它和task上沒有本質上的區別
  • async/await 關鍵字:python3.5用於定義協程的關鍵字,async定義一個協程,await用於掛起阻塞的非同步呼叫介面。從上面可知,asyncio通過事件的方式幫我們實現了協程呼叫方的控制權處理,包括send給協程資料等。我們只要通過async定義協程,await定義阻塞,然後封裝成future的task,放入迴圈的事件列表中,就等著返回資料。

再來看一個http下載的例子,比如你想下載5個不同的url(同樣的,你想接收外部的百萬的請求)

importtime
importasyncio
fromaiohttpimportClientSession
tasks=[]
url="https://www.baidu.com/{}"
asyncdefhello(url):
asyncwithClientSession()assession:
asyncwithsession.get(url)asresponse:
response=awaitresponse.read()
#print(response)
print('HelloWorld:%s'%time.time())
if__name__=='__main__':
loop=asyncio.get_event_loop()
foriinrange(5):
task=asyncio.ensure_future(hello(url.format(i)))
tasks.append(task)
loop.run_until_complete(asyncio.wait(tasks))

4.3 協程的應用場景

  • 支撐高併發I/O情況,如寫支撐高併發的服務端
  • 代替執行緒,提供併發效能
  • tornado和gevent都實現了類似功能, 之前文章提到Twisted也是

5. 總結

本文分享關於python協程的概念和asyncio包的初步使用情況,同時也介紹了基本的相關概念,如程序、執行緒、併發、並行等。希望對你有幫助,歡迎交流(@mintel)。簡要總結如下:

  • 併發和並行不一樣,並行是同時執行多個任務, 併發是在極短時間內處理多個任務
  • 多核cpu,程序是並行,python執行緒受制於GIL,不能並行,反而因為上下文切換更耗時,協程正好可以彌補
  • 協程也不是並行,只是任務交替執行任務,在存在阻塞I/O情況,能夠非同步執行,提高效率
  • asyncio 非同步I/O庫,可用於開發高併發應用

想要獲取更多Python學習資料可以加
QQ:2955637827私聊
或加Q群630390733
大家一起來學習討論吧!