multiprocessing在python中的高階應用-共享資料與同步
通常,程序之間彼此是完全孤立的,唯一的通訊方式是佇列或管道。但可以使用兩個物件來表示共享資料。其實,這些物件使用了共享記憶體(通過mmap模組)使訪問多個程序成為可能。
Value( typecode, arg1, … argN, lock )
在共享內容中常見ctypes物件。typecode要麼是包含array模組使用的相同型別程式碼(如’i’,’d’等)的字串,要麼是來自ctypes模組的型別物件(如ctypes.c_int、ctypes.c_double等)。所有額外的位置引數arg1, arg2 ….. argN將傳遞給指定型別的建構函式。lock是隻能使用關鍵字呼叫的引數,如果把它置為True(預設值),將建立一個新的鎖定來包含對值的訪問。如果傳入一個現有鎖定,比如Lock或RLock例項,該鎖定將用於進行同步。如果v是Value建立的共享值的例項,便可使用v.value訪問底層的值。例如,讀取v.value將獲取值,而賦值v.value將修改值。
RawValue( typecode, arg1, … ,argN)
同Value物件,但不存在鎖定。
Array( typecode, initializer, lock )
在共享記憶體中建立ctypes陣列。typecode描述了陣列的內容,意義與Value()函式中的相同。initializer要麼是設定陣列初始大小的整數,要麼是專案序列,其值和大小用於初始化陣列。lock是隻能使用關鍵字呼叫的引數,意義與Value()函式中相同。如果a是Array建立的共享陣列的例項,便可使用標準的python索引、切片和迭代操作訪問它的內容,其中每種操作均由鎖定進行同步。對於位元組字串,a還具有a.value屬性,可以吧整個陣列當做一個字串進行訪問。
RawArray(typecode, initializer )
同Array物件,但不存在鎖定。當所編寫的程式必須一次性操作大量的陣列項時,如果同時使用這種資料型別和用於同步的單獨鎖定(如果需要的話),效能將得到極大的提升。
除了使用Value()和Array()建立的共享值之外,multiprocessing模組還提供一下同步源於的共享版本。
這些物件的行為與threading模組中定義的名稱相同的同步原語相似。請參考threading文件瞭解更多細節。
應該注意,使用多程序後,通常不必再擔心與鎖定、訊號量或類似構造的底層同步,這一點與執行緒不相伯仲。在某種程度上,管道上的send()和receive()操作,以及佇列上的put()和get()操作已經提供了同步功能。但是,在某寫特定的設定下還是需要用到共享值和鎖定。下面這個例子說明了如何使用共享陣列代替管道,將一個浮點數的python列表傳送給另一個程序:
import multiprocessing
class FloatChannel(object):
def __init__(self,maxsize):
self.buffer=multiprocessing.RawArray('d',maxsize)
self.buffer_len=multiprocessing.Value('i')
self.empty=multiprocessing.Semaphore(1)
self.full=multiprocessing.Semaphore(0)
def send(self,values):
self.empty.acquire() #只在快取為空時繼續
nitems=len(values)
self.buffer_len=nitems #設定緩衝區大小
self.buffer[:nitems]=values #將複製到緩衝區中
self.full.release() #發訊號通知緩衝區已滿
def recv(self):
self.full.acquire() #只在緩衝區已滿時繼續
values=self.buffer[:self.buffer_len.value] #複製值
self.empty.release() #傳送訊號通知緩衝區為空
return values
#效能測試 接收多條訊息
def consume_test(count,ch):
for i in xrange(count):
values=ch.recv()
#效能測試 傳送多條訊息
def produce_test(count,values,ch):
for i in xrange(count):
ch.send(values)
if __name__=="__main__":
ch=FloatChannel(100000)
p=multiprocessing.Process(target=consume_test,args=(1000,ch))
p.start()
values=[float(x) for x in xrange(100000)]
produce_test(1000,values,ch)
print "Done"
p.join()
在我的計算機上執行效能測試時,通過FloatChannel傳送一個較大的浮點數列表,速度比通過Pipe傳送快大約80%,因為後者必須對所有值進行序列化和反序列化。