1. 程式人生 > 程式設計 >python執行緒池如何使用

python執行緒池如何使用

執行緒池的使用

執行緒池的基類是 concurrent.futures 模組中的 Executor,Executor 提供了兩個子類,即 ThreadPoolExecutor 和ProcessPoolExecutor,其中 ThreadPoolExecutor 用於建立執行緒池,而 ProcessPoolExecutor 用於建立程序池。

如果使用執行緒池/程序池來管理併發程式設計,那麼只要將相應的 task 函式提交給執行緒池/程序池,剩下的事情就由執行緒池/程序池來搞定。

Exectuor 提供瞭如下常用方法:

  • submit(fn,*args,**kwargs):將 fn 函式提交給執行緒池。*args 代表傳給 fn 函式的引數,*kwargs 代表以關鍵字引數的形式為 fn 函式傳入引數。
  • map(func,*iterables,timeout=None,chunksize=1):該函式類似於全域性函式 map(func,*iterables),只是該函式將會啟動多個執行緒,以非同步方式立即對 iterables 執行 map 處理。
  • shutdown(wait=True):關閉執行緒池。

程式將 task 函式提交(submit)給執行緒池後,submit 方法會返回一個 Future 物件,Future 類主要用於獲取執行緒任務函式的返回值。由於執行緒任務會在新執行緒中以非同步方式執行,因此,執行緒執行的函式相當於一個“將來完成”的任務,所以 Python 使用 Future 來代表。

實際上,在 Java 的多執行緒程式設計中同樣有 Future,此處的 Future 與 Java 的 Future 大同小異。

Future 提供瞭如下方法:

  • cancel():取消該 Future 代表的執行緒任務。如果該任務正在執行,不可取消,則該方法返回 False;否則,程式會取消該任務,並返回 True。
  • cancelled():返回 Future 代表的執行緒任務是否被成功取消。
  • running():如果該 Future 代表的執行緒任務正在執行、不可被取消,該方法返回 True。
  • done():如果該 Funture 代表的執行緒任務被成功取消或執行完成,則該方法返回 True。
  • result(timeout=None):獲取該 Future 代表的執行緒任務最後返回的結果。如果 Future 代表的執行緒任務還未完成,該方法將會阻塞當前執行緒,其中 timeout 引數指定最多阻塞多少秒。
  • exception(timeout=None):獲取該 Future 代表的執行緒任務所引發的異常。如果該任務成功完成,沒有異常,則該方法返回 None。
  • add_done_callback(fn):為該 Future 代表的執行緒任務註冊一個“回撥函式”,當該任務成功完成時,程式會自動觸發該 fn 函式。

在用完一個執行緒池後,應該呼叫該執行緒池的 shutdown() 方法,該方法將啟動執行緒池的關閉序列。呼叫 shutdown() 方法後的執行緒池不再接收新任務,但會將以前所有的已提交任務執行完成。當執行緒池中的所有任務都執行完成後,該執行緒池中的所有執行緒都會死亡。

使用執行緒池來執行執行緒任務的步驟如下:

a、呼叫 ThreadPoolExecutor 類的構造器建立一個執行緒池。

b、定義一個普通函式作為執行緒任務。

c、呼叫 ThreadPoolExecutor 物件的 submit() 方法來提交執行緒任務。

d、當不想提交任何任務時,呼叫 ThreadPoolExecutor 物件的 shutdown() 方法來關閉執行緒池。

下面程式示範瞭如何使用執行緒池來執行執行緒任務:

from concurrent.futures import ThreadPoolExecutor
import threading
import time
# 定義一個準備作為執行緒任務的函式
def action(max):
  my_sum = 0
  for i in range(max):
    print(threading.current_thread().name + ' ' + str(i))
    my_sum += i
  return my_sum
# 建立一個包含2條執行緒的執行緒池
pool = ThreadPoolExecutor(max_workers=2)
# 向執行緒池提交一個task,50會作為action()函式的引數
future1 = pool.submit(action,50)
# 向執行緒池再提交一個task,100會作為action()函式的引數
future2 = pool.submit(action,100)
# 判斷future1代表的任務是否結束
print(future1.done())
time.sleep(3)
# 判斷future2代表的任務是否結束
print(future2.done())
# 檢視future1代表的任務返回的結果
print(future1.result())
# 檢視future2代表的任務返回的結果
print(future2.result())
# 關閉執行緒池
pool.shutdown()

