1. 程式人生 > >python併發之multiprocessing

python併發之multiprocessing

由於GIL(全域性解釋鎖)的問題,python多執行緒並不能充分利用多核處理器。如果想要充分地使用多核CPU的資源,在python中大部分情況需要使用多程序。multiprocessing可以給每個程序賦予單獨的Python直譯器,這樣就規避了全域性解釋鎖所帶來的問題。與threading.Thread類似,可以利用multiprocessing.Process物件來建立一個程序。multiprocessing支援子程序、通訊和共享資料、執行不同形式的同步,提供了Process、Queue、Pipe、Lock等元件。

Process

單程序

# coding: utf-8

from
multiprocessing import Process import time def task(msg): print 'hello, %s' % msg time.sleep(1) if __name__ == '__main__': p = Process(target=task, args=('world',)) p.start() if p.is_alive(): print 'Process: %s is running' % p.pid p.join()

這段程式碼的執行過程:在主程序中建立子程序,然後呼叫start()

啟動子程序,呼叫join()等待子程序執行完,再繼續執行主程序的整個的執行流程。
控制子程序進入不同階段的是 start(), join(), is_alive(), terminate(), exitcode() 方法,這些方法只能在建立子程序的程序中執行。

建立:建立程序需要一個 function 和相關引數,引數可以是dictProcess(target=func, args=(), kwargs = {}),name 可以用來標識程序。

關閉:close停止接收新的任務,如果還有任務來,就會丟擲異常。 join 是等待所有任務完成。 join 必須要在 close 之後呼叫,否則會丟擲異常。

等待:在UNIX平臺上,當某個程序終結之後,該程序需要被其父程序呼叫wait,否則程序成為殭屍程序(Zombie)。所以在這裡,我們呼叫了Process物件的join()方法 ,實際上等同於wait的作用。
對於多執行緒來說,由於只有一個程序,所以不存在此必要性。

結束:terminate() 結束工作程序,不再處理未完成的任務。

多程序

# coding: utf-8

from multiprocessing import Process
import multiprocessing
import time


def task1(msg):
    print 'task1: hello, %s' % msg
    time.sleep(1)


def task2(msg):
    print 'task2: hello, %s' % msg
    time.sleep(1)


def task3(msg):
    print 'task3: hello, %s' % msg
    time.sleep(1)


if __name__ == '__main__':
    p1 = Process(target=task1, args=('one',))
    p2 = Process(target=task2, args=('two',))
    p3 = Process(target=task3, args=('three',))

    start = time.time()

    p1.start()
    p2.start()
    p3.start()

    print("The number of CPU is:" + str(multiprocessing.cpu_count()))
    for p in multiprocessing.active_children():
        print("child p.name: " + p.name + "\tp.id: " + str(p.pid))

    p1.join()
    p2.join()
    p3.join()

    end = time.time()
    print('3 processes take %s seconds' % (end - start))

執行結果:

task1: hello, one
task2: hello, two
task3: hello, three
The number of CPU is:4
child p.name: Process-1:p.id: 99359
child p.name: Process-2:p.id: 99360
child p.name: Process-3:p.id: 99361
3 processes take 1.00933504105 seconds

這裡三個程序執行花費約1s,說明程式是併發執行的。對於更多的併發程序,我們可以放到一個迴圈中進行處理。

Lock

當多個程序需要訪問共享資源的時候,Lock可以用來避免訪問的衝突

# coding: utf-8

from multiprocessing import Lock, Process
import time


def task1(lock, f):
    with lock:
        f = open(f, 'w+')
        f.write('hello ')
        time.sleep(1)
        f.close()


def task2(lock, f):
    lock.acquire()
    try:
        f = open(f, 'a+')
        time.sleep(1)
        f.write('world!')
    except Exception as e:
        print(e)
    finally:
        f.close()
        lock.release()


if __name__ == '__main__':
    lock = Lock()
    fn = './file.txt'

    start = time.time()
    p1 = Process(target=task1, args=(lock, fn,))
    p2 = Process(target=task2, args=(lock, fn,))

    p1.start()
    p2.start()

    p1.join()
    p2.join()

    end = time.time()
    print 'time cost: %s seconds' % (end - start)

    with open(fn, 'r') as f:
        for x in f.readlines():
            print x

執行結果:

time cost: 2.0059568882 seconds
hello world!

因為要訪問共享檔案,先獲得鎖的程序會阻塞後面的程序,因此程式執行耗時約2s。

Semaphore

Semaphore 和 Lock 稍有不同,Semaphore 相當於 N 把鎖,獲取其中一把就可以執行了。 訊號量的總數 N 在構造時傳入,s = Semaphore(N)。 和 Lock 一樣,如果訊號量為0,則程序堵塞,直到訊號大於0。Semaphore可用來控制對共享資源的訪問數量,例如池的最大連線數。

