
day9 -- Queues (queue)


The queue module

Queue is the thread-safe queue (FIFO) implementation in the Python standard library. It provides a first-in, first-out data structure suited to multithreaded programming and is used to pass information between producer and consumer threads: one thread puts data in, another thread takes it out.

class queue.Queue(maxsize=0)          # first in, first out (FIFO)

class queue.LifoQueue(maxsize=0)      # last in, first out (LIFO)

class queue.PriorityQueue(maxsize=0)  # a queue whose stored items carry a priority

Queue methods:

1. queue.Queue.get() # takes data out of the queue; if the queue is empty, the call hangs waiting for data to be put in, and keeps blocking until something arrives

get(self, block=True, timeout=None) # by default block=True, so the call blocks when there is no data

import queue

q = queue.Queue()
q.put(1)
q.put(2)
print(q.get())
print(q.get())
print(q.get())   # the queue is now empty, so this third get() blocks
Output:
1
2
(blocks) ................

A queue is simply a place to store and fetch data. Once all the data has been taken out, get() waits for new data to be put in and keeps waiting until it arrives. If you do not want to wait, use the approaches below:

Of course, calling get() with block=False gives you the same behaviour as get_nowait().

block=True/False sets whether the call blocks when the queue is empty: True blocks, False does not block and raises an exception instead. timeout=None/<seconds> sets the blocking time, i.e. how long to wait; if no data is put in within that period, an exception is raised.
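For example, here is a minimal sketch of both options (the queue starts out empty, so both calls give up instead of blocking forever):

import queue

q = queue.Queue()                       # an empty queue

# block=False: give up immediately if there is no data
try:
    q.get(block=False)
except queue.Empty:
    print("empty, returned immediately")

# timeout=2: wait at most 2 seconds, then raise queue.Empty
try:
    q.get(timeout=2)
except queue.Empty:
    print("still empty after 2 seconds")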

2. get_nowait() # takes data out of the queue; raises an exception if the queue is empty

import queue

q = queue.Queue()
q.put(1)
q.put(2)
print(q.get())
print(q.get())
print(q.get_nowait())
Output:
1
2
Traceback (most recent call last):
File "/home/zhuzhu/day9/隊列.py", line 8, in <module>
print(q.get_nowait())
File "/usr/lib/python3.5/queue.py", line 192, in get_nowait
return self.get(block=False)
File "/usr/lib/python3.5/queue.py", line 161, in get
raise Empty
queue.Empty

As shown above, get_nowait() raises an exception when the queue is empty; you can catch that exception (queue.Empty) and let the program carry on.
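For instance, a small sketch of catching queue.Empty so the program keeps running:

import queue

q = queue.Queue()
q.put(1)

# Drain the queue; stop cleanly instead of crashing when it runs dry
while True:
    try:
        item = q.get_nowait()
    except queue.Empty:
        print("queue drained, moving on")
        break
    print("got", item)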

3. queue.Queue.qsize() # returns the number of items in the queue

import queue

q = queue.Queue()
print(q.qsize())
q.put(1)
print(q.qsize())
q.put(2)
print(q.qsize())
Output:
0
1
2

q.qsize() gives the length of the queue. If the length is 0 the queue is empty, so be careful when calling get() at that point: it will block.

4. q.put() # puts data into the queue

put(self, item, block=True, timeout=None)

put() mirrors get(): when the queue is full and blocking is disabled, put() raises an exception (queue.Full). block sets whether blocking is enabled, and timeout sets the blocking time; by default the call blocks indefinitely.

5. q.empty() # checks whether the queue is empty. Return True if the queue is empty, False otherwise (not reliable!).

6. q.full() # checks whether the queue is full. Return True if the queue is full, False otherwise (not reliable!).

7. put_nowait(), equivalent to put(block=False) # raises an exception if the queue is full. Put an item into the queue without blocking. (These three methods are demonstrated in the sketch below.)
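Here is a short sketch that shows empty(), full() and put_nowait() together (the maxsize of 2 is chosen just for illustration):

import queue

q = queue.Queue(maxsize=2)      # a bounded queue with room for two items
print(q.empty())                # True  -- nothing has been put in yet
q.put(1)
q.put(2)
print(q.full())                 # True  -- maxsize reached

try:
    q.put_nowait(3)             # same as q.put(3, block=False)
except queue.Full:
    print("queue is full, item 3 was rejected")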

Now let's look at LifoQueue, the last-in, first-out case:

import queue

q = queue.LifoQueue()
q.put(1)
q.put(2)
q.put(3)

print("第一個取出:",q.get())
print("第二個取出:",q.get())

This is the LifoQueue(maxsize=0) case: the item put in last is taken out first, so 3 comes out before 2.

Now let's look at PriorityQueue, a queue with priorities:

import queue

q = queue.PriorityQueue()

q.put((3,"alex"))
q.put((1,"geng"))
q.put((8,"zeng"))

print("第一個取出",q.get())
print("第二個取出:",q.get())
print("第三個取出:",q.get())
Output:
first out: (1, 'geng')
second out: (3, 'alex')
third out: (8, 'zeng')

In the program above, items are put in with a priority: put((priority, data)) stores a tuple whose first element is the priority level and whose second element is the message. The lowest priority number is retrieved first, which suits VIP-style priority scenarios.

The producer-consumer model

In concurrent programming, the producer-consumer pattern can solve the vast majority of concurrency problems. By balancing the processing capacity of producer threads and consumer threads, it improves the overall data-processing speed of the program.

Why use the producer-consumer pattern

In the world of threads, producers are the threads that generate data and consumers are the threads that consume it. In multithreaded development, if the producer is fast and the consumer is slow, the producer has to wait for the consumer to finish before it can produce more data. By the same token, if the consumer's capacity exceeds the producer's, the consumer has to wait for the producer. The producer-consumer pattern was introduced to solve this problem.

What the producer-consumer pattern is

The producer-consumer pattern uses a container to break the tight coupling between producers and consumers. The two sides do not communicate directly; they communicate through a blocking queue. After producing data, the producer does not wait for the consumer to process it but simply drops it into the queue, and the consumer does not ask the producer for data but takes it straight from the queue. The blocking queue acts as a buffer that balances the processing capacity of producers and consumers.

Let's work through a minimal example of the producer-consumer model:

'''The producer-consumer model is simply two cooperating threads: one produces, the other consumes, and they interact through a queue'''
import queue,time,threading

def producer(name):
    '''Producer: keeps making bones and putting them into the queue'''
    count = 1                                                    # initialise the counter
    while True:
        q.put("bone %s" %count)                                  # produce a bone
        print("[%s] produced bone %s" %(name,count))
        count += 1                                               # one bone per cycle
        time.sleep(0.5)                                          # production rate

def consumer(name):
    '''Consumer: keeps taking bones out of the queue and eating them'''
    while True:
        print("\033[31m[%s] ate [%s]\033[0m" %(name,q.get()))
        time.sleep(1)                                            # consumption rate


if __name__ == "__main__":
    try:
        q = queue.Queue(maxsize=10)                              # create the Queue with a maximum capacity of 10
        p = threading.Thread(target=producer,args=("geng",))     # producer thread
        p.start()
    except KeyboardInterrupt as f:
        print("producer thread interrupted!!")

    try:
        c = threading.Thread(target=consumer,args=("alex",))
        c.start()
    except KeyboardInterrupt as e:
        print("consumer thread interrupted!!!")
Output (excerpt):
[geng] produced bone 1
[alex] ate [bone 1]
[geng] produced bone 2
[geng] produced bone 3
[alex] ate [bone 2]
[geng] produced bone 4
[alex] ate [bone 3]
[geng] produced bone 5
[geng] produced bone 6
[alex] ate [bone 4]
[geng] produced bone 7
[geng] produced bone 8
[alex] ate [bone 5]
[geng] produced bone 9
[geng] produced bone 10
[alex] ate [bone 6]
[geng] produced bone 11
[geng] produced bone 12
[alex] ate [bone 7]
[geng] produced bone 13
[geng] produced bone 14

That is the simple producer-consumer model built on a queue: the producer makes goods and puts them into the queue, the consumer takes them out of the same queue at its own pace, and this continuous producing and consuming is the whole pattern. The time.sleep() calls set the production and consumption rates and control the rhythm of the program, while count += 1 means the producer makes exactly one bone per cycle; producing ten at a time would raise throughput a lot, but you would have to adjust the code to do so.
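Beyond this basic example, Queue also offers task_done() and join() (documented in the source code below), which let a producer wait until every queued item has actually been processed. A minimal sketch, using a None sentinel to stop the worker (the sentinel convention is my own choice, not something the module prescribes):

import queue, threading

q = queue.Queue()

def worker():
    while True:
        item = q.get()
        if item is None:              # sentinel: tells the worker to stop
            break
        print("processed", item)
        q.task_done()                 # mark this item as finished

t = threading.Thread(target=worker)
t.start()

for i in range(5):
    q.put(i)

q.join()                              # block until every put item has been task_done()
q.put(None)                           # shut the worker down
t.join()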

For reference, the source code of the queue module is shown below:

'''A multi-producer, multi-consumer queue.'''

try:
    import threading
except ImportError:
    import dummy_threading as threading
from collections import deque
from heapq import heappush, heappop
from time import monotonic as time

__all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue']

class Empty(Exception):
    'Exception raised by Queue.get(block=0)/get_nowait().'
    pass

class Full(Exception):
    'Exception raised by Queue.put(block=0)/put_nowait().'
    pass

class Queue:
    '''Create a queue object with a given maximum size.

    If maxsize is <= 0, the queue size is infinite.
    '''

    def __init__(self, maxsize=0):
        self.maxsize = maxsize
        self._init(maxsize)

        # mutex must be held whenever the queue is mutating.  All methods
        # that acquire mutex must release it before returning.  mutex
        # is shared between the three conditions, so acquiring and
        # releasing the conditions also acquires and releases mutex.
        self.mutex = threading.Lock()

        # Notify not_empty whenever an item is added to the queue; a
        # thread waiting to get is notified then.
        self.not_empty = threading.Condition(self.mutex)

        # Notify not_full whenever an item is removed from the queue;
        # a thread waiting to put is notified then.
        self.not_full = threading.Condition(self.mutex)

        # Notify all_tasks_done whenever the number of unfinished tasks
        # drops to zero; thread waiting to join() is notified to resume
        self.all_tasks_done = threading.Condition(self.mutex)
        self.unfinished_tasks = 0

    def task_done(self):
        '''Indicate that a formerly enqueued task is complete.

        Used by Queue consumer threads.  For each get() used to fetch a task,
        a subsequent call to task_done() tells the queue that the processing
        on the task is complete.

        If a join() is currently blocking, it will resume when all items
        have been processed (meaning that a task_done() call was received
        for every item that had been put() into the queue).

        Raises a ValueError if called more times than there were items
        placed in the queue.
        '''
        with self.all_tasks_done:
            unfinished = self.unfinished_tasks - 1
            if unfinished <= 0:
                if unfinished < 0:
                    raise ValueError('task_done() called too many times')
                self.all_tasks_done.notify_all()
            self.unfinished_tasks = unfinished

    def join(self):
        '''Blocks until all items in the Queue have been gotten and processed.

        The count of unfinished tasks goes up whenever an item is added to the
        queue. The count goes down whenever a consumer thread calls task_done()
        to indicate the item was retrieved and all work on it is complete.

        When the count of unfinished tasks drops to zero, join() unblocks.
        '''
        with self.all_tasks_done:
            while self.unfinished_tasks:
                self.all_tasks_done.wait()

    def qsize(self):
        '''Return the approximate size of the queue (not reliable!).'''
        with self.mutex:
            return self._qsize()

    def empty(self):
        '''Return True if the queue is empty, False otherwise (not reliable!).

        This method is likely to be removed at some point.  Use qsize() == 0
        as a direct substitute, but be aware that either approach risks a race
        condition where a queue can grow before the result of empty() or
        qsize() can be used.

        To create code that needs to wait for all queued tasks to be
        completed, the preferred technique is to use the join() method.
        '''
        with self.mutex:
            return not self._qsize()

    def full(self):
        '''Return True if the queue is full, False otherwise (not reliable!).

        This method is likely to be removed at some point.  Use qsize() >= n
        as a direct substitute, but be aware that either approach risks a race
        condition where a queue can shrink before the result of full() or
        qsize() can be used.
        '''
        with self.mutex:
            return 0 < self.maxsize <= self._qsize()

    def put(self, item, block=True, timeout=None):
        '''Put an item into the queue.

        If optional args block is true and timeout is None (the default),
        block if necessary until a free slot is available. If timeout is
        a non-negative number, it blocks at most timeout seconds and raises
        the Full exception if no free slot was available within that time.
        Otherwise (block is false), put an item on the queue if a free slot
        is immediately available, else raise the Full exception (timeout
        is ignored in that case).
        '''
        with self.not_full:
            if self.maxsize > 0:
                if not block:
                    if self._qsize() >= self.maxsize:
                        raise Full
                elif timeout is None:
                    while self._qsize() >= self.maxsize:
                        self.not_full.wait()
                elif timeout < 0:
                    raise ValueError("‘timeout‘ must be a non-negative number")
                else:
                    endtime = time() + timeout
                    while self._qsize() >= self.maxsize:
                        remaining = endtime - time()
                        if remaining <= 0.0:
                            raise Full
                        self.not_full.wait(remaining)
            self._put(item)
            self.unfinished_tasks += 1
            self.not_empty.notify()

    def get(self, block=True, timeout=None):
        '''Remove and return an item from the queue.

        If optional args block is true and timeout is None (the default),
        block if necessary until an item is available. If timeout is
        a non-negative number, it blocks at most timeout seconds and raises
        the Empty exception if no item was available within that time.
        Otherwise (block is false), return an item if one is immediately
        available, else raise the Empty exception (timeout is ignored
        in that case).
        '''
        with self.not_empty:
            if not block:
                if not self._qsize():
                    raise Empty
            elif timeout is None:
                while not self._qsize():
                    self.not_empty.wait()
            elif timeout < 0:
                raise ValueError("‘timeout‘ must be a non-negative number")
            else:
                endtime = time() + timeout
                while not self._qsize():
                    remaining = endtime - time()
                    if remaining <= 0.0:
                        raise Empty
                    self.not_empty.wait(remaining)
            item = self._get()
            self.not_full.notify()
            return item

    def put_nowait(self, item):
        '''Put an item into the queue without blocking.

        Only enqueue the item if a free slot is immediately available.
        Otherwise raise the Full exception.
        '''
        return self.put(item, block=False)

    def get_nowait(self):
        '''Remove and return an item from the queue without blocking.

        Only get an item if one is immediately available. Otherwise
        raise the Empty exception.
        '''
        return self.get(block=False)

    # Override these methods to implement other queue organizations
    # (e.g. stack or priority queue).
    # These will only be called with appropriate locks held

    # Initialize the queue representation
    def _init(self, maxsize):
        self.queue = deque()

    def _qsize(self):
        return len(self.queue)

    # Put a new item in the queue
    def _put(self, item):
        self.queue.append(item)

    # Get an item from the queue
    def _get(self):
        return self.queue.popleft()


class PriorityQueue(Queue):
    '''Variant of Queue that retrieves open entries in priority order (lowest first).

    Entries are typically tuples of the form:  (priority number, data).
    '''

    def _init(self, maxsize):
        self.queue = []

    def _qsize(self):
        return len(self.queue)

    def _put(self, item):
        heappush(self.queue, item)

    def _get(self):
        return heappop(self.queue)


class LifoQueue(Queue):
    '''Variant of Queue that retrieves most recently added entries first.'''

    def _init(self, maxsize):
        self.queue = []

    def _qsize(self):
        return len(self.queue)

    def _put(self, item):
        self.queue.append(item)

    def _get(self):
        return self.queue.pop()

Reading the source code gives a much better understanding of these methods, and from now on I will look at source code more often. It is worth learning from how the source is written: it contains many good writing habits and conventions.
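For example, the comment "Override these methods to implement other queue organizations" shows how easy it is to build your own variant by overriding _init(), _qsize(), _put() and _get(). A hypothetical sketch (not part of the standard library) of a queue that silently drops duplicate items:

import queue

class DedupQueue(queue.Queue):
    '''Hypothetical variant: an item already waiting in the queue is not added again.'''

    def _init(self, maxsize):
        self.queue = []               # a plain list instead of a deque
        self._seen = set()

    def _qsize(self):
        return len(self.queue)

    def _put(self, item):
        if item not in self._seen:    # simplification: join()/task_done() accounting ignores dropped items
            self._seen.add(item)
            self.queue.append(item)

    def _get(self):
        item = self.queue.pop(0)
        self._seen.discard(item)
        return item

q = DedupQueue()
q.put("a")
q.put("a")
q.put("b")
print(q.qsize())    # 2 -- the duplicate "a" was dropped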
