Python multiprocessing模組中Queue的通訊問題
先簡單說一下Queue的常用方法:
put:向佇列插入一個數據
get:從佇列取出並刪除一個數據(先進先出)
empty:判斷佇列是否為空
最近發現了一個有意思的地方,在多程序加入Queue後,如果你不告訴其中通過Queue取東西的程序Queue此時已經空了,那麼該程序不會自動中斷,而且好像也不會繼續做其他的事情,像這樣:
# encoding=utf8 from multiprocessing import Process, Queue from time import sleep def get(q): while True: info = q.get() print 'get %s' % info print 'is running' sleep(1) def put(q): for i in range(5): q.put(str(i)) print 'put is done' def main(): print 'main task start' q = Queue() p1 = Process(target=put, args=(q, )) p2 = Process(target=get, args=(q, )) p1.start() p2.start() p1.join() p2.join() print 'main task done' if __name__ == '__main__': main()
執行結果:
main task start
get 0
is running
put is done
get 1
is running
get 2
is running
get 3
is running
get 4
is running
通過執行結果可以知道,主程序一隻被p2程序阻塞著,因為主程序最後的“main task done”一直沒有被打印出來。而且正常來講,就算p2程序沒有停止,也應該持續列印“is running”,但是結果顯示並沒有,暫時還未搞明白是為什麼。不過阻止上面p2造成主程序阻塞的方法還是有的,最簡單的就是呼叫Queue的empty方法:
# encoding=utf8 from multiprocessing import Process, Queue from time import sleep def get(q): while True: info = q.get() print 'get %s' % info print 'is running' if q.empty(): # 如果佇列空了,就退出迴圈 break sleep(1) def put(q): for i in range(5): q.put(str(i)) print 'put is done' def main(): print 'main task start' q = Queue() p1 = Process(target=put, args=(q, )) p2 = Process(target=get, args=(q, )) p1.start() p2.start() p1.join() p2.join() print 'main task done' if __name__ == '__main__': main()
執行結果:
main task start
get 0
is running
put is done
get 1
is running
get 2
is running
get 3
is running
get 4
is running
main task done
另一種方法就是在put中告訴佇列,我已經空了:
# encoding=utf8 from multiprocessing import Process, Queue from time import sleep def get(q): while True: info = q.get() print 'get %s' % info print 'is running' if not info: break sleep(1) def put(q): for i in range(5): q.put(str(i)) q.put(None) print 'put is done' def main(): print 'main task start' q = Queue() p1 = Process(target=put, args=(q, )) p2 = Process(target=get, args=(q, )) p1.start() p2.start() p1.join() p2.join() print 'main task done' if __name__ == '__main__': main()
執行結果:
main task start
put is done
get 0
is running
get 1
is running
get 2
is running
get 3
is running
get 4
is running
get None
is running
main task done
也就是說,想要佇列知道自己是不是空了,一種是呼叫empty方法,另一種就是自己告訴自己一個“結束標記”。但是這兩種方法只適用於get和put各自只有一個程序的情況,當有多個程序在通過同一個Queue中get和put的時候,上面的方法就不適用了。
JoinableQueue
解決上面問題的一個方法就是使用JoinableQueue,不過自己還沒弄太明白,參照別人的例子自己寫了一個:
# encoding=utf8
import multiprocessing
from multiprocessing import Process, JoinableQueue
from time import sleep
import random
s = '\033[31;42m'
e = '\033[0m'
def get(q):
name = multiprocessing.current_process().name
while True:
info = q.get()
print '--- %s%s get %s%s' % (s, name, info, e)
sleep(random.randint(1,2))
q.task_done()
def put(q):
name = multiprocessing.current_process().name
for i in range(3):
print '%s put %s' % (name, i)
q.put(str(i))
sleep(random.randint(1, 2))
q.join()
def main():
print 'main task start'
q = JoinableQueue()
p1 = Process(name='p1', target=put, args=(q, ))
p2 = Process(name='p2', target=put, args=(q, ))
p3 = Process(name='p3', target=put, args=(q, ))
q1 = Process(name='q1', target=get, args=(q, ))
q2 = Process(name='q2', target=get, args=(q, ))
q1.daemon = True
q2.daemon = True
for item in [p1, p2, p3, q1, q2]:
item.start()
p1.join()
p2.join()
p3.join()
print 'main task done'
if __name__ == '__main__':
main()
執行結果:
說明:
JoinableQueue是Queue的子類,增加了task_done()和join()方法。
task_done()用來告訴queue一個task完成。一般地在呼叫get()獲得一個task,在task結束後呼叫task_done()來通知Queue當前task完成。
join() 阻塞直到queue中的所有的task都被處理(即task_done方法被呼叫)。
在上面程式碼中,對於呼叫get的兩個子程序應該設定為守護程序(daemon = True),這裡子程序不會直接被結束,可能是因為JoinableQueue自己會協調put和get,這樣當JoinableQueue中的資料全部被取出後,這兩個子程序才會自動結束。