1. 程式人生 > >【Python】【平行計算】Python 多核平行計算

【Python】【平行計算】Python 多核平行計算

以前寫點小程式其實根本不在乎並行,單核跑跑也沒什麼問題,而且我的電腦也只有雙核四個超執行緒(下面就統稱好了),覺得去折騰並行沒啥意義(除非在做IO密集型任務)。然後自從用上了32核128GB記憶體,看到 htop 裡面一堆空載的核,很自然地就會想這個並行必須去折騰一下。後面發現,其實 Python 的並行真的非常簡單。

htop 32cores 128GB RAM

multiprocessing vs threading

Python 自帶的庫又全又好用,這是我特別喜歡 Python 的原因之一。Python 裡面有 multiprocessing和 threading 這兩個用來實現並行的庫。用執行緒應該是很自然的想法,畢竟(直覺上)開銷小,還有共享記憶體的福利,而且在其他語言裡面執行緒用的確實是非常頻繁。然而,我可以很負責任的說,如果你用的是 CPython 實現,那麼用了 threading

 就等同於和平行計算說再見了(實際上,甚至會比單執行緒更慢),除非這是個IO密集型的任務。

GIL

CPython 指的是 python.org 提供的 Python 實現。是的,Python 是一門語言,它有各種不同的實現,比如 PyPyJythonIronPython 等等……我們用的最多的就是 CPython,它幾乎就和 Python 畫上了等號。

CPython 的實現中,使用了 GIL 即全域性鎖,來簡化直譯器的實現,使得直譯器每次只執行一個執行緒中的位元組碼。也就是說,除非是在等待IO操作,否則 CPython 的多執行緒就是徹底的謊言!

有關 GIL 下面兩個資料寫的挺好的:

multiprocessing.Pool

因為 GIL 的緣故 threading 不能用,那麼我們就好好研究研究 multiprocessing。(當然,如果你說你不用 CPython,沒有 GIL 的問題,那也是極佳的。)

首先介紹一個簡單粗暴,非常實用的工具,就是 multiprocessing.Pool。如果你的任務能用 ys = map(f, xs) 來解決,大家可能都知道,這樣的形式天生就是最容易並行的,那麼在 Python 裡面平行計算這個任務真是再簡單不過了。舉個例子,把每個數都平方:

import multiprocessing

def f(x):
    return x * x

cores = multiprocessing
.cpu_count() pool = multiprocessing.Pool(processes=cores) xs = range(5) # method 1: map print pool.map(f, xs) # prints [0, 1, 4, 9, 16] # method 2: imap for y in pool.imap(f, xs): print y # 0, 1, 4, 9, 16, respectively # method 3: imap_unordered for y in pool.imap_unordered(f, xs): print(y) # may be in any order

map 直接返回列表,而 i 開頭的兩個函式返回的是迭代器;imap_unordered 返回的是無序的。

當計算時間比較長的時候,我們可能想要加上一個進度條,這個時候 i 系列的好處就體現出來了。另外,有一個小技巧,就是輸出 \r 可以使得游標回到行首而不換行,這樣就可以製作簡易的進度條了。

cnt = 0
for _ in pool.imap_unordered(f, xs):
    sys.stdout.write('done %d/%d\r' % (cnt, len(xs)))
    cnt += 1

更復雜的操作

其中我強烈推薦的就是 Queue,因為其實很多場景就是生產者消費者模型,這個時候用 Queue 就解決問題了。用的方法也很簡單,現在父程序建立 Queue,然後把它當做 args 或者 kwargs 傳給 Process 就好了。

使用 Theano 或者 Tensorflow 等工具時的注意事項

需要注意的是,在 import theano 或者 import tensorflow 等呼叫了 Cuda 的工具的時候會產生一些副作用,這些副作用會原樣拷貝到子程序中,然後就發生錯誤,如:

could not retrieve CUDA device count: CUDA_ERROR_NOT_INITIALIZED

解決的方法是,保證父程序不引入這些工具,而是在子程序建立好了以後,讓子程序各自引入。

如果使用 Process,那就在 target 函式裡面 import。舉個例子:

import multiprocessing

def hello(taskq, resultq):
    import tensorflow as tf
    config = tf.ConfigProto()
    config.gpu_options.allow_growth=True
    sess = tf.Session(config=config)
    while True:
        name = taskq.get()
        res = sess.run(tf.constant('hello ' + name))
        resultq.put(res)

if __name__ == '__main__':
    taskq = multiprocessing.Queue()
    resultq = multiprocessing.Queue()
    p = multiprocessing.Process(target=hello, args=(taskq, resultq))
    p.start()

    taskq.put('world')
    taskq.put('abcdabcd987')
    taskq.close()

    print(resultq.get())
    print(resultq.get())

    p.terminate()
    p.join()

如果使用 Pool,那麼可以編寫一個函式,在這個函式裡面 import,並且把這個函式作為 initializer傳入到 Pool 的建構函式裡面。舉個例子:

import multiprocessing

def init():
    global tf
    global sess
    import tensorflow as tf
    config = tf.ConfigProto()
    config.gpu_options.allow_growth=True
    sess = tf.Session(config=config)

def hello(name):
    return sess.run(tf.constant('hello ' + name))

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=2, initializer=init)
    xs = ['world', 'abcdabcd987', 'Lequn Chen']
    print pool.map(hello, xs)