1. 程式人生 > >筆記-python-standard library-17.7 queue

筆記-python-standard library-17.7 queue

 筆記-python-standard library-17.7 queue

 

1.  queue

source code:Lib/queue.py

該模組實現了多生產者,多消費者佇列。

此模組實現了所有的required locking semantics.

模組支援三種類型的佇列,區別僅僅在於檢索的順序。

三種佇列分別是FIFO,LIFO,優先順序佇列(使用heaq模組,優先丟擲最小值)。

 

1.1.  模組中定義的類

class queue.Queue(maxsize=0)

class queue.LifoQueue(maxsize=0)

class queue.PriorityQueue(maxsize=0)

注意:優先順序佇列中優先丟擲最小值。

 

exception queue.Empty

Exception raised when non-blocking get() (or get_nowait()) is called on a Queue object which is empty.

 

exception queue.Full

Exception raised when non-blocking put() (or put_nowait()) is called on a Queue object which is full.

 

1.2.  類方法

三種佇列類都提供了以下方法:

1.   Queue.qsize()返回佇列大致的大小,注意qsize()>0不代表get不會阻塞,同樣,qsize()<maxsize不代表put不會阻塞

2.   Queue.empty()    

3.   Queue.full()

4.   Queue.put(item, block=True, timeout=None)

block決定是否阻塞,timeout決定最長阻塞時間

5.   Queue.put_nowait(item)

6.   Queue.get_nowait(): equal to get(False)

下面兩個方法來跟蹤佇列中的任務是否被執行完。

7.   Queue.task_done()

8.   Queue.join()

阻塞直到佇列中的所有任務完成。

 

其它屬性

maxsize:佇列最大長度,可以在初始化時給出,也可建立後手動設定。

 

1.1.    productor and consumer

queue實現生產者與消費者

q = queue.Queue()

def productor(arg):
    while True:
        if q.qsize() < 30:
            q.put(str(arg)+'banana')
        time.sleep(1)

def consumer(arg):
    while True:
        print('con {}, pro {}'.format(arg, q.get()))
        time.sleep(2)

for i in range(3):
    t = threading.Thread(target=productor, args=(i,))
    t.start()

for j in range(5):
    t = threading.Thread(target=consumer, args=(j,))
    t.start()

 

1.2.    task_done and join

文件如下:

    def task_done(self):

        '''Indicate that a formerly enqueued task is complete.

 

        Used by Queue consumer threads.  For each get() used to fetch a task,

        a subsequent call to task_done() tells the queue that the processing

        on the task is complete.

 

        If a join() is currently blocking, it will resume when all items

        have been processed (meaning that a task_done() call was received

        for every item that had been put() into the queue).

 

        Raises a ValueError if called more times than there were items

        placed in the queue.

        '''

        with self.all_tasks_done:

            unfinished = self.unfinished_tasks - 1

            if unfinished <= 0:

                if unfinished < 0:

                    raise ValueError('task_done() called too many times')

                self.all_tasks_done.notify_all()

            self.unfinished_tasks = unfinished

 

關鍵是如果不使用task_done,阻塞不會結束,

下面的程式碼把紅色行刪掉,執行緒會阻塞在q.join()處,

q = queue.Queue()
q.maxsize = 100

def productor(arg):
    while True:
        if q.qsize() >50:
            q.join()
        else:
            q.put(str(arg)+' banana')
        time.sleep(0.5)

def consumer(arg):
    while True:
        print('con {}, pro {}'.format(arg, q.get()))
        q.task_done()
        time.sleep(2)


def start_t():
    for i in range(4):
        t = threading.Thread(target=productor, args=(i,))
        t.start()

    for j in range(5):
        t = threading.Thread(target=consumer, args=(j,))
        t.start()

    time.sleep(1)
    while True:
        q_length = q.qsize()
        if q_length == 0:
            pass
           
#break
       
else:
            print("queue's size is {}".format(q_length))
            time.sleep(2)

start_t()
time.sleep(0.5)
print(r'end {}')
print(threading.enumerate())

 

2.      小結:

  1. queue是執行緒安全的
  2. FIFO實際使用的是dqueue,
  3. 在使用join時一定要使用task_done