1. 程式人生 > 實用技巧 >python-執行緒池-程序池

python-執行緒池-程序池

執行緒池程序池 concurrent.futures

  • 什麼是池
    • 要在程式開始的時候,還沒提交任務先建立幾個執行緒或者程序放在一個池子裡,這就是池
  • 為什麼要用池
    • 如果先開好程序/執行緒,那麼有任務之後就可以直接使用這個池中的資料了
    • 並且開好的執行緒或者程序會一直存在在池中,可以被多個任務反覆利用。這樣極大的減少了開啟\關閉\排程執行緒/程序的時間開銷
    • 池中的執行緒/程序個數控制了作業系統需要排程的任務個數,控制池中的單位,有利於跳作業系統的效率,減輕作業系統的負擔

concurrent.futures模組

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

ThreadPoolExecutor執行緒池 ProcessPoolExecutor程序池

  • 例項化建立池向池中提交任務:submit
  • 執行緒池
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

from threading import current_thread


def func():
    print(current_thread().ident)

t_pool = ThreadPoolExecutor(4)  # 執行緒池中執行緒的數量為4

for i in range(20):
    t_pool.submit(func)  # submit非同步提交任務
import time, random
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from threading import current_thread


def func():
    print(current_thread().ident, 'start')
    time.sleep(random.randint(1, 4))
    print(current_thread().ident, 'end')


t_pool = ThreadPoolExecutor(4)  # 執行緒池中執行緒的數量為4

for i in range(20):
    t_pool.submit(func)  # submit非同步提交任務
  • 傳引數(可以位置傳參,關鍵字傳參)
import time, random
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from threading import current_thread


def func(name,name2):
    print(current_thread().ident, name)
    time.sleep(random.randint(1, 4))
    print(current_thread().ident, name2)


t_pool = ThreadPoolExecutor(4)  # 執行緒池中執行緒的數量為4

for i in range(20):
    t_pool.submit(func, 'alex', 'taibai')  # submit非同步提交任務
  • 開啟程序池(程序池不要忘了加if __name__ = '__main__':
import time, random
import os
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor


def func(name,name2):
    print(os.getpid(), name)  # 當前程序pid
    time.sleep(random.randint(1, 4))
    print(os.getpid(), name2)

if __name__ == '__main__':  # 記得加
    t_pool = ProcessPoolExecutor(4)  # 程序池中程序的數量為4

    for i in range(20):
        t_pool.submit(func, 'alex', 'taibai')  # submit非同步提交任務
  • 獲取任務結果
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time
import os
import random


def func(a, b):
    print(os.getpid(), 'start', a, b)
    time.sleep(random.randint(1, 4))
    print(os.getpid(), 'end')
    return a+b


if __name__ == '__main__':
    tp = ProcessPoolExecutor(4)
    future_l = {}
    for i in range(20):  # 非同步非阻塞的
        ret = tp.submit(func, i, i+1)
        future_l[i] = ret
        #  print(ret.result())  # 變成串行了
    for key in future_l:  # 同步阻塞的
        print(key, future_l[key].result())
  • map(只適合傳遞簡單的引數,並且必須是一個可迭代的資料型別作為引數)
# 效果和上面獲取任務結果一樣,只是少了個for迴圈
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time
import os
import random


def func(a):
    b = a+1
    print(os.getpid(), 'start', a, b)
    time.sleep(random.randint(1, 4))
    print(os.getpid(), 'end')
    return a+b


if __name__ == '__main__':
    tp = ProcessPoolExecutor(4)
    ret = tp.map(func, range(20))
    for key in ret:  # 同步阻塞
        print(key)
  • 回撥函式 ret.add_done_callback(函式) (效率最高)
from concurrent.futures import ThreadPoolExecutor
from threading import current_thread
import time
import random


def func(a, b):
    print(current_thread().ident, 'start', a, b)
    time.sleep(random.randint(1, 4))
    print(current_thread().ident, 'end')
    return a+b  # 可以做個標識  return (a, a+b)

def print_func(ret):  # 非同步阻塞
    print(ret.result())


if __name__ == '__main__':
    tp = ThreadPoolExecutor(4)  # 開四個執行緒
    for i in range(20):  # 非同步非阻塞
        ret = tp.submit(func, i, i+1)  # 往執行緒池中提交任務
        ret.add_done_callback(print_func)  # 回撥函式
        # ret這個任務再執行完畢的瞬間立即觸發print_func函式,並且把任務的返回值物件傳遞到print_func做引數
        # 非同步阻塞 回撥函式 給ret物件繫結一個回撥函式,等待ret對應的任務有了結果之後立即呼叫print_func這個函式,就可以對結果立即進行處理,而不用按照順序接收處理結果。。非同步阻塞
  • shutdown
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time
import os
import random


def func(a, b):
    print(os.getpid(), 'start', a, b)
    time.sleep(random.randint(1, 4))
    print(os.getpid(), 'end')
    return a+b


if __name__ == '__main__':
    tp = ProcessPoolExecutor(4)
    future_l = {}
    for i in range(20):  # 非同步非阻塞的
        ret = tp.submit(func, i, i+1)
        future_l[i] = ret
    tp.shutdown()  # 關閉執行緒池,等待執行緒池中所有的任務執行完畢
        #  print(ret.result())  # 變成串行了
    for key in future_l:  # 同步阻塞的
        print(key, future_l[key].result())