1. 程式人生 > >python關於queue模組

python關於queue模組

1.概述

The queue module implements multi-producer, multi-consumer queues. It is especially useful in threaded programming when information must be exchanged safely between multiple threads. The Queue class in this module implements all the required locking semantics. It depends on the availability of thread support in Python; see the threading module.

佇列模組實現了多生產者, 多消費者佇列; 它線上程程式設計中要求多個執行緒間安全的交換資訊時特別有用, Queue類實現了所有所需的鎖定語義,它取決於python支援的執行緒的可用性;

1)The module implements three types of queue, which differ only in the order in which the entries are retrieved. In a FIFO queue, the first tasks added are the first retrieved. In a LIFO queue, the most recently added entry is the first retrieved (operating like a stack). With a priority queue, the entries are kept sorted (using the heapq module) and the lowest valued entry is retrieved first.
2)Internally, those three types of queues use locks to temporarily block competing threads; however, they are not designed to handle reentrancy within a thread.
3)In addition, the module implements a “simple” FIFO queue type where specific implementations can provide additional guarantees in exchange for the smaller functionality.

模組實現了三種形式的佇列, 只是在放入的資料取出順序上有所不同: 1.FIFO: 先進先出; 2. LIFO: 後進先出; 3.priority:資料進行排序, 最低數字優先順序的先被取出;

在內部, 佇列是通過鎖來實現暫時阻塞競爭執行緒的, 但佇列不是用來處理執行緒內的重用的;

另外, 這個模組還實現了一種簡單的FIFO queue型別, 可以對特定的處理實現額外的保證但同時功能變少了;

2.該模組下的類和異常
(1)class queue.Queue(maxsize=0)

Constructor for a FIFO queue. maxsize is an integer that sets the upperbound limit on the number of items that can be placed in the queue. Insertion will block once this size has been reached, until queue items are consumed. If maxsize is less than or equal to zero, the queue size is infinite.

FIFO的建構函式; maxsize引數是一個整數,規定了可以放入佇列中的元素個數的上限; 如果size已滿, 那再輸入會阻塞, 直到佇列中的元素被消耗; 如果maxsize是0,或小於0, 佇列size是無限的

(2)class queue.LifoQueue(maxsize=0)

Constructor for a LIFO queue. maxsize is an integer that sets the upperbound limit on the number of items that can be placed in the queue. Insertion will block once this size has been reached, until queue items are consumed. If maxsize is less than or equal to zero, the queue size is infinite.

LIFO佇列的建構函式; 其餘同上;

(3)class queue.PriorityQueue(maxsize=0)

1)Constructor for a priority queue. maxsize is an integer that sets the upperbound limit on the number of items that can be placed in the queue. Insertion will block once this size has been reached, until queue items are consumed. If maxsize is less than or equal to zero, the queue size is infinite.
2)The lowest valued entries are retrieved first (the lowest valued entry is the one returned by sorted(list(entries))[0]). A typical pattern for entries is a tuple in the form: (priority_number, data).
3)If the data elements are not comparable, the data can be wrapped in a class that ignores the data item and only compares the priority number

priority queue的建構函式, 其餘同上;

最低優先順序數字的條目被最先取出(最低優先順序數字的條目就是sorted(list(entries))[0]的結果) ; 典型的條目格式如下(priority_number, data)
注意: 所有放入佇列的元素間必須可互相比較

???如果資料元素不可比較,那資料可以被封裝到一個忽略資料只比較priority number的類中進行處理:

from dataclasses import dataclass, field
from typing import Any

@dataclass(order=True)
class PrioritizedItem:
    priority: int
    item: Any=field(compare=False)  

(4)class queue.SimpleQueue

Constructor for an unbounded FIFO queue. Simple queues lack advanced functionality such as task tracking.

無界FIFO的建構函式; 缺少如任務追蹤等高階功能的簡單佇列

(5)exception queue.Empty

Exception raised when non-blocking get() (or get_nowait()) is called on a Queue object which is empty

當佇列物件為空時呼叫非阻塞的get()或get_notwait()方法時丟擲該異常

(6)exception queue.Full

Exception raised when non-blocking put() (or put_nowait()) is called on a Queue object which is full

當佇列已滿並呼叫非阻塞的put()或put_nowait()方法時丟擲該異常

3.佇列物件

(1)Queue.qsize()

Return the approximate size of the queue. Note, qsize() > 0 doesn’t guarantee that a subsequent get() will not block, nor will qsize() < maxsize guarantee that put() will not block

該方法返回佇列的大致大小.??? 返回值大於0不保證get()不會阻塞, 返回值小於最大條目數不保證put()不會阻塞. 要你何用?

(2)Queue.empty()

Return True if the queue is empty, False otherwise. If empty() returns True it doesn’t guarantee that a subsequent call to put() will not block. Similarly, if empty() returns False it doesn’t guarantee that a subsequent call to get() will not block.

如果佇列為空則返回True; 同樣僅作參考,不保證get()和put()不會阻塞.

(3)Queue.full()

Return True if the queue is full, False otherwise. If full() returns True it doesn’t guarantee that a subsequent call to get() will not block. Similarly, if full() returns False it doesn’t guarantee that a subsequent call to put() will not block.

