1. 程式人生 > 其它 >python 包之 multiprocessing 多程序教程

python 包之 multiprocessing 多程序教程

一、建立一個程序

  • 例項化 Process 類建立一個程序物件

  • 然後呼叫它的 start 方法即可生成一個子程序

from multiprocessing import Process

def func(s):
  print(s)

if __name__ == '__main__':
  p = Process(target=func, args=('autofelix', ))
  p.start()
  p.join()

 

二、建立多個程序

from multiprocessing import Process

def func(s):
  print(s)

if __name__ == '__main__':
  process = [
  	Process(target=func, args=('1', ))
    Process(target=func, args=('2', ))
  ]
  
  [p.start() for p in process]
  [p.join() for p in process]

 

三、管道pipe進行程序間通訊

  • Pipe(duplex=True):表示雙工通訊,也就是雙向的,既可以接受也可以傳送資料,預設為True

  • Pipe(duplex=False):表示單工通訊,也就是單向的,只能進行接受或者傳送資料

from multiprocessing import Process, Pipe

def func(conn):
  print('send a list object ot other side...')
  # 從管道物件的一端傳送資料物件
  conn.send(['33', 44, None])
  conn.close()

if __name__ == '__main__':
  # 預設建立一個雙工管道物件,返回的兩個物件代表管道的兩端,
  # 雙工表示兩端的物件都可以傳送和接收資料,但是需要注意,
  # 需要避免多個程序或執行緒從一端同時讀或寫資料
  parent_conn, child_conn = Pipe()
  p = Process(target=func, args=(child_conn, ))
  p.start()
  # 從管道的另一端接收資料物件
  print(parent_conn.recv())
  p.join()

 

四、佇列Queue進行程序間通訊

  • 當向佇列中放入的資料較大時,就會在join()處卡死

  • 為了避免這種情況,常的做法是先使用get()將資料取出來,再使用join()方法

  • 如果不這樣處理,佇列程序將不能正常終止,造成死鎖情況

from multiprocessing import Process, Queue

def func(q):
  print('put a list object to queue...')
  # 向Queue物件中新增一個物件
  q.put(['33', 44, None])

if __name__ == '__main__':
  # 建立一個佇列
  q = Queue()
  p = Process(target=func, args=(q, ))
  p.start()
  # 從Queue物件中獲取一個物件
  print(q.get())
  p.join()

 

五、程序間同步

  • 使用鎖保證程序間的同步操作

from multiprocessing import Process, Lock

def func(lc, num):
  # 使用鎖保證以下程式碼同一時間只有一個程序在執行
  lc.acquire()
  print('process num: ', num)
  lc.release()

if __name__ == '__main__':
  lock = Lock()
  for i in range(5):
      Process(target=func, args=(lock, i)).start()

 

六、程序間共享資料

  • 使用共享記憶體的方式,共享值Value物件和資料Array物件

from multiprocessing import Process, Value, Array

def func(n, a):
  n.value = 3.333
  for i in range(len(a)):
      a[i] = -a[i]

if __name__ == '__main__':
  # 第一個引數d表示資料型別'double'雙精度浮點型別
  num = Value('d', 0.0)
  # 第一個引數i表示資料型別'integer'整型
  arr = Array('i', range(6))
  p = Process(target=func, args=(num, arr))
  p.start()
  p.join()
  print(num.value)
  print(arr[:])

 

七、程序池

  • 建立一個 Pool 程序池物件,並執行提交給它的任務

  • 程序池物件允許其中的程序以不同的方式執行

  • 但是需要注意,Pool 物件的方法只能是建立它的程序才能呼叫

from multiprocessing import Pool
import time

def f(x):
    return x * x

if __name__ == '__main__':
    with Pool(processes=4) as pool:  # start 4 worker processes
      	# 在程序池中開啟一個新的程序並執行 f 函式
        result = pool.apply_async(f, (10,))  # evaluate "f(10)" asynchronously in a single process
        print(result.get(timeout=1))  # prints "100" unless your computer is *very* slow
				
        # map會一直阻塞當前程序直到執行完可迭代物件中的所有元素,並返回結果。
        print(pool.map(f, range(10)))  # prints "[0, 1, 4,..., 81]"
				
        # imap是map方法的延遲執行版本,對於比較消耗記憶體的迭代,建議使用這個方法,
        it = pool.imap(f, range(10))
        print(next(it))  # prints "0"
        print(next(it))  # prints "1"
        print(it.next(timeout=1))  # prints "4" unless your computer is *very* slow

        result = pool.apply_async(time.sleep, (10,))
        print(result.get(timeout=1))  # raises multiprocessing.TimeoutError