1. 程式人生 > 其它 >程序之間的通訊(multiprocess.Queue)

程序之間的通訊(multiprocess.Queue)

一、程序間通訊

程序之間的資料是相互隔離的,例如

from multiprocessing import Process


def task():
    global n  # 宣告全域性變數
    n = 999
    print('子', n)


if __name__ == '__main__':
    p = Process(target=task )
    n=10
    p.start()
    print('主',n)
    
'''輸出結果
主 10
子 999''' # 子程序中的資料並不會影響到父程序的資料

而想做到程序與程序之間的通訊,就需要藉助到第三方的媒介進行資料的交換獲取等操作。

IPC(Inter-Process Communication)程序間通訊

二、佇列

multiprocess.Queue

建立共享的程序佇列,Queue是多程序安全佇列,可以使用Queue實現多程序之間的資料傳遞。

Queue([maxsize]) #建立共享的程序佇列
# maxsize是佇列中允許的最大項數。如果省略此引數,則無大小限制

底層佇列使用管道和鎖定實現。

Queue方法

Queue([maxsize]):建立共享的程序佇列。

Queue例項化之後q具有以下方法:

q.get([ block [ ,timeout ] ])
'''返回q中的一個專案。如果q為空,此方法將阻塞,直到佇列中有專案可用為止。
block用於控制阻塞行為,預設為True. 如果設定為False,
將引發Queue.Empty異常(定義在Queue模組中)。
timeout是可選超時時間,用在阻塞模式中。
如果在制定的時間間隔內沒有專案變為可用,將引發Queue.Empty異常。'''
from  multiprocessing import  Queue

if __name__ == '__main__':

    q = Queue(3)

    q.put('001')
    q.put('002')
    q.put('003')
    print(q.get())
    print(q.get())
    print(q.get())
    print(q.get(block=False))
    
# 將block設定為False就不會處在阻塞狀態,一旦取不到值就會報異常。或者將timeout設定超時時間,超過這個時間取不到值也會報異常,不等待。
q.get_nowait()  # 取值不會處在阻塞狀態,一旦取不到值就會報異常
q.put_nowait()  # 將值放入佇列。如果佇列已滿,就會報異常
q.put(item [, block [,timeout ] ] )
'''將item放入佇列。如果佇列已滿,此方法將阻塞至有空間可用為止。
block控制阻塞行為,預設為True。如果設定為False,將引發Queue.Empty異常(定義在Queue庫模組中)。
timeout指定在阻塞模式中等待可用空間的時間長短。超時後將引發Queue.Full異常。'''
q.qsize()
'''返回佇列中目前專案的正確數量。此函式的結果並不可靠,
因為在返回結果和在稍後程式中使用結果之間,佇列中可能新增或刪除了專案。
在某些系統上,此方法可能引發NotImplementedError異常。'''
q.empty()
'''判斷q佇列是否為空,空返回True,否則返回False。
如果其他程序或執行緒正在往佇列中新增專案,結果是不可靠的。也就是說,
在返回和使用結果之間,佇列中可能已經加入新的專案。'''
q.full() '''如果q已滿,返回為True. 由於執行緒的存在,結果也可能是不可靠的(參考q.empty()方法)。'''

Queue解決程序間資料隔離問題

from multiprocessing import Queue, Process
import time


def task(q):
    q.put(time.strftime('%Y-%m-%d %H:%M:%S'))
    print('資料放完了')


if __name__ == '__main__':
    q = Queue(3)
    p = Process(target=task, args=(q,))
    p.start()
    res = q.get()
    print('取到了%s' % res)
    print('主')
    
'''輸出結果
資料放完了
取到了2021-07-21 15:46:57
主'''

批量生產資料放入佇列再批量獲取結果

from multiprocessing import Queue, Process
import time


def q_put(q):
    q.put(time.strftime('%Y-%m-%d %H:%M:%S'))
    print('資料放完了')


def q_get(q):
    print(q.get())


