1. 程式人生 > 實用技巧 >.NET Core TPL 資料流 BatchBlock管道封裝

.NET Core TPL 資料流 BatchBlock管道封裝

Python執行緒池與程式池

前言

  前面我們已經將執行緒併發程式設計與程式並行程式設計全部摸了個透,其實我第一次學習他們的時候感覺非常困難甚至是吃力。因為概念實在是太多了,各種鎖,資料共享同步,各種方法等等讓人十分頭痛。所以這邊要告訴你一個好訊息,前面的所有學習的知識點其實都是為本章知識點做鋪墊,在學習了本章節的內容後關於如何使用多執行緒併發與多程式並行就採取本章節中介紹的方式即可。

  這裡要介紹一點與之前內容不同的地方,即如果使用佇列進行由程式池建立的程式之間資料共享的話不管是multiprocessing模組下的Queue還是queue模組下的Queue都不能為程式池中所建立的程式進行資料共享,我們需要用到另一個佇列即multiprocessing.Manager()

中的Queue當然這個我也會在下面介紹到。那麼開始學習吧!

  官方檔案

執行器

  最早期的Python2中是沒有執行緒池這一概念的,只有程式池。直到Python3的出現才引入了執行緒池,其實關於他們的使用都是非常簡單,而且介面也是高度統一甚至說一模一樣的。而執行緒池與程式池的作用即是為了讓我們能夠更加便捷的管理執行緒或程式。

我們先說一下,如果需要使用執行緒池或程式池,需要匯入模組concurrent.futures

  from concurrent.futures import ThreadPoolExecutor # 執行緒池執行器

  from concurrent.futures import ProcessPoolExecutor

# 程式池執行器

  這裡介紹一下,關於執行緒池或者程式池創建出的執行緒與程式與我們使用multiprocessing模組或者threading模組中建立的執行緒或程式有什麼區別。我們以多執行緒為例:

import threading

