快速掌握用python寫並行程式
目錄
小子今天想來談談“平行計算”,作為一個非科班人員,我為什麼去搗鼓這麼一個在科班裡也比較專業的問題了。這就要說下我前幾天做的一個作業了,當時我用python寫了個程式,結果運行了一天,這個速度可讓我愁了,我還怎麼優化,怎麼交作業啊。於是小子就去各大論壇尋丹問藥了,終於讓我發現可以用平行計算來最大化壓榨電腦的CPU,提升計算效率,而且python裡有multiprocessing這個庫可以提供平行計算介面,於是小子花1天時間改程序序,終於在規定時間內做出了自己滿意的結果,上交了作業。之後,小子對平行計算充滿了興趣,於是又重新在Google上游歷了一番,大致弄清了GPU、CPU、程序、執行緒、平行計算、分散式計算等概念,也把python的multiprocessing耍了一遍,現在小子也算略有心得了,所以來此立碑,以示後來遊客。
小子本文分為四部分,一是大資料時代現狀,其二是面對挑戰的方法,然後是用python寫並行程式,最後是multiprocessing實戰。
一、大資料時代的現狀
當前我們正處於大資料時代,每天我們會通過手機、電腦等裝置不斷的將自己的資料傳到網際網路上。據統計,YouTube上每分鐘就會增加500多小時的視訊,面對如此海量的資料,如何高效的儲存與處理它們就成了當前最大的挑戰。
但在這個對硬體要求越來越高的時代,CPU卻似乎並不這麼給力了。自2013年以來,處理器頻率的增長速度逐漸放緩了,目前CPU的頻率主要分佈在3~4GHz。這個也是可以理解的,畢竟摩爾定律都生效了50年了,如果它老人家還如此給力,那我們以後就只要靜等處理器頻率提升,什麼計算問題在未來那都不是話下了。實際上CPU與頻率是於能耗密切相關的,我們之前可以通過加電壓來提升頻率,但當能耗太大,散熱問題就無法解決了,所以頻率就逐漸穩定下來了,而Intel與AMD等大製造商也將目標轉向了多核晶片,目前普通桌面PC也達到了4~8核。
二、面對挑戰的方法
咱們有了多核CPU,以及大量計算裝置,那我們怎麼來用它們應對大資料時代的挑戰了。那就要提到下面的方法了。
2.1 平行計算
並行(parallelism)是指程式執行時的狀態,如果在同時刻有多個“工作單位”執行,則所執行的程式處於並行狀態。圖一是並行程式的示例,開始並行後,程式從主執行緒分出許多小的執行緒並同步執行,此時每個執行緒在各個獨立的CPU進行執行,在所有執行緒都執行完成之後,它們會重新合併為主執行緒,而執行結果也會進行合併,並交給主執行緒繼續處理。
圖二是一個多執行緒的任務(沿線為執行緒時間),但它不是並行任務。這是因為task1與task2總是不在同一時刻執行,這個情況下單核CPU完全可以同時執行task1與task2。方法是在task1不執行的時候立即將CPU資源給task2用,task2空閒的時候CPU給task1用,這樣通過時間窗調整任務,即可實現多執行緒程式,但task1與task2並沒有同時執行過,所以不能稱為並行。我們可以稱它為併發(concurrency)程式,這個程式一定意義上提升了單個CPU的使用率,所以效率也相對較高。
並行程式設計模型:
- 資料並行(Data Parallel)模型:將相同的操作同時作用於不同資料,只需要簡單地指明執行什麼並行操作以及並行操作物件。該模型反映在圖一中即是,並行同時在主執行緒中拿取資料進行處理,併線程執行相同的操作,然後計算完成後合併結果。各個並行執行緒在執行時互不干擾。
- 訊息傳遞(Message Passing)模型:各個並行執行部分之間傳遞訊息,相互通訊。訊息傳遞模型的並行執行緒在執行時會傳遞資料,可能一個執行緒執行到一半的時候,它所佔用的資料或處理結果就要交給另一個執行緒處理,這樣,在設計並行程式時會給我們帶來一定麻煩。該模型一般是分散式記憶體平行計算機所採用方法,但是也可以適用於共享式記憶體的平行計算機。
什麼時候用平行計算:
- 多核CPU——計算密集型任務。儘量使用平行計算,可以提高任務執行效率。計算密集型任務會持續地將CPU佔滿,此時有越多CPU來分擔任務,計算速度就會越快,這種情況才是並行程式的用武之地。
- 單核CPU——計算密集型任務。此時的任務已經把CPU資源100%消耗了,就沒必要使用平行計算,畢竟硬體障礙擺在那裡。
- 單核CPU——I/O密集型任務。I/O密集型任務在任務執行時需要經常呼叫磁碟、螢幕、鍵盤等外設,由於呼叫外設時CPU會空閒,所以CPU的利用率並不高,此時使用多執行緒程式,只是便於人機互動。計算效率提升不大。
- 多核CPU——I/O密集型任務。同單核CPU——I/O密集型任務。
2.2 改用GPU處理計算密集型程式
GPU即圖形處理器核心(Graphics Processing Unit),它是顯示卡的心臟,顯示卡上還有視訊記憶體,GPU與視訊記憶體類似與CPU與記憶體。
GPU與CPU有不同的設計目標,CPU需要處理所有的計算指令,所以它的單元設計得相當複雜;而GPU主要為了圖形“渲染”而設計,渲染即進行資料的列處理,所以GPU天生就會為了更快速地執行復雜算術運算和幾何運算的。
GPU相比與CPU有如下優勢:
- 強大的浮點數計算速度。
- 大量的計算核心,可以進行大型平行計算。一個普通的GPU也有數千個計算核心。
- 強大的資料吞吐量,GPU的吞吐量是CPU的數十倍,這意味著GPU有適合的處理大資料。
GPU目前在處理深度學習上用得十分多,英偉達(NVIDIA)目前也花大精力去開發適合深度學習的GPU。現在上百層的神經網路已經很常見了,面對如此龐大的計算量,CPU可能需要運算幾天,而GPU卻可以在幾小時內算完,這個差距已經足夠別人比我們多打幾個比賽,多發幾篇論文了。
3.3 分散式計算
說到分散式計算,我們就先說下下Google的3篇論文,原文可以直接點連結去下載:
- GFS(The Google File System):解決資料儲存的問題。採用N多臺廉價的電腦,使用冗餘的方式,來取得讀寫速度與資料安全並存的結果。
- MapReduce(Simplified Data Processing on Large Clusters):函數語言程式設計,把所有的操作都分成兩類,map與reduce,map用來將資料分成多份,分開處理,reduce將處理後的結果進行歸併,得到最終的結果。
- BigTable(Bigtable: A Distributed Storage System for Structured Data):在分散式系統上儲存結構化資料的一個解決方案,解決了巨大的Table的管理、負載均衡的問題.
Google在2003~2006年發表了這三篇論文之後,一時之間引起了轟動,但是Google並沒有將MapReduce開源。在這種情況下Hadoop就出現了,Doug Cutting在Google的3篇論文的理論基礎上開發了Hadoop,此後Hadoop不斷走向成熟,目前Facebook、IBM、ImageShack等知名公司都在使用Hadoop執行他們的程式。
分散式計算的優勢:
可以整合諸多低配的計算機(成千上萬臺)進行高併發的儲存與計算,從而達到與超級計算機媲美的處理能力。
三、用python寫並行程式
在介紹如何使用python寫並行程式之前,我們需要先補充幾個概念,分別是程序、執行緒與全域性直譯器鎖(Global Interpreter Lock, GIL)。
3.1 程序與執行緒
程序(process):
- 在面向執行緒設計的系統(如當代多數作業系統、Linux 2.6及更新的版本)中,程序本身不是基本執行單位,而是執行緒的容器。
- 程序擁有自己獨立的記憶體空間,所屬執行緒可以訪問程序的空間。
- 程式本身只是指令、資料及其組織形式的描述,程序才是程式的真正執行例項。 例如,Visual Studio開發環境就是利用一個程序編輯原始檔,並利用另一個程序完成編譯工作的應用程式。
執行緒(threading):
- 執行緒有自己的一組CPU指令、暫存器與私有資料區,執行緒的資料可以與同一程序的執行緒共享。
- 當前的作業系統是面向執行緒的,即以執行緒為基本執行單位,並按執行緒分配CPU。
程序與執行緒有兩個主要的不同點,其一是程序包含執行緒,執行緒使用程序的記憶體空間,當然執行緒也有自己的私有空間,但容量小;其二是程序有各自獨立的記憶體空間,互不干擾,而執行緒是共享記憶體空間。
圖三展示了程序、執行緒與CPU之間的關係。在圖三中,程序一與程序二都含有3個執行緒,CPU會按照執行緒來分配任務,如圖中4個CPU同時執行前4個執行緒,後兩個標紅執行緒處於等待狀態,在CPU執行完當前執行緒時,等待的執行緒會被喚醒並進入CPU執行。通常,程序含有的執行緒數越多,則它佔用CPU的時間會越長。
3.2 全域性直譯器鎖GIL:
GIL是計算機程式設計語言直譯器用於同步執行緒的一種機制,它使得任何時刻僅有一個執行緒在執行。即便在多核心處理器上,使用 GIL 的直譯器也只允許同一時間執行一個執行緒。Python的Cpython直譯器(普遍使用的直譯器)使用GIL,在一個Python直譯器程序內可以執行多執行緒程式,但每次一個執行緒執行時就會獲得全域性直譯器鎖,使得別的執行緒只能等待,由於GIL幾乎釋放的同時就會被原執行緒馬上獲得,那些等待執行緒可能剛喚醒,所以經常造成執行緒不平衡享受CPU資源,此時多執行緒的效率比單執行緒還要低下。在python的官方文件裡,它是這樣解釋GIL的:
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)
可以說它的初衷是很好的,為了保證執行緒間的資料安全性;但是隨著時代的發展,GIL卻成為了python平行計算的最大障礙,但這個時候GIL已經遍佈CPython的各個角落,修改它的工作量太大,特別是對這種開源性的語音來說。但幸好GIL只鎖了執行緒,我們可以再新建直譯器程序來實現並行,那這就是multiprocessing的工作了。
3.3 multiprocessing
multiprocessing是python裡的多程序包,通過它,我們可以在python程式裡建立多程序來執行任務,從而進行平行計算。官方文件如下所述:
The multiprocessing package offers both local and remote concurrency, effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads.
我們接下來介紹下multiprocessing的各個介面:
3.3.1 程序process
multiprocessing.Process(target=None, args=())
target: 可以被run()呼叫的函式,簡單來說就是程序中執行的函式
args: 是target的引數
process的方法:
start(): 開始啟動程序,在建立process之後執行
join([timeout]):阻塞目前父程序,直到呼叫join方法的程序執行完或超時(timeout),才繼續執行父程序
terminate():終止程序,不論程序有沒有執行完,儘量少用。
示例1
from multiprocessing import Process
def f(name):
print 'hello', name
if __name__ == '__main__':
p = Process(target=f, args=('bob',)) # p程序執行f函式,引數為'bob',注意後面的“,”
p.start() # 程序開始
p.join() # 阻塞主執行緒,直至p程序執行結束
3.3.2 程序池Process Pools
class multiprocessing.Pool([processes])
processes是程序池中的程序數,預設是本機的cpu數量
方法:
apply(func[, args[, kwds]])程序池中的程序進行func函式操作,操作時會阻塞程序,直至生成結果。
apply_async(func[, args[, kwds[, callback]]])與apply類似,但是不會阻塞程序
map(func, iterable[, chunksize])程序池中的程序進行對映操作
map_async(func, iterable[, chunksize[, callback]])
imap(func, iterable[, chunksize]):返回有序迭代器
imap_unordered(func, iterable[, chunsize]):返回無序迭代器
close():禁止程序池再接收任務
terminate():強行終止程序池,不論是否有任務在執行
join():在close()或terminate()之後進行,等待程序退出
示例2
from multiprocessing import Pool
def f(x):
return x*x
if __name__ == '__main__':
p = Pool(5) # 建立有5個程序的程序池
print(p.map(f, [1, 2, 3])) # 將f函式的操作給程序池
3.3.3 Pipes & Queues
multiprocessing.Pipe([duplex])
返回兩個連線物件(conn1, conn2),兩個連線物件分別訪問pipe的頭和尾,進行讀寫操作
Duplex: True(default),建立的pipe是雙向的,也即兩端都可以進行讀寫;若為False,則pipe是單向的,僅可以在一端讀,另一端寫,此時與Queue類似。
multiprocessing.Queue([maxsize])
qsize():返回queue中member數量
empty():如果queue是空的,則返回true
full():如果queue中member數量達到maxsize,則返回true
put(obj):將一個object放入到queue中
get():從佇列中取出一個object並將它從queue中移除,FIFO原則
close():關閉佇列,並將快取的object寫入pipe
示例
from multiprocessing import Pool
import time
def f(x):
return x*x
if __name__ == '__main__':
pool = Pool(processes=4) # start 4 worker processes
result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process
print result.get(timeout=1) # prints "100" unless your computer is *very* slow
print pool.map(f, range(10)) # prints "[0, 1, 4,..., 81]"
it = pool.imap(f, range(10))
print it.next() # prints "0"
print it.next() # prints "1"
print it.next(timeout=1) # prints "4" unless your computer is *very* slow
result = pool.apply_async(time.sleep, (10,))
print result.get(timeout=1) # raises multiprocessing.TimeoutError
3.3.4 程序鎖multiprocessing.Lock
當一個程序獲得(acquire)鎖之後,其它程序在想獲得鎖就會被禁止,可以保護資料,進行同步處理。
acquire(block=True, timeout=None):嘗試獲取一個鎖,如果block為true,則會在獲得鎖之後阻止其它程序再獲取鎖。
release():釋放鎖
3.3.5 共享記憶體——Value, Array
共享記憶體通常需要配合程序鎖來處理,保證處理的順序相同。
multiprocessing.Value(typecode_or_type, *args[, lock])
返回一個ctype物件,
建立c = Value(‘d’, 3.14),呼叫c.value()
multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True)
返回一個ctype陣列,只能是一維的
Array(‘i’, [1, 2, 3, 4])
Type code | C Type | Python Type | Minimum size in bytes |
---|---|---|---|
'b' |
signed char | int | 1 |
'B' |
unsigned char | int | 1 |
'u' |
Py_UNICODE | Unicode character | 2 |
'h' |
signed short | int | 2 |
'H' |
unsigned short | int | 2 |
'i' |
signed int | int | 2 |
'I' |
unsigned int | int | 2 |
'l' |
signed long | int | 4 |
'L' |
unsigned long | int | 4 |
'q' |
signed long long | int | 8 |
'Q' |
unsigned long long | int | 8 |
'f' |
float | float | 4 |
'd' |
double | float | 8 |
3.3.6 其它方法
multiprocessing.active_children():返回當前程序的所有子程序
multiprocessing.cpu_count():返回本計算機的cpu數量
multiprocessing.current_process():返回當前程序
3.3.7 注意事項:
- 儘量避免共享資料
- 所有物件都儘量是可以pickle的
- 避免使用terminate強行終止程序,以造成不可預料的後果
- 有佇列的程序在終止前佇列中的資料需要清空,join操作應放到queue清空後
- 明確給子程序傳遞資源、引數
windows平臺另需注意:
- 注意跨模組全域性變數的使用,可能被各個程序修改造成結果不統一
- 主模組需要加上if name == 'main':來提高它的安全性,如果有互動介面,需要加上freeze_support()
四、multiprocessing實戰
process、lock與value嘗試:
import multiprocessing as mp
import time
def job(v, num, l):
l.acquire() # 鎖住
for _ in range(5):
time.sleep(0.1)
v.value += num # 獲取共享記憶體
print(v.value)
l.release() # 釋放
def multicore():
l = mp.Lock() # 定義一個程序鎖
#l = 1
v = mp.Value('i', 0) # 定義共享記憶體
p1 = mp.Process(target=job, args=(v,1,l)) # 需要將lock傳入
p2 = mp.Process(target=job, args=(v,3,l))
p1.start()
p2.start()
p1.join()
p2.join()
if __name__=='__main__':
multicore()
上述程式碼即對共享記憶體疊加5次,p1程序每次疊加1,p2程序每次疊加3,為了避免p1與p2在執行時搶奪共享資料v,在程序執行時鎖住了該程序,從而保證了執行的順序。我測試了三個案例:
- 直接執行上述程式碼輸出[1, 2, 3, 4, 5, 8, 11, 14, 17, 20],執行時間為1.037s
- 在1的基礎上註釋掉鎖(上述註釋了三行),在沒有鎖的情況下,輸出[1, 4, 5, 8, 9, 12, 13, 15, 14, 16],執行時間為0.53s
- 在2的基礎上將p1.join()調到p2.start()前面,輸出為[1, 2, 3, 4, 5, 8, 11, 14, 17, 20],執行時間為1.042s.
可以發現,沒鎖的情況下調整join可以取得與加鎖類似的結果,這是因為join即是阻塞主程序,直至當前程序結束才回到主程序,若將p1.join()放到p1.start()後面,則會馬上阻塞主程序,使得p2要稍後才開始,這與鎖的效果一樣。
如果如上述程式碼所示,p1.join()在p2.start()後面,雖然是p1先join(),但這時只是阻塞了主程序,而p2是兄弟程序,它已經開始了,p1就不能阻止它了,所以這時如果沒鎖的話p1與p2就是並行了,執行時間就是一半,但因為它們爭搶共享變數,所以輸出就變得不確定了。
pool
import multiprocessing as mp
#import pdb
def job(i):
return i*i
def multicore():
pool = mp.Pool()
#pdb.set_trace()
res = pool.map(job, range(10))
print(res)
res = pool.apply_async(job, (2,))
# 用get獲得結果
print(res.get())
# 迭代器,i=0時apply一次,i=1時apply一次等等
multi_res = [pool.apply_async(job, (i,)) for i in range(10)]
# 從迭代器中取出
print([res.get() for res in multi_res])
multicore()
pool其實非常好用,特別是map與apply_async。通過pool這個介面,我們只有指定可以並行的函式與函式引數列表,它就可以自動幫我們建立多程序池進行平行計算,真的不要太方便。pool特別適用於資料並行模型,假如是訊息傳遞模型那還是建議自己通過process來創立程序吧。
總結
小子這次主要是按自己的理解把平行計算理了下,對程序、執行緒、CPU之間的關係做了下闡述,並把python的multiprocessing這個包拎了拎,個人感覺這個裡面還大有學問,上次我一個師兄用python的process來控制單次迭代的執行時間(執行超時就跳過這次迭代,進入下一次迭代)也是讓我漲了見識,後面還要多多學習啊。
感謝您花費寶貴的時間閱讀到這裡,希望能有所收穫,也歡迎在評論區進行交流。
推薦好文:
multiprocessing官方文件
python多程序的理解 multiprocessing Process join run(推薦好文)
多程序 Multiprocessing