multiprocessing多程序模組
前言
其實multiprocessing
模組與threading
模組的介面都非常相似,但是有一些地方有一些細微差別。所以本文是基於前面的threading
模組的一些知識對multiprocessing
模組進行講解的。
他們的主要區別有以下幾點
1.建立子程序的方式針對不同平臺有著差異化
2.關於守護執行緒的設定介面是
setDaemon(True)
,而關於守護程序的介面是deamon = True
3.
multiprocessing
模組下的獲取程序名與設定程序名沒有threading
模組下的getName()
和setName()
,而是直接採取屬性name
進行操作4.多程序中資料共享不能使用普通的
queue
模組下提供的佇列進行資料共享,而應使用multiprocessing
中提供的Queue
5.
multiprocessing
模組下中提供的Queue
先進先出佇列沒有task_done()
與join()
,他們都在JoinableQueue
中,並且該模組下沒有提供LifoQueue
後進先出佇列與PriorityQueue
優先順序佇列
多程序與多執行緒工作的區別
多執行緒工作方式
多執行緒的工作方式實際上在第一篇的時候,我們已經說過了。因為執行緒必須存在於程序之中,是最小的執行單元,所以你可以將它如此理解:
其實就是不斷的往程序這個小房間加人,那麼它的優點如下:
開一條新的執行緒比開一條新的程序開銷要小很多
並且對於執行緒的切換來說代價也要小很多
多條執行緒共有該程序下的所有資源,資料共享比較容易實現
而CPython由於GIL鎖的設定,所以它的多執行緒是殘缺不全的,因此在很多時候我們依然要用到多程序,雖然這種情況比較少。
多程序工作方式
其實就是不斷的造出一模一樣的小房間,那麼它的優點如下:
雖然說,新開一條程序比新開一條執行緒的代價大很多,但是由於CPython中GIL鎖的設定想在多執行緒的情況下實現並行是不可能的,只有多程序才能夠實現並行。
可以說是唯一優點了,但是我們依然要學習一下multiprocessing
模組,它的學習代價並不是很大,所以接下來正式進入multiprocessing
模組的學習。
基本使用
針對不同平臺的程序啟動方式
對於程序啟動方式來說,其實multiprocessing
模組中對於不同平臺下有不同的啟動方式。如下:
spawn
:這玩意兒相當於建立了一個新的直譯器程序,對比其他兩種方法,這種方法速度上比較慢,但是它是Windows平臺下預設的啟動方式(Unix系統下可用)。並且在windows平臺下,我們應該在if __name__ == '__main__'
下進行新程序的啟動。但是我依然認為不管在哪個平臺下不論執行緒還是程序都應該在if __name__ == '__main__'
這條語句下啟動。
fork
:這種啟動方式是通過os.fork()
來產生一個新的直譯器分叉,是Unix系統的預設啟動方式。
forkserver
:這個我也看不太明白,直接把官方文件搬過來。如果有懂的大神可以解釋一下。程式啟動並選擇
forkserver
啟動方法時,將啟動伺服器程序。從那時起,每當需要一個新程序時,父程序就會連線到伺服器並請求它分叉一個新程序。分叉伺服器程序是單執行緒的,因此使用 可在Unix平臺上使用,支援通過Unix管道傳遞檔案描述符。
import multiprocessing as mp def foo(q): q.put('hello') if __name__ == '__main__': # <--- 強烈注意!在windows平臺下開多程序一定要在該條語句之下,否則會丟擲異常!! mp.set_start_method('spawn') # 選擇啟動方式 q = mp.Queue() # 例項化出用於程序間資料共享的管道 p = mp.Process(target=foo, args=(q,)) p.start() # 啟動程序任務,等待CPU排程執行 print(q.get()) # 從管道中拿出資料 p.join() # 阻塞至子程序執行完畢
例項化Process類建立子程序
其實感覺上面的方法都已經將本章要寫的內容舉例了一個七七八八,但是我們接著往下看。與threading
模組中建立多執行緒的方式一樣,multiprocessing
模組建立多程序的方式也有兩種,所以我們將之前的示例拿過來直接改一改就好。
import multiprocessing import time print("主程序任務開始處理") def task(th_name): print("子程序任務開始處理,引數:{0}".format(th_name)) time.sleep(3) print("子程序任務處理完畢") if __name__ == '__main__': # <--- Windows平臺下必須在該條語句下執行 # ==== 例項化出Process類並新增子程序任務以及引數 ==== p1 = multiprocessing.Process(target=task, args=("程序[1]",)) # <-- 引數必須新增逗號。因為是args所以會打散,如果不加逗號則不能進行打散會丟擲異常 p1.start() # 等待CPU排程..請注意這裡不是立即執行 print("主程序任務處理完畢") # ==== 執行結果 ==== """ 主程序任務開始處理 主程序任務處理完畢 主程序任務開始處理 子程序任務開始處理,引數:程序[1] 子程序任務處理完畢 """
這裡我們看執行結果,主程序任務開始處理
列印了兩次,而主程序任務處理完畢
列印了一次,這是為什麼呢?由於我們是在Windows
平臺下,所以它預設的程序啟動方式為spawn
,即建立了一個新的直譯器程序並開始執行,所以上面的主程序任務開始處理
就列印了兩次,一次是主程序,一次是新建立的子程序。而下面由於if __name__ == '__main__':
這條語句,子程序並不會執行該語句下面的程式碼塊,所以主程序任務處理完畢
就只打印了一次。
自定義類繼承Process並覆寫run方法
import multiprocessing import time print("主程序任務開始處理") class Processing(multiprocessing.Process): """自定義類""" def __init__(self, th_name): self.th_name = th_name super(Processing, self).__init__() def run(self): print("子程序任務開始處理,引數:{0}".format(self.th_name)) time.sleep(3) print("子程序任務處理完畢") if __name__ == '__main__': p1 = Processing("程序[1]") p1.start() # 等待CPU排程..請注意這裡不是立即執行 print("主程序任務處理完畢") # ==== 執行結果 ==== """ 主程序任務開始處理 主程序任務處理完畢 主程序任務開始處理 子程序任務開始處理,引數:程序[1] 子程序任務處理完畢 """
multiprocessing方法大全
multiprocessing
模組中的方法參考了thrading
模組中的方法。但是我們一般用下面兩個方法就夠了,他們都可以拿到具體的程序物件。
multiprocessing模組方法大全 | |
---|---|
方法/屬性名稱 | 功能描述 |
multiprocessing.active_children() | 檢視當前程序存活了的所有子程序物件,以列表形式返回。 |
multiprocessing.current_process() | 獲取當前程序物件。 |
程序物件方法大全
程序物件方法大全(即Process類的例項物件) | |
---|---|
方法/屬性名稱 | 功能描述 |
start() | 啟動程序,該方法不會立即執行,而是告訴CPU自己準備好了,可以隨時排程,而非立即啟動。 |
run() | 一般是自定義類繼承Process 類並覆寫的方法,即執行緒的詳細任務邏輯。 |
join(timeout=None) | 主程序預設會等待子程序執行結束後再繼續執行,timeout 為等待的秒數,如不設定該引數則一直等待。 |
name | 可以通過 = 給該程序設定一個通俗的名字。如直接使用該屬性則返回該程序的預設名字。 |
is_alive() | 檢視程序是否存活,返回布林值。 |
daemon | 可以通過 = 給該程序設定一個守護程序。如直接使用該屬性則是檢視程序是否為一個守護程序,返回布林值。預設為False 。 |
pid | 返回程序ID。在生成該程序之前,這將是 None 。 |
exitcode | 子程序的退出程式碼。如果程序尚未終止,這將是 None 。負值 -N 表示子程序被訊號 N 終止。 |
authkey | 程序的身份驗證金鑰(位元組字串)。 |
sentinel | 系統物件的數字控制代碼,當程序結束時將變為 "ready" 。 |
terminate() | 終止程序。 |
kill() | 同上 |
close() | 關閉 |
注意 |
程序物件的好夥伴(即Process類的例項物件) | |
---|---|
os.getpid() | 返回程序ID。 |
守護程序daemon
import multiprocessing import time print("主程序任務開始處理") def task(th_name): print("子程序任務開始處理,引數:{0}".format(th_name)) time.sleep(3) print("子程序任務處理完畢") if __name__ == '__main__': p1 = multiprocessing.Process(target=task, args=("程序[1]",)) p1.daemon = True # <-- 設定程序物件p1為守護程序,注意這一步一定要放在start之前。 p1.start() # 等待CPU排程..請注意這裡不是立即執行 time.sleep(2) print("主程序任務處理完畢") # ==== 執行結果 ==== # print("子程序任務處理完畢") 可以看到該句沒有執行 """ 主程序任務開始處理 主程序任務開始處理 子程序任務開始處理,引數:程序[1] 主程序任務處理完畢 """
設定與獲取程序名
import multiprocessing import time print("主程序任務開始處理") def task(th_name): print("子程序任務開始處理,引數:{0}".format(th_name)) obj = multiprocessing.current_process() # 獲取當前程序物件 print("獲取當前的程序名:{0}".format(obj.name)) print("開始設定程序名") obj.name = "yyy" print("獲取修改後的程序名:{0}".format(obj.name)) time.sleep(3) print("子程序任務處理完畢") if __name__ == '__main__': # ==== 第一步:例項化出Process類並新增子程序任務以及引數 ==== t1 = multiprocessing.Process(target=task, args=("程序[1]",),name="xxx") t1.start() # 等待CPU排程..請注意這裡不是立即執行 print("主程序名:",multiprocessing.current_process().name) # 直接使用屬性 name print("主程序任務處理完畢") # ==== 執行結果 ==== """ 主程序任務開始處理 主程序名: MainProcess 主程序任務處理完畢 主程序任務開始處理 子程序任務開始處理,引數:程序[1] 獲取當前的程序名:xxx 開始設定程序名 獲取修改後的程序名:yyy 子程序任務處理完畢 """
鎖相關演示
鎖的使用和threading
模組中鎖的使用相同,所以我們舉例一個Lock
鎖即可。
import multiprocessing lock = multiprocessing.Lock() # 例項化同步鎖物件 # 注意!!! 在Windows平臺下,我們應該將鎖的例項化放在上面,這樣子程序才能拿到鎖物件。否則就會丟擲異常!!!或者也可以將鎖物件傳入當做形參進行傳入,二者選其一 num = 0 def add(): lock.acquire() # 上鎖 global num for i in range(10000000): # 一千萬次 num += 1 lock.release() # 解鎖 def sub(): lock.acquire() # 上鎖 global num for i in range(10000000): # 一千萬次 num -= 1 lock.release() # 解鎖 if __name__ == '__main__': t1 = multiprocessing.Process(target=add, ) t2 = multiprocessing.Process(target=sub, ) t1.start() t2.start() t1.join() t2.join() print("最終結果:", num) # ==== 執行結果 ==== 三次採集 """ 最終結果: 0 最終結果: 0 最終結果: 0 """
from multiprocessing import Process, Lock def f(l, i): l.acquire() try: print('hello world', i) finally: l.release() if __name__ == '__main__': lock = Lock() # 將鎖例項化後傳入 for num in range(10): Process(target=f, args=(lock, num)).start()將鎖當做引數傳入
三種程序資料共享的方式
multiprocessing.Queue
這裡一定要使用multiprocessing
中的Queue
,如果你想用佇列中的task_done()
與join()
方法,你應該匯入JoinableQueue
這個佇列。
multiprocessing.Queue方法大全 | |
---|---|
方法名稱 | 功能描述 |
Queue.qsize() | 返回當前佇列的大小 |
Queue.empty() | 判斷當前佇列是否為空 |
Queue.full() | 判斷當前佇列是否已滿 |
Queue.put(item, block=True, timeout=None) | 將item 放入佇列中,block 引數為如果要操作的佇列目前已滿是否阻塞,timeout 為超時時間。 |
Queue.put_nowait(item) | 相當於 put(item, False) ,如果操作的佇列已滿則不進行阻塞,而是丟擲Full 異常。 |
Queue.get(block=True, timeout=None) | 將專案從佇列中取出,block 引數為如果要操作的佇列目前為空是否阻塞,timeout 為超時時間。 |
Queue.get_nowait() | 相當於 get(False) ,如果要操作的佇列為空則不進行阻塞,而是丟擲Empty 異常。 |
Queue.close() | 指示當前程序將不會再往佇列中放入物件。一旦所有緩衝區中的資料被寫入管道之後,後臺的執行緒會退出。這個方法在佇列被gc 回收時會自動呼叫。 |
Queue.join_thread() | 等待後臺執行緒。這個方法僅在呼叫了 |
Queue.cancel_join_thread() | 防止 |
程序佇列multiprocessing.Queue
不同於執行緒佇列queue.Queue
,程序佇列的消耗和底層實現比執行緒佇列的要複雜許多。還是因為各程序之間不能共享任何資料,所以只能通過對映的方式來傳遞資料。程序佇列multiprocessing.Queue
作為資料安全型別的資料結構,放在多程序中做通訊使用是非常合適的,但是同時它的消耗也是非常大的,能不使用則儘量不要使用。
import time import multiprocessing from multiprocessing import Queue,JoinableQueue def task_1(q): print("正在裝東西..") time.sleep(3) q.put("玫瑰花") # 正在裝東西 q.task_done() # 通知對方可以取了 def task_2(q): q.join() # 阻塞等待通知,接到通知說明佇列裡裡有東西了。 print("取到了",q.get()) # 取東西 if __name__ == '__main__': q = JoinableQueue(maxsize=5) # 例項化佇列 t1 = multiprocessing.Process(target=task_1,args=(q,),name="小明") # 將佇列傳進子程序任務中 t2 = multiprocessing.Process(target=task_2,args=(q,),name="小花") t1.start() t2.start() # ==== 執行結果 ==== """ 正在裝東西.. 取到了 玫瑰花 """程序佇列Queue實現程序間的資料共享
什麼執行緒佇列queue.Queue
不能做到程序間資料共享呢,這是因為程序佇列multiprocessing.Queue
會採取一種對映的方式來同步資料,所以說程序佇列的資源消耗比執行緒佇列要龐大很多。執行緒中所有資訊共享,所以執行緒佇列根本不需要對映關係。程序佇列只是告訴你可以這樣使用它達到程序間的資料共享,但是並不推薦你濫用它。
multiprocessing.Pipe
除開使用程序佇列來實現程序間的通訊,multiprocessing
還提供了Pipe
管道來進行通訊。他的資源消耗較少並且使用便捷,但是唯一的缺點便是只支援點對點。
Pipe
有點類似socket
通訊。但是比socket
通訊更加簡單,它不需要去做字串處理位元組,先來看一個例項:
import multiprocessing from multiprocessing import Pipe def task_1(conn1): conn1.send("hello,我是task1") print(conn1.recv()) def task_2(conn2): print(conn2.recv()) conn2.send("我收到了,我是task2") if __name__ == '__main__': conn1,conn2 = Pipe() # 建立兩個電話 p1 = multiprocessing.Process(target=task_1,args=(conn1,)) # 一人一部電話 p2 = multiprocessing.Process(target=task_2,args=(conn2,)) p1.start() p2.start() p1.join() p2.join() # ==== 執行結果 ==== """ hello,我是task1 我收到了,我是task2 """Pipe實現程序間的資料共享
multiprocessing.Mangaer
除了程序佇列multiprocessing.Queue
,管道Pipe
,multiprocessing
還提供了Manager
作為共享變數來提供使用,但是這種方式是不應該被直接使用的因為它本身相較於程序佇列Queue
是資料不安全的。當多個程序同時修改一個共享變數勢必導致結果出現問題,所以要想使用共享變數還得使用multiprocessin
提供的程序鎖才行。
Manager
類是資料不安全的;
Mangaer
類支援的型別非常多,如:value
, Array
, List
, Dict
, Queue(程序池通訊專用)
, Lock
等。
Mangaer
實現了上下文管理器,可使用with
語句建立多個物件。具體使用方法我們來看一下:
import multiprocessing from multiprocessing import Manager def task_1(dic): dic["task_1"] = "大帥哥" def task_2(dic): dic["task_2"] = "大美女" print(dic.get("task_1")) if __name__ == '__main__': with Manager() as m: # !!!!! 注意 !!!!!!! 如果對 Manager()中的資料型別進行頻繁的操作,而程序又特別多的時候,請使用 Rlock 鎖進行處理,這有可能引發執行緒不安全!!! dic = m.dict() # 例項化出了一個字典,除此之外還有很多其他的資料型別 p1 = multiprocessing.Process(target=task_1,args=(dic,)) # 將字典傳進來 p2 = multiprocessing.Process(target=task_2,args=(dic,)) p1.start() # 啟動一定要放在with之後 p2.start() p1.join() p2.join() # ==== 執行結果 ==== """ 大帥哥 """Manager實現程序間的資料共享
import multiprocessing from multiprocessing import Manager def task_1(dic): for i in range(1000): dic["count"] += 1 def task_2(dic): for i in range(1000): dic["count"] -= 1 if __name__ == '__main__': with Manager() as m: # !!!!! 注意 !!!!!!! 如果對 Manager()中的資料型別進行頻繁的操作,而程序又特別多的時候,請使用 Rlock 鎖進行處理,這有可能引發執行緒不安全!!! dic = m.dict({"count":0}) # 例項化出了一個字典,除此之外還有很多其他的資料型別 p1 = multiprocessing.Process(target=task_1,args=(dic,)) # 傳字典 p2 = multiprocessing.Process(target=task_2,args=(dic,)) p1.start() p2.start() p1.join() p2.join() print(dic) # ==== 執行結果 ==== """ {'count': -23} """程序安全問題
import multiprocessing from multiprocessing import Manager from multiprocessing import RLock def task_1(dic,lock): with lock: for i in range(1000): dic["count"] += 1 def task_2(dic,lock): with lock: for i in range(1000): dic["count"] -= 1 if __name__ == '__main__': lock = RLock() # 例項化鎖 with Manager() as m: # !!!!! 注意 !!!!!!! 如果對 Manager()中的資料型別進行頻繁的操作,而程序又特別多的時候,請使用 Rlock 鎖進行處理,這有可能引發執行緒不安全!!! dic = m.dict({"count":0}) # 例項化出了一個字典,除此之外還有很多其他的資料型別 p1 = multiprocessing.Process(target=task_1,args=(dic,lock,)) # 傳字典,傳鎖 p2 = multiprocessing.Process(target=task_2,args=(dic,lock,)) p1.start() p2.start() p1.join() p2.join() print(dic) # ==== 執行結果 ==== """ {'count': 0} """實用Rlock鎖解決程序安全問題