# coding: utf-8

from multiprocessing import Semaphore, Process
import time


def task(s, msg):
    s.acquire()
    print 'hello, %s' % msg
    time.sleep(1)
    s.release()


if __name__ == '__main__':
    s = Semaphore(2)

    processes = []
    for x in range(8):
        p = Process(target=task, args=(s, x,))
        processes.append(p)

    start = time.time()
    for p in processes:
        p.start()

    for p in processes:
        p.join()

    end = time.time()

    print '8 process takes %s seconds' % (end - start)

執行結果:

hello, 0
hello, 1
hello, 3
hello, 2
hello, 4
hello, 5
hello, 7
hello, 6
8 process takes 4.00831484795 seconds

訊號量同步基於內部計數器,每呼叫一次acquire(),計數器減1;每呼叫一次release(),計數器加1.當計數器為0時,acquire()呼叫被阻塞。這是Dijkstra訊號量概念P()和V()的Python實現。訊號量同步機制適用於訪問像伺服器、檔案這樣的有限資源。

Event

Event用來實現程序間同步通訊。

# coding: utf-8

from multiprocessing import Process, Event
import time


def task1(e, msg):
    print 'task1 is waitting...'
    e.wait()
    time.sleep(1)
    print 'hello, %s, e.is_set(): %s' % (msg, e.is_set())


def task2(e, msg):
    print 'task2 is waitting...'
    e.wait(msg)
    print 'hello, %s, e.is_set(): %s' % (msg, e.is_set())


if __name__ == '__main__':
    e = Event()

    p1 = Process(target=task1, args=(e, 1))
    p2 = Process(target=task2, args=(e, 2))

    p1.start()
    p2.start()

    time.sleep(3)

    e.set()
    print 'main: event is set'

執行結果:

task1 is waitting...
task2 is waitting...
hello, 2, e.is_set(): False
main: event is set
hello, 1, e.is_set(): True

Pool

如果有50個task要執行,但 CPU 只有4核,我們當然可以迴圈建立50個程序來做這個事情,但這樣處理大大增加程序管理和排程的開銷。
如果可以只建立4個程序,讓它們輪流工作完成任務,不用我們自己去管理具體的程序的建立、銷燬和排程,豈不更好。multiprocessing中的 Pool 可以幫助我們做到這一點。

多程序非同步

# coding: utf-8

from multiprocessing import Pool
import time


def task(msg):
    print 'hello, %s' % msg
    time.sleep(1)


if __name__ == '__main__':
    pool = Pool(processes=4)

    for x in range(10):
        pool.apply_async(task, args=(x,))

    pool.close()
    pool.join()

    print 'processes done.'

注意:這裡使用的是apply_async,多個程序非同步執行;如果呼叫async,就變成阻塞版本了。
執行結果:

hello, 0
hello, 1
hello, 2
hello, 3
hello, 4
hello, 5
hello, 6
hello, 7
hello, 8
hello, 9
10 processes take 3.09226489067 seconds

Pool 程序池建立4個程序,不管有沒有任務,都一直在程序池中等候。官網的描述:Worker processes within a Pool typically live for the complete duration of the Pool’s work queue。資料來的時候,若有空閒程序,則利用空閒的程序完成任務,直到所有任務完成為止;若沒有空閒的程序,則需要等待,直到池中有程序結束。

獲取程序執行結果

更多的時候,我們不僅需要多程序執行,還需要關注每個程序的執行結果,我們可以通過獲取apply_async的返回值得到執行結果。

# coding: utf-8

from multiprocessing import Pool
import time


def task(msg):
    print 'hello, %s' % msg
    time.sleep(1)
    return 'msg: %s' % msg


if __name__ == '__main__':
    pool = Pool(processes=4)

    results = []
    for x in range(10):
        ret = pool.apply_async(task, args=(x,))
        results.append(ret)

    pool.close()
    pool.join()

    print 'processes done, result:'

    for x in results:
        print x.get()

pool中的map方法

上面我們是通過一個迴圈往程序池新增任務,Pool提供了更優雅的map方法來管理任務的提交,只需對上面的程式碼稍作修改。

# coding: utf-8

from multiprocessing import Pool
import time


def task(msg):
    print 'hello, %s' % msg
    time.sleep(1)
    return 'msg: %s' % msg


if __name__ == '__main__':
    pool = Pool(processes=4)

    results = []
    msgs = [x for x in range(10)]
    results = pool.map(task, msgs)

    pool.close()
    pool.join()

    print 'processes done, result:'

    for x in results:
        print x

友情推薦

推薦一下極客時間的阿里架構師架構專欄,全是乾貨,掃碼優有惠、贈30元新手券。
這裡寫圖片描述