1. 程式人生 > >python-multiprocessing 多程序平行計算

python-multiprocessing 多程序平行計算

python的multiprocessing包是標準庫提供的多程序平行計算包,提供了和threading(多執行緒)相似的API函式,但是相比於threading,將任務分配到不同的CPU,避免了GIL(Global Interpreter Lock)的限制。下面我們對multiprocessing中的Pool和Process類做介紹。

Pool

採用Pool程序池對任務並行處理更加方便,我們可以指定並行的CPU個數,然後 Pool 會自動把任務放到程序池中執行。 Pool 包含了多個並行函式。

apply apply_async

apply 要逐個執行任務,在python3中已經被棄用,而apply_async是apply的非同步執行版本。平行計算一定要採用apply_async函式。


import multiprocessing
import time

from random import randint, seed

def f(num):
    seed()
    rand_num = randint(0,10) # 每次都隨機生成一個停頓時間
    time.sleep(rand_num)
    return (num, rand_num)

start_time = time.time()
cores = multiprocessing.cpu_count()
pool = multiprocessing.Pool(processes=cores)
pool_list = []
result_list = []
start_time = time.time()
for
xx in xrange(10): pool_list.append(pool.apply_async(f, (xx, ))) # 這裡不能 get, 會阻塞程序 result_list = [xx.get() for xx in pool_list] #在這裡不免有人要疑問,為什麼不直接在 for 迴圈中直接 result.get()呢?這是因為pool.apply_async之後的語句都是阻塞執行的,呼叫 result.get() 會等待上一個任務執行完之後才會分配下一個任務。事實上,獲取返回值的過程最好放在程序池回收之後進行,避免阻塞後面的語句。 # 最後我們使用一下語句回收程序池:
pool.close() pool.join() print result_list print '並行花費時間 %.2f' % (time.time() - start_time) print '序列花費時間 %.2f' % (sum([xx[1] for xx in result_list])) #[(0, 8), (1, 2), (2, 4), (3, 9), (4, 0), (5, 1), (6, 8), (7, 3), (8, 4), (9, 6)] #並行花費時間 14.11 #序列花費時間 45.00

map map_async

map_async 是 map的非同步執行函式。
相比於 apply_async, map_async 只能接受一個引數。


import time
from multiprocessing import Pool
def run(fn):
  #fn: 函式引數是資料列表的一個元素
  time.sleep(1)
  return fn*fn

if __name__ == "__main__":
  testFL = [1,2,3,4,5,6]  
  print '序列:' #順序執行(也就是序列執行,單程序)
  s = time.time()
  for fn in testFL:
    run(fn)

  e1 = time.time()
  print "順序執行時間:", int(e1 - s)

  print '並行:' #建立多個程序,並行執行
  pool = Pool(4)  #建立擁有5個程序數量的程序池
  #testFL:要處理的資料列表,run:處理testFL列表中資料的函式
  rl =pool.map(run, testFL) 
  pool.close()#關閉程序池,不再接受新的程序
  pool.join()#主程序阻塞等待子程序的退出
  e2 = time.time()
  print "並行執行時間:", int(e2-e1)
  print rl

# 序列:
# 順序執行時間: 6
# 並行:
# 並行執行時間: 2
# [1, 4, 9, 16, 25, 36]

Process

採用Process必須注意的是,Process物件來建立程序,每一個程序佔據一個CPU,所以要建立的程序必須 小於等於 CPU的個數。如果啟動程序數過多,特別是當遇到CPU密集型任務,會降低並行的效率。

#16.6.1.1. The Process class
from multiprocessing import Process, cpu_count
import os
import time

start_time = time.time()
def info(title):
#     print(title)
    if hasattr(os, 'getppid'):  # only available on Unix
        print 'parent process:', os.getppid()
    print 'process id:', os.getpid()
    time.sleep(3)

def f(name):
    info('function f')
    print 'hello', name

if __name__ == '__main__':
#     info('main line')
    p_list = [] # 儲存Process新建的程序
    cpu_num = cpu_count()
    for xx in xrange(cpu_num):
        p_list.append(Process(target=f, args=('xx_%s' % xx,)))
    for xx in p_list:
        xx.start()

    for xx in p_list:
        xx.join()
    print('spend time: %.2f' % (time.time() - start_time))
parent process: 11741
# parent process: 11741
# parent process: 11741
# process id: 12249
# process id: 12250
# parent process: 11741
# process id: 12251
# process id: 12252
# hello xx_1
# hello xx_0
# hello xx_2
# hello xx_3
# spend time: 3.04

程序間通訊

Process和Pool均支援Queues 和 Pipes 兩種型別的通訊。

Queue 佇列

佇列遵循先進先出的原則,可以在各個程序間使用。


# 16.6.1.2. Exchanging objects between processes
# Queues

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print q.get()    # prints "[42, None, 'hello']"
    p.join()

pipe

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print parent_conn.recv()   # prints "[42, None, 'hello']"
    p.join()

queue 與 pipe比較

  • Pipe() can only have two endpoints.
  • Queue() can have multiple producers and consumers.
    When to use them

    If you need more than two points to communicate, use a Queue().

    If you need absolute performance, a Pipe() is much faster because Queue() is built on top of Pipe().

共享資源

多程序應該避免共享資源。在多執行緒中,我們可以比較容易地共享資源,比如使用全域性變數或者傳遞引數。在多程序情況下,由於每個程序有自己獨立的記憶體空間,以上方法並不合適。此時我們可以通過共享記憶體和Manager的方法來共享資源。但這樣做提高了程式的複雜度,並因為同步的需要而降低了程式的效率。

共享記憶體

共享記憶體僅適用於 Process 類,不能用於程序池 Pool

# 16.6.1.4. Sharing state between processes
# Shared memory
from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print num.value
    print arr[:]

# 3.1415927
# [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

Manager Class

Manager Class 既可以用於Process 也可以用於程序池 Pool。


from multiprocessing import Manager, Process
def f(d, l, ii):
    d[ii] = ii
    l.append(ii)

if __name__ == '__main__':
    manager = Manager()

    d = manager.dict()
    l = manager.list(range(10))
    p_list = [] 
    for xx in range(4):
        p_list.append(Process(target=f, args=(d, l, xx)))
    for xx in p_list:
        xx.start()

    for xx in p_list:
        xx.join()
    print d
    print l
# {0: 0, 1: 1, 2: 2, 3: 3}
# [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3]