if __name__ == '__main__':
    q = Queue(3)
    p1 = Process(target=q_put, args=(q,))
    p2 = Process(target=q_put, args=(q,))
    p3 = Process(target=q_get, args=(q,))
    p4 = Process(target=q_get, args=(q,))
    p1.start()
    p2.start()
    p3.start()
    p4.start()

生產者消費者模型

import os
import random
import time
from multiprocessing import Process, Queue


def producer(queue, product):
    for i in range(1, 11):
        print('%s號流水線生產了第%s個%s' % (i, i, product))
        time.sleep(random.random())
        queue.put('第%s個%s' % (i, product))


def consumer(queue):
    while True:
        res = queue.get()
        if not res: break  # 發現none直接結束執行
        print('%s消費者取走了%s' % (os.getpid(), res))


if __name__ == '__main__':
    q = Queue(3)
    producer_p = Process(target=producer, args=(q, '大可樂'))
    consumer_p = Process(target=consumer, args=(q,))
    producer_p.start()
    consumer_p.start()
    producer_p.join()  # 讓子程序先執行完再新增標誌
    q.put(None)
多生產者 多消費者
# 生產者:
def producer(queue, food):
    # 把資料全部放在Queue
    for i in range(10):
        data = "這個程序id:%s, 生產了%s個%s" % (os.getpid(), i, food)
        print(data)

        time.sleep(random.randint(1, 3))
        # 放入資料
        queue.put("第%s個%s" % (i, food))



def consumer(queue):
    while True:
        res = queue.get()
        if not res:break
        data = "這個程序id:%s, 吃了%s" % (os.getpid(), res)
        print(data)


if __name__ == '__main__':
    q = Queue(3)
    p1 = Process(target=producer, args=(q, '麵包'))
    p2 = Process(target=producer, args=(q, '奶粉'))
    p3 = Process(target=producer, args=(q, '冰淇淋'))
    p1.start()
    p2.start()
    p3.start()

    p4 = Process(target=consumer, args=(q,))
    p5 = Process(target=consumer, args=(q,))
    p4.start()
    p5.start()

    # time.sleep(1000)
    # none放在這裡是不行的,原因是主程序直接執行了put none, 消費者直接獲取到None, 程式直接結束了
    # p.join()
    # q.put(None)

    p1.join()
    p2.join()
    p3.join()

    q.put(None)
    q.put(None)
    q.put(None)
多生產者 多消費者  消費者大於生產者
# 生產者:
def producer(queue, food):
    # 把資料全部放在Queue
    for i in range(10):
        data = "這個程序id:%s, 生產了%s個%s" % (os.getpid(), i, food)
        print(data)

        time.sleep(random.randint(1, 3))
        # 放入資料
        queue.put("第%s個%s" % (i, food))



def consumer(queue, name):
    while True:
        try:

            res = queue.get(timeout=5)
            if not res:break
            data = "這個消費者:%s, 吃了%s" % (name, res)
            print(data)
        except Exception as e:
            print(e)
            break


if __name__ == '__main__':
    q = Queue(3)
    p1 = Process(target=producer, args=(q, '麵包'))
    p2 = Process(target=producer, args=(q, '奶粉'))
    p3 = Process(target=producer, args=(q, '冰淇淋'))
    p1.start()
    p2.start()
    p3.start()

    p4 = Process(target=consumer, args=(q, '許鵬'))
    p5 = Process(target=consumer, args=(q, '勇哥'))
    p6 = Process(target=consumer, args=(q, '勇哥2'))
    p7 = Process(target=consumer, args=(q, '勇哥3'))
    p4.start()
    p5.start()
    p6.start()
    p7.start()

    # time.sleep(1000)
    # none放在這裡是不行的,原因是主程序直接執行了put none, 消費者直接獲取到None, 程式直接結束了
    # p.join()
    # q.put(None)

    p1.join()
    p2.join()
    p3.join()

    q.put(None)
    q.put(None)
    q.put(None)