Python實現程序同步和通訊
引例:
如之前建立多程序的例子
# -*- coding:utf-8 -*-
from multiprocessing import Process,Pool
import os,time
def run_proc(name): ##定義一個函式用於程序呼叫
for i in range(5):
time.sleep(0.2) #休眠0.2秒
print 'Run child process %s (%s)' % (name, os.getpid())
#執行一次該函式共需1秒的時間
if __name__ =='__main__' : #執行主程序
print 'Run the main process (%s).' % (os.getpid())
mainStart = time.time() #記錄主程序開始的時間
p = Pool(8) #開闢程序池
for i in range(16): #開闢14個程序
p.apply_async(run_proc,args=('Process'+str(i),))#每個程序都呼叫run_proc函式,
#args表示給該函式傳遞的引數。
print 'Waiting for all subprocesses done ...'
p.close() #關閉程序池
p.join() #等待開闢的所有程序執行完後,主程序才繼續往下執行
print 'All subprocesses done'
mainEnd = time.time() #記錄主程序結束時間
print 'All process ran %0.2f seconds.' % (mainEnd-mainStart) #主程序執行時間
執行結果:
Run the main process (36652).
Waiting for all subprocesses done …
Run child process Process0 (36708)Run child process Process1 (36748)Run child process Process3 (36736)
Run child process Process2 (36716)
Run child process Process4 (36768)
如第3行的輸出,偶爾會出現這樣不如意的輸入格式,為什麼呢?
原因是多個程序爭用列印輸出資源的結果。前一個程序為來得急輸出換行符,該資源就切換給了另一個程序使用,致使兩個程序輸出在同一行上,而前一個程序的換行符在下一次獲得資源時才打印輸出。
Lock
為了避免這種情況,需在程序進入臨界區(使程序進入臨界資源的那段程式碼,稱為臨界區)時加鎖。
可以向如下這樣新增鎖後看看執行效果:
# -*- coding:utf-8 -*-
lock = Lock() #申明一個全域性的lock物件
def run_proc(name):
global lock #引用全域性鎖
for i in range(5):
time.sleep(0.2)
lock.acquire() #申請鎖
print 'Run child process %s (%s)' % (name, os.getpid())
lock.release() #釋放鎖
Semaphore
Semaphore為訊號量機制。當共享的資源擁有多個時,可用Semaphore來實現程序同步。其用法和Lock差不多,s = Semaphore(N),每執行一次s.acquire(),該資源的可用個數將減少1,當資源個數已為0時,就進入阻塞;每執行一次s.release(),佔用的資源被釋放,該資源的可用個數增加1。
多程序的通訊(資訊互動)
不同程序之間進行資料互動,可能不少剛開始接觸多程序的同學會想到共享全域性變數的方式,這樣通過向全域性變數寫入和讀取資訊便能實現資訊互動。但是很遺憾,並不能這樣實現。具體原因,看這篇文章。
下面通過例子,加深對那篇文章的理解:
# -*- coding:utf-8 -*-
from multiprocessing import Process, Pool
import os
import time
L1 = [1, 2, 3]
def add(a, b):
global L1
L1 += range(a, b)
print L1
if __name__ == '__main__':
p1 = Process(target=add, args=(20, 30))
p2 = Process(target=add, args=(30, 40))
p1.start()
p2.start()
p1.join()
p2.join()
print L1
輸出結果:
[1, 2, 3, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
[1, 2, 3, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39]
[1, 2, 3]
該程式的原本目的是想將兩個子程序生成的列表加到全域性變數L1中,但用該方法並不能達到想要的效果。既然不能通過全域性變數來實現不同程序間的資訊互動,那有什麼辦法呢。
mutiprocessing為我們可以通過Queue和Pipe來實現程序間的通訊。
Queue
按上面的例子通過Queue來實現:
# -*- coding:utf-8 -*-
from multiprocessing import Process, Queue, Lock
L = [1, 2, 3]
def add(q, lock, a, b):
lock.acquire() # 加鎖避免寫入時出現不可預知的錯誤
L1 = range(a, b)
lock.release()
q.put(L1)
print L1
if __name__ == '__main__':
q = Queue()
lock = Lock()
p1 = Process(target=add, args=(q, lock, 20, 30))
p2 = Process(target=add, args=(q, lock, 30, 40))
p1.start()
p2.start()
p1.join()
p2.join()
L += q.get() + q.get()
print L
執行結果:
[20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
[30, 31, 32, 33, 34, 35, 36, 37, 38, 39]
[1, 2, 3, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39]
下面介紹Queue的常用方法:
- 定義時可用q = Queue(maxsize = 10)來指定佇列的長度,預設時或maxsize值小於1時佇列為無限長度。
- q.put(item)方法向佇列放入元素,其還有一個可選引數block,預設為True,此時若佇列已滿則會阻塞等待,直到有空閒位置。而當black值為 False,在該情況下就會丟擲Full異 常
- Queue是不可迭代的物件,不能通過for迴圈取值,取值時每次呼叫q.get()方法。同樣也有可選引數block,預設為True,若此時佇列為空則會阻塞等待。而black值為False時,在該情況下就會丟擲Empty異常
- Queue.qsize() 返回佇列的大小
- Queue.empty() 如果佇列為空,返回True,反之False
- Queue.full() 如果佇列滿了,返回True,反之False
- Queue.get([block[, timeout]]) 獲取佇列,timeout等待時間Queue.get_nowait() 相當Queue.get(False) 非阻塞 Queue.put(item) 寫入佇列,timeout等待時間
- Queue.put_nowait(item) 相當Queue.put(item, False)
Pipe
Pipe管道,可以是單向(half-duplex),也可以是雙向(duplex)。我們通過mutiprocessing.Pipe(duplex=False)建立單向管道 (預設為雙向)。雙向Pipe允許兩端的進即可以傳送又可以接受;單向的Pipe只允許前面的埠用於接收,後面的埠用於傳送。
下面給出例子:
# -*- coding:utf-8 -*-
from multiprocessing import Process, Pipe
def proc1(pipe):
s = 'Hello,This is proc1'
pipe.send(s)
def proc2(pipe):
while True:
print "proc2 recieve:", pipe.recv()
if __name__ == "__main__":
pipe = Pipe()
p1 = Process(target=proc1, args=(pipe[0],))
p2 = Process(target=proc2, args=(pipe[1],))
p1.start()
p2.start()
p1.join()
p2.join(2) #限制執行時間最多為2秒
print '\nend all processes.'
執行結果如下:
proc2 recieve: Hello,This is proc1
proc2 recieve:
end all processes.
當第二行輸出後,因為管道中沒有資料傳來,Proc2處於阻塞狀態,2秒後被強制結束。
以下是單向管道的例子,注意pipe[0],pipe[1]的分配。
# -*- coding:utf-8 -*-
from multiprocessing import Process, Pipe
def proc1(pipe):
s = 'Hello,This is proc1'
pipe.send(s)
def proc2(pipe):
while True:
print "proc2 recieve:", pipe.recv()
if __name__ == "__main__":
pipe = Pipe(duplex=False)
p1 = Process(target=proc1, args=(pipe[1],)) #pipe[1]為傳送端
p2 = Process(target=proc2, args=(pipe[0],)) #pipe[0]為接收端
p1.start()
p2.start()
p1.join()
p2.join(2) # 限制執行時間最多為2秒
print '\nend all processes.'
執行結果同上。
強大的Manage
Queue和Pipe實現的資料共享方式只支援兩種結構 Value 和 Array。Python中提供了強大的Manage專門用來做資料共享,其支援的型別非常多,包括: Value,Array,list, dict,Queue, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event等
其用法如下:
from multiprocessing import Process, Manager
def func(dt, lt):
for i in range(10):
key = 'arg' + str(i)
dt[key] = i * i
lt += range(11, 16)
if __name__ == "__main__":
manager = Manager()
dt = manager.dict()
lt = manager.list()
p = Process(target=func, args=(dt, lt))
p.start()
p.join()
print dt, '\n', lt
執行結果:
{‘arg8’: 64, ‘arg9’: 81, ‘arg0’: 0, ‘arg1’: 1, ‘arg2’: 4, ‘arg3’: 9, ‘arg4’: 16, ‘arg5’: 25, ‘arg6’: 36, ‘arg7’: 49}
[11, 12, 13, 14, 15]