1. 程式人生 > >python多程序利用Multiprocessing執行程式

python多程序利用Multiprocessing執行程式

Process類

Process 類用來描述一個程序物件。建立子程序的時候,只需要傳入一個執行函式和函式的引數即可完成 Process 示例的建立。

  • star() 方法啟動程序,
  • join() 方法實現程序間的同步,等待所有程序退出才執行下面的程式碼
  • close() 用來阻止多餘的程序湧入程序池 Pool 造成程序阻塞。
    multiprocessing.Process(group=None, target=None, name=None,args=(), kwargs={},daemon=None)
  • target 是函式名字,需要呼叫的函式
  • args 函式需要的引數,以 tuple 的形式傳入,當傳入一個引數是,要加,號,因為(1)不是一個tuple,而(1,)才是一個tuple

給函式分配程序執行例項,建立單程序:

import multiprocessing as mp


def job(a, b):
    print(a+b)


if __name__ == '__main__':  # 必須將程序過程放到main函式中去
    p1 = mp.Process(target=job, args=(1, 2))  
    p1.start()
    p1.join()

建立多程序:

import multiprocessing
import os


def run_proc(name):
    print('Child process {0} {1} Running '
.format(name, os.getpid())) if __name__ == '__main__': print('Parent process {0} is Running'.format(os.getpid())) for i in range(5): p = multiprocessing.Process(target=run_proc, args=(str(i),)) print('process start') p.start() p.join() print('Process close')

執行結果:

Parent process 6296 is Running
process start
process start
process start
process start
process start
Child process 0 9428 Running 
Child process 1 8444 Running 
Child process 2 7852 Running 
Child process 3 6540 Running 
Child process 4 14472 Running 
Process close

如果將p.join去掉,結果為:

Parent process 8712 is Running
process start
process start
process start
process start
process start
Child process 0 10516 Running 
Process close
Child process 1 3172 Running 
Child process 2 10748 Running 
Child process 3 11636 Running 
Child process 4 5484 Running 

Queue

使用Queue儲存程序輸出,用於多個程序間的通訊
Queue的功能是將每個核或執行緒的運算結果放在隊裡中, 等到每個執行緒或核執行完畢後再從佇列中取出結果, 繼續載入運算。原因很簡單, 多程序呼叫的函式不能有返回值(不能return), 所以使用Queue儲存多個程序運算的結果。
put方法:插入資料到佇列。
get方法:從佇列中讀取並刪除一個元素。

import multiprocessing as mp


def job(q):
    for i in range(10):
        q.put(i)  # 存放到佇列中


if __name__ == '__main__':
    q = mp.Queue()  # 建立佇列
    # args裡一個引數時要加,號,否則會報錯引數是不可迭代的
    p1 = mp.Process(target=job, args=(q,))
    p2 = mp.Process(target=job, args=(q,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    res1 = q.get()  # 獲取佇列中的值
    res2 = q.get()
    res3 = q.get()
    print(res1)  # 0
    print(res2)  # 1
    print(res3)  # 2

Pool程序池

程序池就是我們將所要執行的東西,放到程序池中,Python會自行解決多程序的問題。Pool預設大小是CPU的核數,我們也可以通過在Pool中傳入processes引數自定義需要的核數量。定義程序池之後,就可以讓程序池對應某一個函式,通過向程序池中傳入資料從而返回函式值。 Pool和之前的Process的不同點是傳入Pool的函式有返回值,而Process的沒有返回值。
map方法:用map()獲取結果,在map()中需要放入函式和需要迭代運算的值,然後它會自動分配給CPU核,返回結果。
apply_async方法:apply_async()中只能傳遞一個值,它只會放入一個核進行運算,但是傳入值時要注意是元組型別,所以在傳入值後需要加逗號, 同時需要用get()方法獲取返回值。如果要實現map()的效果,需要將apply_async方法做成一個列表的形式。
程序池最後要加join方法,這樣程序池執行完畢後才向下進行,如果不加的話可能導致程序池還未執行完程式已經finished。
程式碼如下。

import multiprocessing as mp


def job(x):
    return x * x


def multicore():
    pool = mp.Pool(processes=mp.cpu_count() - 1)
    res = pool.map(job, range(10))
    print(res)
    res = pool.apply_async(job, (2,))
    print(res.get())
    multi_res = [pool.apply_async(job, (i,)) for i in range(10)]
    pool.close()  # 關閉程序池,其他程序無法加入
    pool.join()  # 等待所有程序執行完畢,呼叫前必須呼叫close方法
    print([res.get() for res in multi_res])


if __name__ == '__main__':
        multicore()

執行結果:

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
4
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

多程序(multiprocessing)和多執行緒(multi-threading)對比:

測試程式:

import multiprocessing as mp
import threading as td
import time


MAX = 10000000


def job(q):
    res = 0
    for i in range(MAX):
        res += i+i**2+i**3
    q.put(res)


def multicore():
    q = mp.Queue()
    p1 = mp.Process(target=job, args=(q,))
    p2 = mp.Process(target=job, args=(q,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    res1 = q.get()
    res2 = q.get()
    print('multicore:', res1+res2)


def normal():
    res = 0
    for _ in range(2):
        for i in range(MAX):
            res += i+i**2+i**3
    print('normal:', res)

def multithread():
    q = mp.Queue()
    t1 = td.Thread(target=job, args=(q,))
    t2 = td.Thread(target=job, args=(q,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    res1 = q.get()
    res2 = q.get()
    print('multithreading:', res1+res2)


if __name__ == '__main__':
    st = time.time()
    normal()
    st1 = time.time()
    print('normal time:', st1 - st)
    multithread()
    st2 = time.time()
    print('multithreading time:', st2 - st1)
    multicore()
    print('multicore time:', time.time()-st2)

執行結果:

normal time: 23.027679920196533
multithreading: 4999999666666716666660000000
multithreading time: 24.3942768573761
multicore: 4999999666666716666660000000
multicore time: 19.363178968429565

從上述結果來看,多程序的時間是要小於多執行緒和正常程式的,多執行緒的時間與正常時間相差無幾。原因是Python直譯器有一個全域性直譯器鎖(GIL),導致每個Python程序最多同時執行一個執行緒,因此Python多執行緒程式並不能改善程式效能,不能發揮CPU多核的優勢,通過GIL這篇文章可以瞭解。但是多程序程式可以不受影響,Python2.6引入multiprocessing來解決多程序問題,而multiprocessing的API幾乎是複製了threading的API。