python程序間通訊--Queue
阿新 • • 發佈:2019-01-31
multiprocessing模組中的Queue可以實現多程序之間的資料傳遞
初始化Queue()物件時 (例如:q = Qqueue()),若括號中沒有指定最大可接受的訊息數量或者數量為負數,則代表可接受的訊息數量沒有上線(直到記憶體的盡頭)
Queue類下的幾個常用方法:
Queue.qsize() 返回當前佇列包含的訊息數量
Queue.empty() 若佇列為空,返回True,反之False
Queue.full() 若佇列滿了,返回True,反之False
Queue.get([block[,timeout]]) 獲取佇列中的一條訊息,並將其從佇列中移除,block預設True
# 1.block(阻塞)使用預設值,並沒有設定timeout(單位:秒),若訊息佇列為空,程式將
# 被阻塞(停在讀取狀態),直到從訊息佇列讀到訊息為止,若設定了timeout,則會等待timeout秒,
# 若還沒有讀到任何訊息則丟擲"Queue.Empty異常"
# 2.若block值為False,訊息佇列為空,則立即丟擲"Queue.Empty異常"
Queue.get_nowait(): 相當於Queue.get(False)
Queue.put(item,[block[,timeout]]) 將item訊息寫入佇列,block預設值True
# 1.若block使用預設值,且沒有設定timeout(單位:秒),訊息佇列如果已經沒有空間可寫入,此時程式會
# 被阻塞(停在寫入狀態),直到訊息佇列騰出空間為止,如果設定了timeout,則會等待timeout秒,如果
# 還沒有空間,則丟擲"Queue.Full"異常
# 2.若block值為False,訊息佇列如果沒有空間寫入,則立即丟擲"Queue.Full"異常
Queue.put_nowait(item): 相當於Queue.put(item,False)
用一個小示例演示Queue工作原理:
from multiprocessing import Queue
if __name__ == '__main__':
#初始化
q = Queue(3 ) # 初始化一個Queue物件,最多可接受三條put訊息
#佇列儲存訊息
q.put("msg1")
q.put("msg2")
print(q.full())# 結果為False
q.put("msg3")
print(q.full())# 結果為True
# 因為訊息佇列已滿,下面的兩個 try函式會丟擲異常
try:
q.put("msg4",block=True,timeout=2)
except:
print("訊息佇列已滿,現在有訊息數量:%d"%q.qsize())
try:
q.put_nowait("msg4")
except:
print("訊息佇列已滿,現在有訊息數量:%d"%q.qsize())
#推薦這種方式
if not q.full():
q.put("msg5")
#讀取訊息
#格式:訊息佇列物件.get()
print(q.get())
print("現在有訊息數量:%d" % q.qsize())
q.put("msg6")
while q.qsize()>0:
print(q.get())
再寫一個例項,在父程序中建立兩個子程序,一個向Queue寫資料,一個從Queue讀資料
from multiprocessing import Process,Queue
# 寫入資料
def write(q):
values = ['A','B', 'C', 'D', 'E']
for i in values:
q.put(i)
print('put %s to queue...' % i)
# 讀取資料
def read(q):
while True:
if not q.empty():
print('get %s from queue!!' % q.get())
else:
break
if __name__ == '__main__':
q = Queue()
p1 = Process(target = write,args=(q,))
p1.start()
p1.join()
p2 = Process(target = read,args=(q,))
p2.start()
p2.join()
print('所有資料讀取完成')
執行結果為:
C:\ProgramData\Anaconda3\python.exe D:/PycharmProjects/firstproject/03_21/test02.py
put A to queue...
put B to queue...
put C to queue...
put D to queue...
put E to queue...
get A from queue!!
get B from queue!!
get C from queue!!
get D from queue!!
get E from queue!!
所有資料讀取完成
Process finished with exit code 0
程序池中的Queue
如果使用Pool建立程序,就需要用multiprocessing.Manager()中的Queue(), 而不是multiprocessing.Queue()
給一個具體例項說明
from multiprocessing import Manager,Pool
import os
def writer(q):
print('writer啟動%s' % os.getpid())
for i in 'jiaobaba':
q.put(i)
def reader(q):
print('reader啟動%s' % os.getpid())
for i in range(q.qsize()):
print('reader從Queue中讀到訊息: %s' % q.get())
if __name__ == '__main__':
q = Manager().Queue() # 用於在程序池中使用的訊息佇列
po = Pool()
po.apply_async(func = writer, args = (q,))
po.apply_async(func = reader, args = (q,))
po.close()
po.join()
執行結果:
C:\ProgramData\Anaconda3\python.exe D:/PycharmProjects/firstproject/03_21/pool.py
writer啟動1044
reader啟動1044
reader從Queue中讀到訊息: j
reader從Queue中讀到訊息: i
reader從Queue中讀到訊息: a
reader從Queue中讀到訊息: o
reader從Queue中讀到訊息: b
reader從Queue中讀到訊息: a
reader從Queue中讀到訊息: b
reader從Queue中讀到訊息: a
Process finished with exit code 0