Python-multiprocessing程序管理
阿新 • • 發佈:2019-01-27
multiprocessing模組包含一個API,它基於threading API可以在多個程序間劃分工作。有些情況下,multiprocessing可以作為臨時替換,取代threading來利用多個CPU核心,避免全域性直譯器鎖帶來的效能瓶頸。
1. multiprocessing基礎
建立程序(MP.Process)
要建立第二個程序,最簡單的方法是例項化一個Process物件,並呼叫start()讓其工作。import multiprocessing def worker(): print 'Worker' return if __name__ == '__main__': jobs = [] for i in range(5): p = multiprocessing.Process(target = worker) jobs.append(p) p.start()
執行結果將會列印5次‘Worker',不過不清楚孰先孰後,這取決於具體的執行順序,因為每個程序都在競爭訪問輸出流。更有用的做法是,建立一個程序時可以提供引數。與threading不同,要向一個multiprocessing Process傳遞引數,這個引數必須能夠使用pickle序列化。下面的例子向各個工作程序傳遞一個要列印的數。
Worker 0 Worker 2 Worker 3 Worker 4 Worker 1import multiprocessing import time def worker(num): print "Worker", num time.sleep(0.1) return if __name__ == '__main__': jobs = [] for i in range(5): p = multiprocessing.Process(target = worker, args = (i,)) jobs.append(p) p.start()
可匯入的目標函式
threading與multiprocessing例子之間有一個區別,multiprocessing例子中對__main__使用了額外的保護。對於新程序的啟動方式,要求子程序能夠匯入包含目標函式的指令碼。可以講應用的主要部分包裝在一個__main__檢查中,確保模組匯入時不會在各個子程序中遞迴地執行。另外一個方法是從一個單獨的指令碼中匯入目標函式,下面例子中程序的工作函式是simple.py中worker函式:import multiprocessing import simple if __name__ == '__main__': jobs = [] for i in range(5): p = multiprocessing.Process( target = simple.worker, ) jobs.append(p) p.start()
確定當前程序(MP.current_process().name)
每個Process例項都有一個名稱,其預設值可以在建立程序時改變。給程序命名對於跟蹤程序很有用,特別是在當前應用中有多種型別的程序同時執行時。import multiprocessing
import time
def worker():
name = multiprocessing.current_process().name
print name, 'Starting'
time.sleep(2)
print name, 'Exiting'
def my_device():
name = multiprocessing.current_process().name
print name, 'Starting'
time.sleep(3)
print name, 'Exiting'
if __name__ == '__main__':
service = multiprocessing.Process(name = 'my_service',target = my_device)
worker_1 = multiprocessing.Process(name = 'worker 1', target = worker)
worker_2 = multiprocessing.Process(target = worker) #default name, or set name by worker_2.name = 'worker_2'
worker_1.start()
worker_2.start()
service.start()
程序名稱為Process-3的行對應未命名的程序worker_2。
worker 1 Starting
worker 1 Exiting
Process-3 Starting
Process-3 Exiting
my_service Starting
my_service Exiting
守護程序(P.daemon=True)
要標誌一個程序為守護程序,可以將其daemon屬性設定為True,預設情況下不為守護程序。service = multiprocessing.Process(name = 'my_service',target = my_device)
service.daemon = True
主程序可以使用join()等待守護程序退出,也可以給join傳入一個超時引數(浮點數,單位為秒),即使程序在這個超時範圍內沒有完成,join()也會返回,此時daemon程序會繼續執行,不會終結。情況和threading一樣。
終止程序(P.terminate())
儘管最好使用“毒丸”(posion pill)方法向程序發出訊號讓其退出,但是如果一個程序看起來已經掛起或者陷入死鎖,則需要能夠強制性的結束。對一個程序呼叫terminate()會結束子程序。import multiprocessing
import time
def slow_worker():
print 'Starting worker'
time.sleep(10)
print 'finished worker'
if __name__ == '__main__':
p = multiprocessing.Process(target = slow_worker)
print "Befor:", p, p.is_alive()
p.start()
print 'During:', p, p.is_alive()
p.terminate()
print 'Terminate:', p, p.is_alive()
p.join()
print 'Joined:', p, p.is_alive()
Befor: <Process(Process-1, initial)> False
During: <Process(Process-1, started)> True
Terminate: <Process(Process-1, started)> True
Joined: <Process(Process-1, stopped[SIGTERM])> False
注意:終止程序後要使用join()退出程序,使程序管理程式碼有時間更新物件的狀態,以反映程序已經終止。
程序的退出狀態(P.exitcode)
程序退出時聲稱的狀態碼可以通過exitcode屬性訪問。狀態碼的範圍是:- == 0 : 未生成任何錯誤
- >0 : 程序有一個錯誤,並以該錯誤碼退出
- <0 :程序由一個-1*exitcode訊號結束
2. 日誌
除錯併發問題時,能夠訪問multiprocessing提供的物件的內部狀態很有用。可以使用一個方便的模組級函式來啟動日誌記錄,名為log_to_stderr()。它使用logging建立一個日誌記錄器物件,並增加一個處理程式,使得日誌訊息將傳送到標準錯誤通道,日誌預設格式為 '[%(levelname)s/%(processName)s] %(message)s’。import multiprocessing
import logging
import sys
def worker():
print "Doing some work"
sys.stdout.flush()
if __name__ == '__main__':
multiprocessing.log_to_stderr(logging.DEBUG)
p = multiprocessing.Process(target = worker)
p.start()
p.join()
預設情況下,日誌級別設定為NOTSET,即不產生任何訊息。通過傳入一個不同的日誌級別,可以初始化日誌記錄器,制定所需的詳細程度。
[INFO/Process-1] child process calling self.run()
Doing some work
[INFO/Process-1] process shutting down
[DEBUG/Process-1] running all "atexit" finalizers with priority >= 0
[DEBUG/Process-1] running the remaining "atexit" finalizers
[INFO/Process-1] process exiting with exitcode 0
[INFO/MainProcess] process shutting down
[DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0
[DEBUG/MainProcess] running the remaining "atexit" finalizers
要直接處理日誌記錄器(修改日誌級別或新增處理程式),可以使用get_logger()。
import multiprocessing
import logging
import sys
def worker():
print 'Doing some work'
sys.stdout.flush()
if __name__ == '__main__':
multiprocessing.log_to_stderr()
logger = multiprocessing.get_logger()
logger.setLevel(logging.INFO)
p = multiprocessing.Process(target = worker)
p.start()
p.join()
[INFO/Process-1] child process calling self.run()
Doing some work
[INFO/Process-1] process shutting down
[INFO/Process-1] process exiting with exitcode 0
[INFO/MainProcess] process shutting down3. 派生程序
名字聽上去挺唬人的,其實就是把multiprocessing.Process作為基類來建立新的class。import multiprocessing
class Worker(multiprocessing.Process):
def run(self):
print "In %s' % self.name
return
if __name__ == '__main__':
jobs = []
for i in range(5):
p = Worker()
jobs.append(p)
p.start()
for j in jobs:
j.join()
In Worker-1
In Worker-2
In Worker-3
In Worker-5
In Worker-4派生程序應當覆蓋run()來完成工作
4. 訊息傳遞
類似於執行緒,對於多個程序,一種常用的模式是將一個工作劃分為多個工作程序並行地執行。要想有效地使用多個程序,通常它們之間有某種通訊,這樣才能有效分解工作,並完成結果的彙總。利用multiprocessing完成程序通訊的一種簡單方法是使用一個Queue來傳遞訊息。能夠用pickle序列化的任何物件都可以通過Queue傳遞。import multiprocessing
class MyFancyClass(object):
def __init__(self, name):
self.name = name
def do_something(self):
proc_name = multiprocessing.current_process().name
print 'Doing something fancy in %s for %s' % (proc_name, self.name)
def worker(q):
obj = q.get() # get a object from the queue
obj.do_something() # doing the task of the object
if __name__ == '__main__':
queue = multiprocessing.Queue() # the queue
p = multiprocessing.Process(target=worker, args=(queue,))
p.start()
queue.put(MyFancyClass('Fancy Dan')) # put one object to the queue
# Wait for the worker to finish
queue.close()
queue.join_thread()
p.join()
這個例子只是像一個工作程序傳遞一個訊息,然後主程序等待這個工作程序完成。結果為:
Doing something fancy in Process-1 for Fancy Dan5. 執行緒池
Multiprocessing同樣提供了執行緒池,可設定執行緒池大小等。from multiprocessing import Pool
def my_func(x):
print x**2
pool = Pool(processes=5)
target = range(10)
pool.map(my_func, target)
上述程式碼定義了一個5個執行緒大小的執行緒池,用以計算每個數的平方。