1. 程式人生 > >multiprocessing在python中的高階應用-程序池

multiprocessing在python中的高階應用-程序池

下面的類可以建立程序池,可以吧各種資料處理任務都提交給程序池。程序池提供的功能有點類似於列表解析和功能性程式設計操作(如對映-規約)提供的功能。

Pool( [ numprocess [, initializer [, initargs] ] ] )
建立工作程序池。
numprocess是要建立的程序數。如果省略此引數,將使用cpu_count()的值。【這裡簡單介紹一下:
from multiprocessing import cpu_count
print(cpu_count()) #獲得電腦的CPU的個數
】。
initializer是每個工作程序啟動時要執行的可呼叫物件。initargs是要傳遞給initializer的引數元組。initializer預設為None。
Pool類的例項p支援一下操作:

p.apply(func [, args[, kwargs] ] )
在一個池工作程序中執行函式(*args,**kwargs),然後返回結果。這裡要強調一點:此操作並不會在所有池工作程序中並行執行func函式。如果要通過不同引數併發地執行func函式,必須從不同執行緒呼叫p.apply()函式或者使用p.apply_async()函式。

p.apply_async( func [, args [, kwargs [, callback] ] ] )
在一個池工作程序中非同步地執行函式(*args,**kwargs),然後返回結果。此方法的結果是AsyncResult類的例項,稍後可用於獲得最終結果。callback禁止只習慣任何阻塞操作,否則將阻塞接收其他非同步操作中的結果。

p.close()
關閉程序池,防止進行進一步操作。如果所有操作持續掛起,他們將在工作程序終止之前完成。

p.join()
等待所有工作程序退出。此方法只能在close()或terminate()方法之後呼叫。

p.imap(func, iterable [, chunksize] )
map()函式的版本之一,返回迭代器而非結果列表。

p.imap_unordered(func, iterable [,chunksize] )
同imap()函式,但從工作程序接收結果時,返回結果的次序時任意的。

p.map(func, iterable [, chunksize] )
將可呼叫物件func應用給iterable中的所有專案,然後以列表的形式返回結果。通過將iterable劃分為多塊並將工作分派給工作程序,可以並行地執行這項操作。chunksize制定每塊中的專案數。
如果資料量較大,可以增大chunksize的值來提升效能。

p.map_async( func , iterable [, chunksize [, callback] ] )
同map()函式,但結果的返回時非同步地。如果提供callable引數,當結果變為可用時,它將與結果一起被呼叫。

p.terminate()
立即終止所有工作程序,同時不執行任何清理或結束任何掛起工作。如果p被垃圾收集,將自動呼叫此函式。
方法apply_async()和map_async()的返回值是AsyncResult例項。AsyncResult例項具有以下方法。

a.get( [timeout] )
返回結果,如果有必要則等待結果到達。timeout是可選的超時。如果結果在制定時間內沒有到達,將引發multuprocessing.TimeoutError異常。如果遠端操作中引發了異常,它將在呼叫此方法時再次被引發。

a.ready()
如果呼叫完成,返回True

a.sucessful()
如果呼叫完成且沒有引發異常,返回True。如果在結果就緒之前呼叫此方法,將引發AssertionError異常。

a.wait( [ timeout] )
等待結果變為可用。timeout是可選的超時。

下面的例子說明如何使用程序池構建字典,將整個目錄中檔案的檔名對映為SHA512摘要值:

import multiprocessing
import os
import hashlib
#Some parameters you can tweek
BUFSIZE=8192  #讀取緩衝區大小
POOLSIZE=4
def compute_digest(filename):
    try:
        f=open(filename,"rb")
    except IOError:
        return None
    digest=hashlib.sha512()
    while True:
        chunk=f.read(BUFSIZE)
        if not chunk:break
        digest.update(chunk)
    f.close()
    return filename,digest.digest()
def build_digest_map(topdir):
    digest_pool=multiprocessing.Pool(4)
    allfiles=(os.path.join(path,name)
              for path,dirs,files in os.walk(topdir)
              for name in files)

    digest_map=dict(digest_pool.imap_unordered(compute_digest,allfiles,20))
    digest_pool.close()
    return digest_map

if __name__=="__main__":
    digest_map=build_digest_map("F:\WaterFlow")
    print len(digest_map)

在這個例子中,使用生成器表示式指定一個目錄樹中所有檔案的路徑名稱序列。然後使用imap_unordered()函式將這個序列分割並傳遞給程序池。每個池工作程序使用compute_digest()函式為它的檔案計算SHA512摘要值。將結果返回給生成器,然後收集到python字典中。
要記住,只有充分利用了池工作程序才能夠使額外的通訊開銷變得有價值,使用程序池才有意義。一般而言,對於簡單的計算(如兩個數相加),使用程序池是沒有意義的。