上面程式中,第 13 行程式碼建立了一個包含兩個執行緒的執行緒池,接下來的兩行程式碼只要將 action() 函式提交(submit)給執行緒池,該執行緒池就會負責啟動執行緒來執行 action() 函式。這種啟動執行緒的方法既優雅,又具有更高的效率。

當程式把 action() 函式提交給執行緒池時,submit() 方法會返回該任務所對應的 Future 物件,程式立即判斷 futurel 的 done() 方法,該方法將會返回 False(表明此時該任務還未完成)。接下來主程式暫停 3 秒,然後判斷 future2 的 done() 方法,如果此時該任務已經完成,那麼該方法將會返回 True。

程式最後通過 Future 的 result() 方法來獲取兩個非同步任務返回的結果。

讀者可以自己執行此程式碼檢視執行結果,這裡不再演示。

當程式使用 Future 的 result() 方法來獲取結果時,該方法會阻塞當前執行緒,如果沒有指定 timeout 引數,當前執行緒將一直處於阻塞狀態,直到 Future 代表的任務返回。

獲取執行結果

前面程式呼叫了 Future 的 result() 方法來獲取執行緒任務的運回值,但該方法會阻塞當前主執行緒,只有等到錢程任務完成後,result() 方法的阻塞才會被解除。

如果程式不希望直接呼叫 result() 方法阻塞執行緒,則可通過 Future 的 add_done_callback() 方法來添加回調函式,該回調函式形如 fn(future)。當執行緒任務完成後,程式會自動觸發該回調函式,並將對應的 Future 物件作為引數傳給該回調函式。

下面程式使用 add_done_callback() 方法來獲取執行緒任務的返回值:

from concurrent.futures import ThreadPoolExecutor
import threading
import time
# 定義一個準備作為執行緒任務的函式
def action(max):
  my_sum = 0
  for i in range(max):
    print(threading.current_thread().name + ' ' + str(i))
    my_sum += i
  return my_sum
# 建立一個包含2條執行緒的執行緒池
with ThreadPoolExecutor(max_workers=2) as pool:
  # 向執行緒池提交一個task,50會作為action()函式的引數
  future1 = pool.submit(action,50)
  # 向執行緒池再提交一個task,100會作為action()函式的引數
  future2 = pool.submit(action,100)
  def get_result(future):
    print(future.result())
  # 為future1新增執行緒完成的回撥函式
  future1.add_done_callback(get_result)
  # 為future2新增執行緒完成的回撥函式
  future2.add_done_callback(get_result)
  print('--------------')

上面主程式分別為 future1、future2 添加了同一個回撥函式,該回調函式會線上程任務結束時獲取其返回值。

主程式的最後一行程式碼列印了一條橫線。由於程式並未直接呼叫 future1、future2 的 result() 方法,因此主執行緒不會被阻塞,可以立即看到輸出主執行緒打印出的橫線。接下來將會看到兩個新執行緒併發執行,當執行緒任務執行完成後,get_result() 函式被觸發,輸出執行緒任務的返回值。

另外,由於執行緒池實現了上下文管理協議(Context Manage Protocol),因此,程式可以使用 with 語句來管理執行緒池,這樣即可避免手動關閉執行緒池,如上面的程式所示。

此外,Exectuor 還提供了一個 map(func,chunksize=1) 方法,該方法的功能類似於全域性函式 map(),區別在於執行緒池的 map() 方法會為 iterables 的每個元素啟動一個執行緒,以併發方式來執行 func 函式。這種方式相當於啟動 len(iterables) 個執行緒,井收集每個執行緒的執行結果。

例如,如下程式使用 Executor 的 map() 方法來啟動執行緒,並收集執行緒任務的返回值:

from concurrent.futures import ThreadPoolExecutor
import threading
import time
# 定義一個準備作為執行緒任務的函式
def action(max):
  my_sum = 0
  for i in range(max):
    print(threading.current_thread().name + ' ' + str(i))
    my_sum += i
  return my_sum
# 建立一個包含4條執行緒的執行緒池
with ThreadPoolExecutor(max_workers=4) as pool:
  # 使用執行緒執行map計算
  # 後面元組有3個元素,因此程式啟動3條執行緒來執行action函式
  results = pool.map(action,(50,100,150))
  print('--------------')
  for r in results:
print(r)

上面程式使用 map() 方法來啟動 3 個執行緒(該程式的執行緒池包含 4 個執行緒,如果繼續使用只包含兩個執行緒的執行緒池,此時將有一個任務處於等待狀態,必須等其中一個任務完成,執行緒空閒出來才會獲得執行的機會),map() 方法的返回值將會收集每個執行緒任務的返回結果。

執行上面程式,同樣可以看到 3 個執行緒併發執行的結果,最後通過 results 可以看到 3 個執行緒任務的返回結果。

通過上面程式可以看出,使用 map() 方法來啟動執行緒,並收集執行緒的執行結果,不僅具有程式碼簡單的優點,而且雖然程式會以併發方式來執行 action() 函式,但最後收集的 action() 函式的執行結果,依然與傳入引數的結果保持一致。也就是說,上面 results 的第一個元素是 action(50) 的結果,第二個元素是 action(100) 的結果,第三個元素是 action(150) 的結果。

例項擴充套件:

# coding:utf-8

import Queue
import threading
import sys
import time
import math


class WorkThread(threading.Thread):

def __init__(self,task_queue):
threading.Thread.__init__(self)
self.setDaemon(True)
self.task_queue = task_queue
self.start()
self.idle = True

def run(self):
sleep_time = 0.01 # 第1次無任務可做時休息10毫秒
multiply = 0
while True:
try:
# 從佇列中取一個任務
func,args,kwargs = self.task_queue.get(block=False)
self.idle = False
multiply = 0
# 執行之
func(*args,**kwargs)
except Queue.Empty:
time.sleep(sleep_time * math.pow(2,multiply))
self.idle = True
multiply += 1
continue
except:
print sys.exc_info()
raise


class ThreadPool:

def __init__(self,thread_num=10,max_queue_len=1000):
self.max_queue_len = max_queue_len
self.task_queue = Queue.Queue(max_queue_len) # 任務等待佇列
self.threads = []
self.__create_pool(thread_num)

def __create_pool(self,thread_num):
for i in xrange(thread_num):
thread = WorkThread(self.task_queue)
self.threads.append(thread)

def add_task(self,func,**kwargs):
'''新增一個任務,返回任務等待佇列的長度
呼叫該方法前最後先呼叫isSafe()判斷一下等待的任務是不是很多,以防止提交的任務被拒絕
'''
try:
self.task_queue.put((func,kwargs))
except Queue.Full:
raise # 佇列已滿時直接丟擲異常,不給執行
return self.task_queue.qsize()

def isSafe(self):
'''等待的任務數量離警界線還比較遠
'''
return self.task_queue.qsize() < 0.9 * self.max_queue_len

def wait_for_complete(self):
'''等待提交到執行緒池的所有任務都執行完畢
'''
#首先任務等待佇列要變成空
while not self.task_queue.empty():
time.sleep(1)
# 其次,所以計算執行緒要變成idle狀態
while True:
all_idle = True
for th in self.threads:
if not th.idle:
all_idle = False
break
if all_idle:
break
else:
time.sleep(1)


if __name__ == '__main__':
def foo(a,b):
print a + b
time.sleep(0.01)
thread_pool = ThreadPool(10,100)
'''在Windows上測試不通過,Windows上Queue.Queue不是執行緒安全的'''
size = 0
for i in xrange(10000):
try:
size = thread_pool.add_task(foo,i,2 * i)
except Queue.Full:
print 'queue full,queue size is ',size
time.sleep(2)

到此這篇關於python執行緒池如何使用的文章就介紹到這了,更多相關python中的執行緒池詳解內容請搜尋我們以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援我們!