如果佇列已滿返回True. 同樣僅作參考…

(4)Queue.put(item, block=True, timeout=None)

Put item into the queue. If optional args block is true and timeout is None (the default), block if necessary until a free slot is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Full exception if no free slot was available within that time. Otherwise (block is false), put an item on the queue if a free slot is immediately available, else raise the Full exception (timeout is ignored in that case).

將一個條目放入佇列; 如果block引數為True, 那將一直阻塞直到佇列中有空位, (timeout引數給定的話則最多阻塞給定的時間然後就丟擲Full異常); block如果為False, 則佇列如滿則立即丟擲異常, 無視timeout引數.

Queue.put_nowait(item)

Equivalent to put(item, False).

相當於put(item, False)

(5)Queue.get(block=True, timeout=None)

Remove and return an item from the queue. If optional args block is true and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Empty exception if no item was available within that time. Otherwise (block is false), return an item if one is immediately available, else raise the Empty exception (timeout is ignored in that case).

返回佇列中條目的同時從佇列中移除它; 如果引數block引數為True, 那將一直阻塞直到可從佇列中取出資料 (timeout引數給出則最多阻塞給定的時間,如果沒有獲得資料則丟擲empty異常); 如果引數block設為False則沒有獲得資料立即丟擲異常,無視timeout引數.

Queue.get_nowait()

Equivalent to get(False).

相當於get(False)

(6)Queue.task_done()

1)Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.
2)If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).
3)Raises a ValueError if called more times than there were items placed in the queue.

表明前面的佇列任務已完成. 被佇列消費者執行緒使用; 對於獲取任務的每個get(), 隨後呼叫task_done()來告訴佇列任務已處理完成;

如果join()方法目前正在阻塞, 那麼當所有條目都已處理後(每個已放入佇列中的元素收到task_done)它就會恢復

如果呼叫次數超出佇列中曾經放入的元素個數

(7)Queue.join()

Blocks until all items in the queue have been gotten and processed.
The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer thread calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.

阻塞直到佇列中的所有元素已經被獲取並處理; 每當元素被放入佇列, 未完成的任務數就會增加; 每當呼叫task_done()方法, 意味著元素被取出並處理完成, 未完成任務數就會減少; 當未完成任務數減少到0, join()就停止阻塞;

注意:join()是否阻塞只跟task_done()的數量有關,佇列中幾個元素就有幾個相應的task_done(), 否則就算元素都被取走也會一直阻塞, 同樣元素沒有取完但task_done()數量夠了也會變為非阻塞,佇列長用於生產者消費者模型
使用例項:

import threading
import time
import queue


def test1():
    time.sleep(2)
    print('test1 starting')
    l.put('shit', True, None)  # 返回None
    l.put(123)
    l.join()  # 只要task_done()數量小於佇列中元素數量就會阻塞
    print(l.qsize(), l.empty(), l.full())
    print('test1 ending')


def test2():
    print('test2 starting')
    data = l.get(True, None)
    print(data)
    time.sleep(1)
    l.task_done()
    l.task_done()  # 返回None
    print('test2 ending')


def test3():
    time.sleep(4)
    print('test3 starting')
    l1.put((5, 2))
    l1.put((3, 'happy'))  # 優先順序佇列必須是互相間可比較元素
    print('test3 ending')


def test4():
    time.sleep(4)
    print('test4 starting')
    print(l1.get())
    print('test4 ending')


a = threading.Thread(target=test1)
b = threading.Thread(target=test2)
c = threading.Thread(target=test3)
d = threading.Thread(target=test4)

l = queue.Queue()
l1 = queue.PriorityQueue()

a.start()
b.start()
c.start()
d.start()

print('main ending')

#result
test2 starting
main ending
test1 starting
shit
test2 ending
1 False False
test1 ending
test3 starting
test3 ending
test4 starting
(3, 'happy')
test4 ending

4.simple Queue物件的方法
(1)

SimpleQueue.qsize()
Return the approximate size of the queue. Note, qsize() > 0 doesn’t guarantee that a subsequent get() will not block.

SimpleQueue.empty()
Return True if the queue is empty, False otherwise. If empty() returns False it doesn’t guarantee that a subsequent call to get() will not block.

SimpleQueue.put_nowait(item)
Equivalent to put(item), provided for compatibility with Queue.put_nowait().

SimpleQueue.get(block=True, timeout=None)
Remove and return an item from the queue. If optional args block is true and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Empty exception if no item was available within that time. Otherwise (block is false), return an item if one is immediately available, else raise the Empty exception (timeout is ignored in that case).

SimpleQueue.get_nowait()
Equivalent to get(False).

上述五種方法與Queue用法一樣;

(2)
SimpleQueue.put(item, block=True, timeout=None)

Put item into the queue. The method never blocks and always succeeds (except for potential low-level errors such as failure to allocate memory). The optional args block and timeout are ignored and only provided for compatibility with Queue.put().

CPython implementation detail: This method has a C implementation which is reentrant. That is, a put() or get() call can be interrupted by another put() call in the same thread without deadlocking or corrupting internal state inside the queue. This makes it appropriate for use in destructors such as __ del__ methods or weakref callbacks.

put()方法有區別,見上面說明,較少用到