python 包之 multiprocessing 多程序教程
阿新 • • 發佈:2022-03-31
一、建立一個程序
-
例項化 Process 類建立一個程序物件
-
然後呼叫它的 start 方法即可生成一個子程序
from multiprocessing import Process
def func(s):
print(s)
if __name__ == '__main__':
p = Process(target=func, args=('autofelix', ))
p.start()
p.join()
二、建立多個程序
from multiprocessing import Process def func(s): print(s) if __name__ == '__main__': process = [ Process(target=func, args=('1', )) Process(target=func, args=('2', )) ] [p.start() for p in process] [p.join() for p in process]
三、管道pipe進行程序間通訊
-
Pipe(duplex=True):表示雙工通訊,也就是雙向的,既可以接受也可以傳送資料,預設為True
-
Pipe(duplex=False):表示單工通訊,也就是單向的,只能進行接受或者傳送資料
from multiprocessing import Process, Pipe def func(conn): print('send a list object ot other side...') # 從管道物件的一端傳送資料物件 conn.send(['33', 44, None]) conn.close() if __name__ == '__main__': # 預設建立一個雙工管道物件,返回的兩個物件代表管道的兩端, # 雙工表示兩端的物件都可以傳送和接收資料,但是需要注意, # 需要避免多個程序或執行緒從一端同時讀或寫資料 parent_conn, child_conn = Pipe() p = Process(target=func, args=(child_conn, )) p.start() # 從管道的另一端接收資料物件 print(parent_conn.recv()) p.join()
四、佇列Queue進行程序間通訊
-
當向佇列中放入的資料較大時,就會在join()處卡死
-
為了避免這種情況,常的做法是先使用get()將資料取出來,再使用join()方法
-
如果不這樣處理,佇列程序將不能正常終止,造成死鎖情況
from multiprocessing import Process, Queue def func(q): print('put a list object to queue...') # 向Queue物件中新增一個物件 q.put(['33', 44, None]) if __name__ == '__main__': # 建立一個佇列 q = Queue() p = Process(target=func, args=(q, )) p.start() # 從Queue物件中獲取一個物件 print(q.get()) p.join()
五、程序間同步
-
使用鎖保證程序間的同步操作
from multiprocessing import Process, Lock
def func(lc, num):
# 使用鎖保證以下程式碼同一時間只有一個程序在執行
lc.acquire()
print('process num: ', num)
lc.release()
if __name__ == '__main__':
lock = Lock()
for i in range(5):
Process(target=func, args=(lock, i)).start()
六、程序間共享資料
-
使用共享記憶體的方式,共享值Value物件和資料Array物件
from multiprocessing import Process, Value, Array
def func(n, a):
n.value = 3.333
for i in range(len(a)):
a[i] = -a[i]
if __name__ == '__main__':
# 第一個引數d表示資料型別'double'雙精度浮點型別
num = Value('d', 0.0)
# 第一個引數i表示資料型別'integer'整型
arr = Array('i', range(6))
p = Process(target=func, args=(num, arr))
p.start()
p.join()
print(num.value)
print(arr[:])
七、程序池
-
建立一個 Pool 程序池物件,並執行提交給它的任務
-
程序池物件允許其中的程序以不同的方式執行
-
但是需要注意,Pool 物件的方法只能是建立它的程序才能呼叫
from multiprocessing import Pool
import time
def f(x):
return x * x
if __name__ == '__main__':
with Pool(processes=4) as pool: # start 4 worker processes
# 在程序池中開啟一個新的程序並執行 f 函式
result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process
print(result.get(timeout=1)) # prints "100" unless your computer is *very* slow
# map會一直阻塞當前程序直到執行完可迭代物件中的所有元素,並返回結果。
print(pool.map(f, range(10))) # prints "[0, 1, 4,..., 81]"
# imap是map方法的延遲執行版本,對於比較消耗記憶體的迭代,建議使用這個方法,
it = pool.imap(f, range(10))
print(next(it)) # prints "0"
print(next(it)) # prints "1"
print(it.next(timeout=1)) # prints "4" unless your computer is *very* slow
result = pool.apply_async(time.sleep, (10,))
print(result.get(timeout=1)) # raises multiprocessing.TimeoutError