程序之間的通訊(multiprocess.Queue)
阿新 • • 發佈:2021-07-21
一、程序間通訊
程序之間的資料是相互隔離的,例如
from multiprocessing import Process
def task():
global n # 宣告全域性變數
n = 999
print('子', n)
if __name__ == '__main__':
p = Process(target=task )
n=10
p.start()
print('主',n)
'''輸出結果
主 10
子 999''' # 子程序中的資料並不會影響到父程序的資料
而想做到程序與程序之間的通訊,就需要藉助到第三方的媒介進行資料的交換獲取等操作。
IPC(Inter-Process Communication)程序間通訊
二、佇列
multiprocess.Queue
建立共享的程序佇列,Queue是多程序安全佇列,可以使用Queue實現多程序之間的資料傳遞。
Queue([maxsize]) #建立共享的程序佇列
# maxsize是佇列中允許的最大項數。如果省略此引數,則無大小限制
底層佇列使用管道和鎖定實現。
Queue方法
Queue([maxsize]):建立共享的程序佇列。
Queue例項化之後q具有以下方法:
q.get([ block [ ,timeout ] ]) '''返回q中的一個專案。如果q為空,此方法將阻塞,直到佇列中有專案可用為止。 block用於控制阻塞行為,預設為True. 如果設定為False, 將引發Queue.Empty異常(定義在Queue模組中)。 timeout是可選超時時間,用在阻塞模式中。 如果在制定的時間間隔內沒有專案變為可用,將引發Queue.Empty異常。''' from multiprocessing import Queue if __name__ == '__main__': q = Queue(3) q.put('001') q.put('002') q.put('003') print(q.get()) print(q.get()) print(q.get()) print(q.get(block=False)) # 將block設定為False就不會處在阻塞狀態,一旦取不到值就會報異常。或者將timeout設定超時時間,超過這個時間取不到值也會報異常,不等待。
q.get_nowait() # 取值不會處在阻塞狀態,一旦取不到值就會報異常
q.put_nowait() # 將值放入佇列。如果佇列已滿,就會報異常
q.put(item [, block [,timeout ] ] )
'''將item放入佇列。如果佇列已滿,此方法將阻塞至有空間可用為止。
block控制阻塞行為,預設為True。如果設定為False,將引發Queue.Empty異常(定義在Queue庫模組中)。
timeout指定在阻塞模式中等待可用空間的時間長短。超時後將引發Queue.Full異常。'''
q.qsize() '''返回佇列中目前專案的正確數量。此函式的結果並不可靠, 因為在返回結果和在稍後程式中使用結果之間,佇列中可能新增或刪除了專案。 在某些系統上,此方法可能引發NotImplementedError異常。'''
q.empty()
'''判斷q佇列是否為空,空返回True,否則返回False。
如果其他程序或執行緒正在往佇列中新增專案,結果是不可靠的。也就是說,
在返回和使用結果之間,佇列中可能已經加入新的專案。'''
q.full() '''如果q已滿,返回為True. 由於執行緒的存在,結果也可能是不可靠的(參考q.empty()方法)。'''
Queue解決程序間資料隔離問題
from multiprocessing import Queue, Process
import time
def task(q):
q.put(time.strftime('%Y-%m-%d %H:%M:%S'))
print('資料放完了')
if __name__ == '__main__':
q = Queue(3)
p = Process(target=task, args=(q,))
p.start()
res = q.get()
print('取到了%s' % res)
print('主')
'''輸出結果
資料放完了
取到了2021-07-21 15:46:57
主'''
批量生產資料放入佇列再批量獲取結果
from multiprocessing import Queue, Process
import time
def q_put(q):
q.put(time.strftime('%Y-%m-%d %H:%M:%S'))
print('資料放完了')
def q_get(q):
print(q.get())
if __name__ == '__main__':
q = Queue(3)
p1 = Process(target=q_put, args=(q,))
p2 = Process(target=q_put, args=(q,))
p3 = Process(target=q_get, args=(q,))
p4 = Process(target=q_get, args=(q,))
p1.start()
p2.start()
p3.start()
p4.start()
生產者消費者模型
import os
import random
import time
from multiprocessing import Process, Queue
def producer(queue, product):
for i in range(1, 11):
print('%s號流水線生產了第%s個%s' % (i, i, product))
time.sleep(random.random())
queue.put('第%s個%s' % (i, product))
def consumer(queue):
while True:
res = queue.get()
if not res: break # 發現none直接結束執行
print('%s消費者取走了%s' % (os.getpid(), res))
if __name__ == '__main__':
q = Queue(3)
producer_p = Process(target=producer, args=(q, '大可樂'))
consumer_p = Process(target=consumer, args=(q,))
producer_p.start()
consumer_p.start()
producer_p.join() # 讓子程序先執行完再新增標誌
q.put(None)
多生產者 多消費者
# 生產者:
def producer(queue, food):
# 把資料全部放在Queue
for i in range(10):
data = "這個程序id:%s, 生產了%s個%s" % (os.getpid(), i, food)
print(data)
time.sleep(random.randint(1, 3))
# 放入資料
queue.put("第%s個%s" % (i, food))
def consumer(queue):
while True:
res = queue.get()
if not res:break
data = "這個程序id:%s, 吃了%s" % (os.getpid(), res)
print(data)
if __name__ == '__main__':
q = Queue(3)
p1 = Process(target=producer, args=(q, '麵包'))
p2 = Process(target=producer, args=(q, '奶粉'))
p3 = Process(target=producer, args=(q, '冰淇淋'))
p1.start()
p2.start()
p3.start()
p4 = Process(target=consumer, args=(q,))
p5 = Process(target=consumer, args=(q,))
p4.start()
p5.start()
# time.sleep(1000)
# none放在這裡是不行的,原因是主程序直接執行了put none, 消費者直接獲取到None, 程式直接結束了
# p.join()
# q.put(None)
p1.join()
p2.join()
p3.join()
q.put(None)
q.put(None)
q.put(None)
多生產者 多消費者 消費者大於生產者
# 生產者:
def producer(queue, food):
# 把資料全部放在Queue
for i in range(10):
data = "這個程序id:%s, 生產了%s個%s" % (os.getpid(), i, food)
print(data)
time.sleep(random.randint(1, 3))
# 放入資料
queue.put("第%s個%s" % (i, food))
def consumer(queue, name):
while True:
try:
res = queue.get(timeout=5)
if not res:break
data = "這個消費者:%s, 吃了%s" % (name, res)
print(data)
except Exception as e:
print(e)
break
if __name__ == '__main__':
q = Queue(3)
p1 = Process(target=producer, args=(q, '麵包'))
p2 = Process(target=producer, args=(q, '奶粉'))
p3 = Process(target=producer, args=(q, '冰淇淋'))
p1.start()
p2.start()
p3.start()
p4 = Process(target=consumer, args=(q, '許鵬'))
p5 = Process(target=consumer, args=(q, '勇哥'))
p6 = Process(target=consumer, args=(q, '勇哥2'))
p7 = Process(target=consumer, args=(q, '勇哥3'))
p4.start()
p5.start()
p6.start()
p7.start()
# time.sleep(1000)
# none放在這裡是不行的,原因是主程序直接執行了put none, 消費者直接獲取到None, 程式直接結束了
# p.join()
# q.put(None)
p1.join()
p2.join()
p3.join()
q.put(None)
q.put(None)
q.put(None)