Python如何實現執行緒間通訊
問題
你的程式中有多個執行緒,你需要在這些執行緒之間安全地交換資訊或資料
解決方案
從一個執行緒向另一個執行緒傳送資料最安全的方式可能就是使用 queue 庫中的隊列了。建立一個被多個執行緒共享的 Queue 物件,這些執行緒通過使用 put() 和 get() 操作來向佇列中新增或者刪除元素。 例如:
from queue import Queue from threading import Thread # A thread that produces data def producer(out_q): while True: # Produce some data ... out_q.put(data) # A thread that consumes data def consumer(in_q): while True: # Get some data data = in_q.get() # Process the data ... # Create the shared queue and launch both threads q = Queue() t1 = Thread(target=consumer,args=(q,)) t2 = Thread(target=producer,)) t1.start() t2.start()
Queue 物件已經包含了必要的鎖,所以你可以通過它在多個執行緒間多安全地共享資料。 當使用佇列時,協調生產者和消費者的關閉問題可能會有一些麻煩。一個通用的解決方法是在佇列中放置一個特殊的值,當消費者讀到這個值的時候,終止執行。例如:
from queue import Queue from threading import Thread # Object that signals shutdown _sentinel = object() # A thread that produces data def producer(out_q): while running: # Produce some data ... out_q.put(data) # Put the sentinel on the queue to indicate completion out_q.put(_sentinel) # A thread that consumes data def consumer(in_q): while True: # Get some data data = in_q.get() # Check for termination if data is _sentinel: in_q.put(_sentinel) break # Process the data ...
本例中有一個特殊的地方:消費者在讀到這個特殊值之後立即又把它放回到佇列中,將之傳遞下去。這樣,所有監聽這個佇列的消費者執行緒就可以全部關閉了。 儘管佇列是最常見的執行緒間通訊機制,但是仍然可以自己通過建立自己的資料結構並新增所需的鎖和同步機制來實現執行緒間通訊。最常見的方法是使用 Condition
變數來包裝你的資料結構。下邊這個例子演示瞭如何建立一個執行緒安全的優先順序佇列
import heapq import threading class PriorityQueue: def __init__(self): self._queue = [] self._count = 0 self._cv = threading.Condition() def put(self,item,priority): with self._cv: heapq.heappush(self._queue,(-priority,self._count,item)) self._count += 1 self._cv.notify() def get(self): with self._cv: while len(self._queue) == 0: self._cv.wait() return heapq.heappop(self._queue)[-1]
使用佇列來進行執行緒間通訊是一個單向、不確定的過程。通常情況下,你沒有辦法知道接收資料的執行緒是什麼時候接收到的資料並開始工作的。不過佇列物件提供一些基本完成的特性,比如下邊這個例子中的 task_done()
和 join()
:
from queue import Queue from threading import Thread # A thread that produces data def producer(out_q): while running: # Produce some data ... out_q.put(data) # A thread that consumes data def consumer(in_q): while True: # Get some data data = in_q.get() # Process the data ... # Indicate completion in_q.task_done() # Create the shared queue and launch both threads q = Queue() t1 = Thread(target=consumer,)) t1.start() t2.start() # Wait for all produced items to be consumed q.join()
如果一個執行緒需要在一個“消費者”執行緒處理完特定的資料項時立即得到通知,你可以把要傳送的資料和一個 Event
放到一起使用,這樣“生產者”就可以通過這個Event
物件來監測處理的過程了。示例如下:
from queue import Queue from threading import Thread,Event # A thread that produces data def producer(out_q): while running: # Produce some data ... # Make an (data,event) pair and hand it to the consumer evt = Event() out_q.put((data,evt)) ... # Wait for the consumer to process the item evt.wait() # A thread that consumes data def consumer(in_q): while True: # Get some data data,evt = in_q.get() # Process the data ... # Indicate completion evt.set()
討論
基於簡單佇列編寫多執行緒程式在多數情況下是一個比較明智的選擇。從執行緒安全佇列的底層實現來看,你無需在你的程式碼中使用鎖和其他底層的同步機制,這些只會把你的程式弄得亂七八糟。此外,使用佇列這種基於訊息的通訊機制可以被擴充套件到更大的應用範疇,比如,你可以把你的程式放入多個程序甚至是分散式系統而無需改變底層的佇列結構。 使用執行緒佇列有一個要注意的問題是,向佇列中新增資料項時並不會複製此資料項,執行緒間通訊實際上是線上程間傳遞物件引用。如果你擔心物件的共享狀態,那你最好只傳遞不可修改的資料結構(如:整型、字串或者元組)或者一個物件的深拷貝。例如:
from queue import Queue from threading import Thread import copy # A thread that produces data def producer(out_q): while True: # Produce some data ... out_q.put(copy.deepcopy(data)) # A thread that consumes data def consumer(in_q): while True: # Get some data data = in_q.get() # Process the data ...
Queue
物件提供一些在當前上下文很有用的附加特性。比如在建立 Queue 物件時提供可選的 size
引數來限制可以新增到佇列中的元素數量。對於“生產者”與“消費者”速度有差異的情況,為佇列中的元素數量新增上限是有意義的。比如,一個“生產者”產生專案的速度比“消費者” “消費”的速度快,那麼使用固定大小的佇列就可以在佇列已滿的時候阻塞佇列,以免未預期的連鎖效應擴散整個程式造成死鎖或者程式執行失常。在通訊的執行緒之間進行“流量控制”是一個看起來容易實現起來困難的問題。如果你發現自己曾經試圖通過擺弄佇列大小來解決一個問題,這也許就標誌著你的程式可能存在脆弱設計或者固有的可伸縮問題。 get()
和 put()
方法都支援非阻塞方式和設定超時,例如:
import queue q = queue.Queue() try: data = q.get(block=False) except queue.Empty: ... try: q.put(item,block=False) except queue.Full: ... try: data = q.get(timeout=5.0) except queue.Empty: ...
這些操作都可以用來避免當執行某些特定佇列操作時發生無限阻塞的情況,比如,一個非阻塞的 put()
方法和一個固定大小的佇列一起使用,這樣當佇列已滿時就可以執行不同的程式碼。比如輸出一條日誌資訊並丟棄。
def producer(q): ... try: q.put(item,block=False) except queue.Full: log.warning('queued item %r discarded!',item)
如果你試圖讓消費者執行緒在執行像 q.get()
這樣的操作時,超時自動終止以便檢查終止標誌,你應該使用 q.get()
的可選引數 timeout
,如下:
_running = True def consumer(q): while _running: try: item = q.get(timeout=5.0) # Process item ... except queue.Empty: pass
最後,有 q.qsize()
, q.full()
, q.empty()
等實用方法可以獲取一個佇列的當前大小和狀態。但要注意,這些方法都不是執行緒安全的。可能你對一個佇列使用 empty()
判斷出這個佇列為空,但同時另外一個執行緒可能已經向這個佇列中插入一個數據項。所以,你最好不要在你的程式碼中使用這些方法。
以上就是Python如何實現執行緒間通訊的詳細內容,更多關於Python 執行緒間通訊的資料請關注我們其它相關文章!