def task():
ident = threading.get_ident()
print(ident)
# 銷燬當前執行任務的執行緒 if __name__ == '__main__': for i in range(10):
t1 = threading.Thread(target=task,) # 領任務
t1.start() # 等待CPU排程,而不是立即執行 # 執行 # ==== 執行結果 ==== Ps:可以看到每個執行緒的id號都不一樣,這也印證了圖上說的。 """
10392
12068
5708
13864
2604
7196
7324
9728
9664
472
"""
import threading
from concurrent.futures import ThreadPoolExecutor # 執行緒池執行器 def task():
ident = threading.get_ident()
print(ident)
# 結束任務,不銷燬當前執行任務的執行緒,直到所有任務都執行完畢。 if __name__ == '__main__':
pool = ThreadPoolExecutor(max_workers=2) # 這裡代表有2個執行緒可以領取任務
for i in range(10):
pool.submit(task) # 執行器啟動任務,將這些任務給2個人分配,也就是說task這個任務會被這2個執行緒不斷的執行,直到執行完畢後這2個執行緒才會死亡 # ==== 執行結果 ==== Ps:可以看到這裡都讓這2個執行緒把任務接了,記憶體開銷相比於上面的要小。 """
7272
7272
7272
7272
11596
7272
11596
11596
11596
11596
"""

方法大全


執行器方法大全
submit(fn, *args, **kwargs) 排程可呼叫物件 fn,以 fn(*args **kwargs) 方式執行並返回 Future 對像代表可呼叫物件的執行。(非同步提交!極為牛逼!)
map(func, *iterables, timeout=None, chunksize=1) 類似於 map(func, *iterables)
shutdown(wait=True) 等待,類似join()方法,並且在所有的任務完成後關閉執行器。wait=True為關閉,為False則是不關閉執行器的意思。
Ps:其實對於執行緒池或程式池來說,他們的池都有一個官方的名稱叫做執行器,介面都是一樣的。那麼接下來我就將執行緒池程式池這樣的名字換做執行器了,也是方便理解。

基本使用


  其實關於執行器的使用,我們有兩種方式,一種是依賴於with語句,一種是不依賴於with語句,那麼我在這裡推薦使用依賴於wait語句的執行器。

  不依賴於with語句的執行器使用:

import threading
from concurrent.futures import ThreadPoolExecutor # 執行緒池執行器 def task():
print("執行了") if __name__ == '__main__':
pool = ThreadPoolExecutor(max_workers=2) # 這裡代表有2個執行緒可以領取任務 , 對於執行緒池來講它是預設值是CPU核心數+4,對於程式池來講最大開啟的程式數是CPU核心數。
for i in range(10):
pool.submit(task) # 執行器啟動任務,將這些任務給2個人分配,也就是說task這個任務會被這2個執行緒不斷的執行,直到執行完畢後這2個執行緒才會死亡 # ==== 執行結果 ==== Ps:可以看到這裡都讓這2個執行緒把任務接了,記憶體開銷相比於上面的要小。 """
執行了
執行了
執行了
執行了
執行了
執行了
執行了
執行了
執行了
執行了
"""

  依賴於with語句的執行器使用:

import threading
from concurrent.futures import ThreadPoolExecutor # 執行緒池執行器 def task():
print("執行了")
# 銷燬 if __name__ == '__main__':
with ThreadPoolExecutor(max_workers=2) as pool: # 這裡代表有2個執行緒可以領取任務 , 對於執行緒池來講它是預設值是CPU核心數+4,對於程式池來講最大開啟的程式數是CPU核心數。
for i in range(10):
pool.submit(task) # 執行器啟動任務,將這些任務給2個人分配,也就是說task這個任務會被這2個執行緒不斷的執行,直到執行完畢後這2個執行緒才會死亡 # ==== 執行結果 ==== Ps:可以看到這裡都讓這2個執行緒把任務接了,記憶體開銷相比於上面的要小。 """
執行了
執行了
執行了
執行了
執行了
執行了
執行了
執行了
執行了
執行了
"""

期程物件

方法大全


期程物件(由執行器執行的任務的返回結果)方法大全
方法/屬性名稱 功能描述
cancel() 嘗試取消呼叫。 如果呼叫正在執行或已結束執行不能被取消則該方法將返回 False,否則呼叫會被取消並且該方法將返回 True
cancelled() 如果呼叫成功取消返回 True
running() 如果呼叫正在執行而且不能被取消那麼返回 True
done() 如果呼叫已被取消或正常結束那麼返回 True
result(timeout=None) 即獲取任務的返回結果,最大等待timeout秒,如不設定則死等,超時觸發CancelledError異常。
add_done_callback(fn) 增加回調函式fn,這個fn應該至少有一個形參來接收當前期程物件。
exception(timeout=None) 返回由呼叫引發的異常。如果呼叫還沒完成那麼這個方法將等待 timeout 秒。如果在 timeout 秒內沒有執行完成,concurrent.futures.TimeoutError將會被觸發。timeout 可以是整數或浮點數。如果 timeout 沒有指定或為 None,那麼等待時間就沒有限制。
Ps:還有一些期程物件的方法沒有舉例出來。詳情參見檔案

期程物件的作用


  我們可以看到,我們上面的函式並沒有返回值,如果有返回值的話怎麼辦呢?

import threading
from concurrent.futures import ThreadPoolExecutor # 執行緒池執行器 def task():
print("執行了")
return "玫瑰花"
# 銷燬 if __name__ == '__main__':
with ThreadPoolExecutor(max_workers=2) as pool:
res = pool.submit(task)
print(res) # <Future at 0x2539ea97850 state=finished returned str> 這個就是期程物件,可以看到他裡面還有當前任務的執行狀態。 finished = 執行完了的意思
print(res.result()) # 通過該方法就可以拿到任務的返回結果 # ==== 執行結果 ==== """
執行了
<Future at 0x2539ea97850 state=finished returned str>
玫瑰花
"""

  期程物件,也被稱為未來物件,是一個非常重要的概念。這裡可以記一筆,在Django框架中也有些地方採取了期程物件這樣的設定,這是後話,後面再聊。

期程物件如何獲取返回結果


  我們嘗試著將它的任務數量增多,發現使用期程物件直接獲取任務結果會導致阻塞,怎麼解決?

import time
import threading
from concurrent.futures import ThreadPoolExecutor # 執行緒池執行器 def task(x):
print("執行了,這是第%s個任務"%x)
time.sleep(3)
return "玫瑰花"
# 銷燬 if __name__ == '__main__':
with ThreadPoolExecutor(max_workers=2) as pool:
for i in range(10):
res = pool.submit(task,i)
print(res.result()) # 每次獲取結果的時候都是阻塞,怎麼辦?這個速率就變得非常的Low逼了。 # ==== 執行結果 ==== """
執行了,這是第0個任務
玫瑰花
執行了,這是第1個任務
玫瑰花
執行了,這是第2個任務
玫瑰花
執行了,這是第3個任務
玫瑰花
執行了,這是第4個任務
玫瑰花
執行了,這是第5個任務
玫瑰花
執行了,這是第6個任務
玫瑰花
執行了,這是第7個任務
玫瑰花
執行了,這是第8個任務
玫瑰花
執行了,這是第9個任務
玫瑰花
"""

  我這裡有一個辦法,可以值得嘗試一下。就是執行器本身有個方法shutdown(wait=True),它會導致當前主執行緒的阻塞。那麼我們就可以這樣操作,主程式阻塞住,再將啟程物件全部放到一個列表中,當所有任務處理完畢後阻塞通行,這個時候我們再迴圈這個列表拿出其中的結果。

import time
import threading
from concurrent.futures import ThreadPoolExecutor # 執行緒池執行器 def task(x):
print("執行了,這是第%s個任務"%x)
time.sleep(3)
return "玫瑰花"
# 銷燬 if __name__ == '__main__': res_list = [] # 用於存放所有期程物件 with ThreadPoolExecutor(max_workers=2) as pool:
for i in range(10):
res = pool.submit(task,i)
res_list.append(res) # 將期程物件放入列表 pool.shutdown(wait=True) # 代表必須將所有子執行緒的任務跑完再繼續向下執行主執行緒。 for i in res_list:
print(i.result()) # ==== 執行結果 ==== """
執行了,這是第0個任務
執行了,這是第1個任務
執行了,這是第2個任務
執行了,這是第3個任務
執行了,這是第4個任務
執行了,這是第5個任務
執行了,這是第6個任務
執行了,這是第7個任務
執行了,這是第8個任務
執行了,這是第9個任務
玫瑰花
玫瑰花
玫瑰花
玫瑰花
玫瑰花
玫瑰花
玫瑰花
玫瑰花
玫瑰花
玫瑰花
"""

  如果你覺得這種方法很贊,我只能送你兩個字,太low了。我們注意執行器的submit()方法,這玩意兒是非同步提交。非同步提交的結果需要用到回撥函式來進行呼叫,我們來看一下它有多牛逼。

回撥函式


import time
import threading
from concurrent.futures import ThreadPoolExecutor # 執行緒池執行器 def task(x):
print("執行了,這是第%s個任務"%x)
time.sleep(3)
return "玫瑰花"
# 銷燬 def callback(res): # 必須有一個形參,來接收期程物件
print(res.result()) # 列印結果,即task任務的返回結果 if __name__ == '__main__': with ThreadPoolExecutor(max_workers=2) as pool:
for i in range(10):
res = pool.submit(task,i)
res.add_done_callback(callback) # <--- 增加回調函式,當期程物件中的任務處理狀態完畢後將自動呼叫回撥函式 # ==== 執行結果 ==== # 非同步提交牛逼不?只要任務返回了我們立馬就可以獲取到結果進行處理。 """
執行了,這是第0個任務
執行了,這是第1個任務
玫瑰花
玫瑰花
執行了,這是第2個任務
執行了,這是第3個任務
玫瑰花
玫瑰花
執行了,這是第4個任務
執行了,這是第5個任務
玫瑰花
玫瑰花
執行了,這是第6個任務
執行了,這是第7個任務
玫瑰花
玫瑰花
執行了,這是第8個任務
執行了,這是第9個任務
玫瑰花
玫瑰花
"""

擴充套件:程式池執行器任務資料共享

  當我們使用程式池執行器啟動多程式執行任務時,如果想用資料共享,單純multiprocessing.Queue程式佇列並不支援。

import multiprocessing
from concurrent.futures import ProcessPoolExecutor # 程式池執行器 def task_1(q):
q.put("玫瑰花")
print("放完了...") def task_2(q):
print(q.get())
print("取到了") if __name__ == '__main__': q = multiprocessing.Queue() with ProcessPoolExecutor(max_workers=2) as pool:
pool.submit(task_1,q)
pool.submit(task_2,q) # ==== 執行結果 ==== # 阻塞住 """ """

  這個時候我們需要用到multiprocessing中的Manager()中的Queue

from multiprocessing import Manager
from concurrent.futures import ProcessPoolExecutor # 程式池執行器 def task_1(q):
q.put("玫瑰花")
print("放完了...") def task_2(q):
print(q.get())
print("取到了") if __name__ == '__main__': q = Manager().Queue() with ProcessPoolExecutor(max_workers=2) as pool:
pool.submit(task_1,q)
pool.submit(task_2,q) # ==== 執行結果 ==== # 成功 """
放完了...
玫瑰花
取到了
"""