筆記-python-standard library-17.7 queue
筆記-python-standard library-17.7 queue
1. queue
source code:Lib/queue.py
該模組實現了多生產者,多消費者佇列。
此模組實現了所有的required locking semantics.
模組支援三種類型的佇列,區別僅僅在於檢索的順序。
三種佇列分別是FIFO,LIFO,優先順序佇列(使用heaq模組,優先丟擲最小值)。
1.1. 模組中定義的類
class queue.Queue(maxsize=0)
class queue.LifoQueue(maxsize=0)
class queue.PriorityQueue(maxsize=0)
注意:優先順序佇列中優先丟擲最小值。
exception queue.Empty
Exception raised when non-blocking get() (or get_nowait()) is called on a Queue object which is empty.
exception queue.Full
Exception raised when non-blocking put() (or put_nowait()) is called on a Queue object which is full.
1.2. 類方法
三種佇列類都提供了以下方法:
1. Queue.qsize()返回佇列大致的大小,注意qsize()>0不代表get不會阻塞,同樣,qsize()<maxsize不代表put不會阻塞
2. Queue.empty()
3. Queue.full()
4. Queue.put(item, block=True, timeout=None)
block決定是否阻塞,timeout決定最長阻塞時間
5. Queue.put_nowait(item)
6. Queue.get_nowait(): equal to get(False)
下面兩個方法來跟蹤佇列中的任務是否被執行完。
7. Queue.task_done()
8. Queue.join()
阻塞直到佇列中的所有任務完成。
其它屬性
maxsize:佇列最大長度,可以在初始化時給出,也可建立後手動設定。
1.1. productor and consumer
queue實現生產者與消費者
q = queue.Queue()
def productor(arg):
while True:
if q.qsize() < 30:
q.put(str(arg)+'banana')
time.sleep(1)
def consumer(arg):
while True:
print('con {}, pro {}'.format(arg, q.get()))
time.sleep(2)
for i in range(3):
t = threading.Thread(target=productor, args=(i,))
t.start()
for j in range(5):
t = threading.Thread(target=consumer, args=(j,))
t.start()
1.2. task_done and join
文件如下:
def task_done(self):
'''Indicate that a formerly enqueued task is complete.
Used by Queue consumer threads. For each get() used to fetch a task,
a subsequent call to task_done() tells the queue that the processing
on the task is complete.
If a join() is currently blocking, it will resume when all items
have been processed (meaning that a task_done() call was received
for every item that had been put() into the queue).
Raises a ValueError if called more times than there were items
placed in the queue.
'''
with self.all_tasks_done:
unfinished = self.unfinished_tasks - 1
if unfinished <= 0:
if unfinished < 0:
raise ValueError('task_done() called too many times')
self.all_tasks_done.notify_all()
self.unfinished_tasks = unfinished
關鍵是如果不使用task_done,阻塞不會結束,
下面的程式碼把紅色行刪掉,執行緒會阻塞在q.join()處,
q = queue.Queue()
q.maxsize = 100
def productor(arg):
while True:
if q.qsize() >50:
q.join()
else:
q.put(str(arg)+' banana')
time.sleep(0.5)
def consumer(arg):
while True:
print('con {}, pro {}'.format(arg, q.get()))
q.task_done()
time.sleep(2)
def start_t():
for i in range(4):
t = threading.Thread(target=productor, args=(i,))
t.start()
for j in range(5):
t = threading.Thread(target=consumer, args=(j,))
t.start()
time.sleep(1)
while True:
q_length = q.qsize()
if q_length == 0:
pass
#break
else:
print("queue's size is {}".format(q_length))
time.sleep(2)
start_t()
time.sleep(0.5)
print(r'end {}')
print(threading.enumerate())
2. 小結:
- queue是執行緒安全的
- FIFO實際使用的是dqueue,
- 在使用join時一定要使用task_done