1. 程式人生 > 實用技巧 >python 佇列(queue)阻塞

python 佇列(queue)阻塞

本文連結:https://www.cnblogs.com/tujia/p/13637535.html

背景:python佇列 queue.Queue或 multiprcessing.Queue或其他佇列在寫入佇列或從佇列中讀取元素時,都有可能會發生執行緒阻塞。

相關官方文件:https://docs.python.org/zh-cn/3/library/queue.html

下面來說一下阻塞有什麼作用,然後怎麼避免阻塞~

一、阻塞的型別

佇列的阻塞分為:入隊(put)時的阻塞、出隊(get)時的阻塞、整體(join)的阻塞(消費的阻塞)

二、入隊的阻塞

import queue


def 入隊阻塞():
    q 
= queue.Queue(maxsize=3) for i in range(4): q.put('任務' + str(i+1)) print('Finished') if __name__ == '__main__': 入隊阻塞()

注:因為定義的佇列的 maxsize=3,但 put 了4個元素進佇列,第4個元素將無法 put進佇列,發生阻塞;注意:就算不設定 maxsize,電腦的記憶體也是有限的,佇列也是會滿的。當佇列已滿,做 put操作時,一樣會發生阻塞。

正確的處理方法:

import queue


def 入隊阻塞():
    q 
= queue.Queue(maxsize=3) for i in range(4): try: q.put('任務' + str(i+1), block=True, timeout=3) except queue.Full: print('任務%d: 佇列已滿,寫入失敗' % (i+1)) print('Finished') if __name__ == '__main__': 入隊阻塞()

注:設定 timeout超時時間,並捕捉 queue.Full異常;設定tomeout一樣會阻塞執行緒,但timeout之後,可以繼續操行程式。如果不想使用 timeout選項,也可以直接設定 block(阻塞)為False,或者直接使用 q.put_nowait方法(注意:當佇列已滿的時候 ,put_nowait一樣會觸發 queue.Full異常)

三、出隊的阻塞

import queue


def 出隊阻塞():
    q = queue.Queue(maxsize=3)
    for i in range(3):
        try:
            q.put_nowait('任務' + str(i+1))
        except queue.Full:
            print('full')

    for i in range(4):
        task = q.get()
        print(task)

    print('Finished')


if __name__ == '__main__':
    出隊阻塞()

注:佇列裡只有3個元素,但get了4次。第4次get的時候,不會返回空,而是會發生阻塞。

正確的處理方法:

import queue


def 出隊阻塞():
    q = queue.Queue(maxsize=3)
    for i in range(3):
        try:
            q.put_nowait('任務' + str(i+1))
        except queue.Full:
            print('full')

    for i in range(4):
        try:
            task = q.get(block=True, timeout=3)
            print(task)
        except queue.Empty:
            print('佇列為空,get失敗')

    print('Finished')


if __name__ == '__main__':
    出隊阻塞()

注:設定 timeout超時時間,並捕捉 queue.Empty 異常;設定tomeout一樣會阻塞執行緒,但timeout之後,可以繼續操行程式。如果不想使用 timeout選項,也可以直接設定 block(阻塞)為False,或者直接使用 q.get_nowait方法(注意:當佇列為空的時候 ,get_nowait一樣會觸發 queue.Empty 異常)

四、消費阻塞(正確來說,應該是未消費完時的阻塞)

import queue


def 消費阻塞():
    q = queue.Queue(maxsize=3)
    for i in range(3):
        try:
            q.put_nowait('任務' + str(i+1))
        except queue.Full:
            print('full')

    for i in range(2):
        try:
            task = q.get(block=True, timeout=3)
            print(task)
            q.task_done()
        except queue.Empty:
            print('佇列為空,get失敗')
    # 阻塞佇列
    q.join()
    print('Finished')


if __name__ == '__main__':
    消費阻塞()

注:佇列裡設定了3個任務,但只調用了兩次 task_done(標記兩個任務已完成),還有一個任務未處理,佇列將阻塞至第三個任務被消費(標誌為 task_done)


上面說完了各種阻塞,下面來說一下阻塞作用~~

五、入隊阻塞的作用

很明顯,當我要做入隊操作時,如果佇列已滿時,我不會說馬上掉頭就走,而是會等一下,看有沒有人出隊,然後,我就可以擠上去了。這就是入隊阻塞的作用。

例如非同步(asyncio)或多執行緒(Thread)操作同一個佇列(queue),下面看一下使用 asyncio非同步操作Queue的例子:

import time
import queue
import asyncio


def get_now():
    return time.strftime('%X')


# 入隊
async def qput(q):
    for i in range(5):
        # 每1秒寫入一個元素
        await asyncio.sleep(1)
        try:
            await q.put(i)
            print('%s: %d 入隊' % (get_now(), i))
        except queue.Full:
            print('Full')


# 出隊
async def qget(q):
    for i in range(5):
        # 每2秒消費一個元素
        await asyncio.sleep(2)
        try:
            item = await q.get()
            print('%s: %d 出隊' % (get_now(), item))
        except queue.Empty:
            print('Empty')


async def main():
    q = asyncio.Queue(maxsize=3)

    print('%s: Start' % get_now())
    await asyncio.gather(qput(q), qget(q))
    print('%s: Finished' % get_now())


if __name__ == '__main__':
    asyncio.run(main())

執行結果大概是這樣:

六、出隊阻塞的作用

出隊阻塞和入隊阻塞是一樣的。假設你是一個包工頭,看到應聘的佇列裡沒有人,不要著急著馬上走啊,等一下可能就有人過來應聘了。這就是 get阻塞的作用。

下面來看一下 asyncio非同步操作 queue的例子:

import time
import queue
import asyncio
import random


def get_now():
    return time.strftime('%X')


# 招工
async def 招工(q):
    print('包工頭:招人了喂,管吃管喝、五險一金~')
    worker_count = 0
    for i in range(q.maxsize):
        try:
            # 就等10秒
            worker = await asyncio.wait_for(q.get(), timeout=10)
            worker_count = worker_count + 1
            print('%s: 面試【%s】,通過/入職' % (get_now(), worker))
        except asyncio.TimeoutError:
            # 10秒內都沒人來,直接提前下班了
            print('包工頭:唉,都沒人來應聘,今天只能提前下班了~')
            exit(0)

    print('包工頭:招夠了,可以下班了~~')


# 應聘
async def 應聘(q):
    workers = ['張三', '李四', '王五', '趙六', '陳七']
    for name in workers:
        # 不定時有人來應聘。注:時間要控制到10秒內,10秒內都沒人來,包工頭就要提前下班了
        await asyncio.sleep(random.randint(1, 10))
        try:
            await q.put(name)
            print('%s: 【%s】 去應聘了' % (get_now(), name))
        except queue.Full:
            print('Full')


async def main():
    # 上級給任務了,要招夠5個人
    q = asyncio.Queue(maxsize=5)

    print('%s: Start' % get_now())
    await asyncio.gather(招工(q), 應聘(q))
    print('%s: Finished' % get_now())


if __name__ == '__main__':
    asyncio.run(main())

執行結果大概是這樣:

注:須要注意一下,asyncio操作 queue時,不能用原生的 queue.Queue,要用 asyncio.Queue

參考連結:

https://docs.python.org/zh-cn/3/library/queue.html

https://docs.python.org/zh-cn/3.7/library/asyncio-task.html

https://docs.python.org/zh-cn/3/library/asyncio-queue.html

本文連結:https://www.cnblogs.com/tujia/p/13637535.html


完。