1. 程式人生 > >建立程序池與執行緒池concurrent.futures模組的使用

建立程序池與執行緒池concurrent.futures模組的使用

一、程序池。

當併發的任務數量遠遠大於計算機所能承受的範圍,即無法一次性開啟過多的任務數量就應該考慮去 限制程序數或執行緒數,從而保證伺服器不會因超載而癱瘓。這時候就出現了程序池和執行緒池。

二、concurrent.futures模組介紹

concurrent.futures模組提供了高度封裝的非同步呼叫介面

ThreadPoolExecutor:執行緒池,提供非同步呼叫

ProcessPoolExecutor:程序池,提供非同步呼叫

Both implement the same interface, which is defined by the abstract Executor class

三、基本方法:

submit(fn, *args, **kwargs):非同步提交任務

map(func, *iterables, timeout=None, chunksize=1):取代for迴圈submit的操作

shutdown(wait=True):相當於程序池的pool.close()+pool.join()操作

  • wait=True,等待池內所有任務執行完畢回收完資源後才繼續
  • wait=False,立即返回,並不會等待池內的任務執行完畢
  • 但不管wait引數為何值,整個程式都會等到所有任務執行完畢
  • submit和map必須在shutdown之前

result(timeout=None)

:取得結果

add_done_callback(fn):回撥函式

done():判斷某一個執行緒是否完成

cancle():取消某個任務

四、程序池程式碼例項——ProcessPoolExecutor

from concurrent.futures import ProcessPoolExecutor
from multiprocessing import current_process
import time

def func(i):
    print(f'程序 {current_process().name} 正在執行任務 {i}')
    time.sleep(1)
    return i**2

if __name__ == '__main__':
    pool = ProcessPoolExecutor(4)  # 程序池只有4個程序
    lt = []
    for i in range(20):  # 假設執行20個任務
        future = pool.submit(func,i)   # func任務要做20次,4個程序負責完成這個20個任務
        # print(future.result())   # 如果沒有結果就一直等待拿到結果,導致了所有任務都在序列
        lt.append(future)
    pool.shutdown() # 預設為True,關閉了池的入口,會等待所有的任務執行完,結束阻塞,
    for fu in lt:
        print(fu.result())  # 等待所有的任務都執行完了,一起把返回值打印出來

五、執行緒池程式碼示例——ThreadPoolExecutor

from concurrent.futures import ThreadPoolExecutor
from threading import currentThread
import time

def func(i):
    print(f'執行緒 {currentThread().name} 正在執行任務 {i}')
    time.sleep(1)
    return i**2

if __name__ == '__main__':
    fool = ThreadPoolExecutor(4)  # 執行緒池裡只有4個執行緒
    lt = []
    for i in range(20):
        futrue = fool.submit(func,i)   # func任務要做20次,4個執行緒負責完成這20次任務
        lt.append(futrue)
    fool.shutdown()  # 預設為True,關閉了池的入口,會等待所有的任務執行完,結束阻塞,
    for fu in lt:
        print(fu.result())   # 等待所有的任務都執行完了,一起把返回值打印出來

六、回撥函式add_done_callback(fn)

提交任務的兩種方式:
同步: 提交了一個任務,必須等任務執行完了(拿到返回值),才能執行下一行程式碼
非同步: 提交了一個任務,不要等執行完了,可以直接執行下一行程式碼。

ps:程序和執行緒回撥方法的使用寫一塊了,註釋掉的是程序的使用。

from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
from threading import currentThread
from multiprocessing import current_process
import time

def task(i):
    print(f'執行緒 {currentThread().name} 正在執行任務 {i}')
    # print(f'程序 {current_process().name} 正在執行任務 {i}')
    time.sleep(1)
    return i**2

def parse(futrue):
    # 處理拿到的結果
    print(futrue.result())

if __name__ == '__main__':
    pool = ThreadPoolExecutor(4)  # 執行緒池裡只有4個執行緒
    # pool = ProcessPoolExecutor(4)  # 程序池裡只有4個程序
    lt = []
    for i in range(20):
        futrue = pool.submit(task,i)  # task任務要做20次,分別由四個程序完成這20個任務
        futrue.add_done_callback(parse)
        # 為當前任務繫結一個函式,在當前任務執行結束的時候會觸發這個函式
        # 會把futrue物件作為引數傳給函式
        # 這個稱之為回撥函式,處理完了回來就呼叫這個函式。

跟上面執行緒池裡的例子相比:回撥函式的作用,不需要等待所有的任務執行完才打印返回值。每執行完一個任務直接列印結果,實現一個併發的效果,效率有所提升