1. 程式人生 > >python實現阻塞佇列

python實現阻塞佇列

怎麼實現阻塞佇列?當然是靠鎖,但是應該怎麼鎖?一把鎖能在not_empty,not_full,all_tasks_done三個條件之間共享。好比說,現在有執行緒A執行緒B,他們準備向佇列put任務,佇列的最大長度是5,執行緒A在put時,還沒完事,執行緒B就開始put,佇列就塞不下了,所以當執行緒A搶佔到put權時應該加把鎖,不讓執行緒B對佇列操作。鬼畜區經常看到的計數君,線上程中也同樣重要,每次put完unfinished要加一,get完unfinished要減一。

import threading
import time
from collections import deque


class BlockQueue:
    def __init__(self, maxsize=0):
        self.mutex = threading.Lock()
        self.maxsize = maxsize
        self.not_full = threading.Condition(self.mutex)
        self.not_empty = threading.Condition(self.mutex)
        self.all_task_done = threading.Condition(self.mutex)
        self.unfinished = 0

一、初始化函式:一把鎖三個條件(self.mutex,(self.not_full, self.not_empty, self.all_task_done)),最大長度與計數君(self.maxsize,self.unfinished)

    def task_done(self):
        with self.all_task_done:
            unfinish = self.unfinished - 1
            if unfinish <= 0:
                if unfinish < 0:
                    raise ValueError("The number of calls to task_done() is greater than the number of queue elements")
                self.all_task_done.notify_all()
            self.unfinished = unfinish

二、 task_done(self):每一次put完都會呼叫一次task_done,而且呼叫的次數不能比佇列的元素多。計數君對應的方法,unfinished<0時的意思是呼叫task_done的次數比列表的元素多,這種情況就會丟擲異常。

    def join(self):
        with self.all_task_done:
            while self.unfinished:
                self.all_task_done.wait()

三、join(self):阻塞方法,是一個十分重要的方法,但它的實現也不難,只要沒有完成任務就一直wait(),就是當計數君unfinished > 0 時就一直wait()知道unfinished=0跳出迴圈。

    def put(self, item, block=True, timeout=None):
        with self.not_full:
            if self.maxsize > 0:
                if not block:
                    if self._size() >= self.maxsize:
                        raise Exception("The BlockQueue is full")
                elif timeout is not None:
                    self.not_full.wait()
                elif timeout < 0:
                    raise Exception("The timeout period cannot be negative")
                else:
                    endtime = time.time() + timeout
                    while self._size() >= self.maxsize:
                        remaining = endtime - time.time()
                        if remaining <= 0.0:
                            raise Exception("The BlockQueue is full")
                        self.not_full.wait(remaining)
            self.queue.append(item)
            self.unfinished += 1
            self.not_empty.notify()

四、put(self, item, block=True, timeout=None):block=True一直阻塞直到有一個空閒的插槽可用,n秒內阻塞,如果在那個時間沒有空閒的插槽,則會引發完全的異常。Block=False如果一個空閒的槽立即可用,則在佇列上放置一個條目,否則就會引發完全的異常(在這種情況下,“超時”將被忽略)。有空位,新增到佇列沒結束的任務+1,他最後要喚醒not_empty.notify(),因為一開始是要在沒滿的情況下加鎖,滿了就等待not_full.wait,當put完以後就有東西了,每當一個item被新增到佇列時,通知not_empty等待獲取的執行緒會被通知。

    def get(self, block=True, timeout=None):
        with self.not_empty:
            if not block:
                if self._size() <= 0:
                    raise Exception("The queue is empty and you can't call get ()")
                elif timeout is None:
                    while not self._size():
                        self.not_empty.wait()
                elif timeout < 0:
                    raise ValueError("The timeout cannot be an integer")
                else:
                    endtime = time.time() + timeout
                    remaining = endtime - time.time()
                    if remaining <= 0.0:
                        raise Exception("The BlockQueue is empty")
                    self.not_empty.wait(remaining)
            item = self._get()
            self.not_full.notify()
            return item

五、如果可選的args“block”為True,並且timeout是None,則在必要時阻塞,直到有一個專案可用為止。如果“超時”是一個非負數,它會阻塞n秒,如果在那個時間內沒有可get()的項,則會丟擲空異常。否則'block'是False,如果立即可用,否則就會丟擲空異常,在這種情況下會忽略超時。同理要喚醒not_full.notify