python實現阻塞佇列
怎麼實現阻塞佇列?當然是靠鎖,但是應該怎麼鎖?一把鎖能在not_empty,not_full,all_tasks_done三個條件之間共享。好比說,現在有執行緒A執行緒B,他們準備向佇列put任務,佇列的最大長度是5,執行緒A在put時,還沒完事,執行緒B就開始put,佇列就塞不下了,所以當執行緒A搶佔到put權時應該加把鎖,不讓執行緒B對佇列操作。鬼畜區經常看到的計數君,線上程中也同樣重要,每次put完unfinished要加一,get完unfinished要減一。
import threading import time from collections import deque class BlockQueue: def __init__(self, maxsize=0): self.mutex = threading.Lock() self.maxsize = maxsize self.not_full = threading.Condition(self.mutex) self.not_empty = threading.Condition(self.mutex) self.all_task_done = threading.Condition(self.mutex) self.unfinished = 0
一、初始化函式:一把鎖三個條件(self.mutex,(self.not_full, self.not_empty, self.all_task_done)),最大長度與計數君(self.maxsize,self.unfinished)
def task_done(self): with self.all_task_done: unfinish = self.unfinished - 1 if unfinish <= 0: if unfinish < 0: raise ValueError("The number of calls to task_done() is greater than the number of queue elements") self.all_task_done.notify_all() self.unfinished = unfinish
二、 task_done(self):每一次put完都會呼叫一次task_done,而且呼叫的次數不能比佇列的元素多。計數君對應的方法,unfinished<0時的意思是呼叫task_done的次數比列表的元素多,這種情況就會丟擲異常。
def join(self):
with self.all_task_done:
while self.unfinished:
self.all_task_done.wait()
三、join(self):阻塞方法,是一個十分重要的方法,但它的實現也不難,只要沒有完成任務就一直wait(),就是當計數君unfinished > 0 時就一直wait()知道unfinished=0跳出迴圈。
def put(self, item, block=True, timeout=None):
with self.not_full:
if self.maxsize > 0:
if not block:
if self._size() >= self.maxsize:
raise Exception("The BlockQueue is full")
elif timeout is not None:
self.not_full.wait()
elif timeout < 0:
raise Exception("The timeout period cannot be negative")
else:
endtime = time.time() + timeout
while self._size() >= self.maxsize:
remaining = endtime - time.time()
if remaining <= 0.0:
raise Exception("The BlockQueue is full")
self.not_full.wait(remaining)
self.queue.append(item)
self.unfinished += 1
self.not_empty.notify()
四、put(self, item, block=True, timeout=None):block=True一直阻塞直到有一個空閒的插槽可用,n秒內阻塞,如果在那個時間沒有空閒的插槽,則會引發完全的異常。Block=False如果一個空閒的槽立即可用,則在佇列上放置一個條目,否則就會引發完全的異常(在這種情況下,“超時”將被忽略)。有空位,新增到佇列沒結束的任務+1,他最後要喚醒not_empty.notify(),因為一開始是要在沒滿的情況下加鎖,滿了就等待not_full.wait,當put完以後就有東西了,每當一個item被新增到佇列時,通知not_empty等待獲取的執行緒會被通知。
def get(self, block=True, timeout=None):
with self.not_empty:
if not block:
if self._size() <= 0:
raise Exception("The queue is empty and you can't call get ()")
elif timeout is None:
while not self._size():
self.not_empty.wait()
elif timeout < 0:
raise ValueError("The timeout cannot be an integer")
else:
endtime = time.time() + timeout
remaining = endtime - time.time()
if remaining <= 0.0:
raise Exception("The BlockQueue is empty")
self.not_empty.wait(remaining)
item = self._get()
self.not_full.notify()
return item
五、如果可選的args“block”為True,並且timeout是None,則在必要時阻塞,直到有一個專案可用為止。如果“超時”是一個非負數,它會阻塞n秒,如果在那個時間內沒有可get()的項,則會丟擲空異常。否則'block'是False,如果立即可用,否則就會丟擲空異常,在這種情況下會忽略超時。同理要喚醒not_full.notify