Python並行程式設計(十二):程序同步
1、基本概念
多個程序可以協同工作來完成一項任務,通常需要共享資料。所以在多程序之間保持資料的一致性就很重要,需要共享資料協同的程序必須以適當的策略來讀寫資料。同步原語和執行緒的庫類似。
- Lock:一個Lock物件有兩個方法acquire和release來控制共享資料的讀寫許可權。
- Event:一個程序發事件的訊號,另一個程序等待事件的訊號。Event物件有兩個方法set和clear來管理自己內部的變數。
- Condition:此物件用來同步部分工作流程,在並行的程序中,有兩個基本的方法,wait()用來等待程序,notify_all用來通知所有等待此條件的程序。
- Semaphore:用來共享資源,比如:支援固定資料的共享連線。
- RLock:遞迴鎖物件,其用途和方法同Threading模組一樣。
- Barrier:將程式分成幾個階段,適用於有些程序必須在某些特性程序之後執行,處於Barrier之後的程式碼不能同處於Barrier之前的程式碼並行。
2、測試用例
使用barrier函式來同步兩個程序
import multiprocessing from multiprocessing import Barrier, Lock, Process from time import time fromdatetime import datetime def test_with_barrier(synchronizer, serializer): name = multiprocessing.current_process().name synchronizer.wait() now = time() with serializer: print("process %s ----> %s" %(name, datetime.fromtimestamp(now))) def test_without_barrier(): name= multiprocessing.current_process().name now = time() print("process %s ----> %s" %(name, datetime.fromtimestamp(now))) if __name__ == "__main__": # create a barrier and lock. synchronizer = Barrier(2) serializer = Lock() # create four processes Process(name='p1 - test_with_barrier', target=test_with_barrier, args=(synchronizer, serializer)).start() Process(name='p2 - test_with_barrier', target=test_with_barrier, args=(synchronizer, serializer)).start() Process(name='p3 - test_without_barrier', target=test_without_barrier).start() Process(name='p4 - test_without_barrier', target=test_without_barrier).start()
執行結果如下:
test_with_barrier函式呼叫了barrier的wait()方法,當兩個程序都呼叫wait()方法時,他們會一起繼續執行。
3、程序之間管理狀態
Python的多程序模組提供了在所有的使用者間管理共享資訊的管理者(Manager),一個管理者物件控制著持有Python物件的服務程序,並允許其他程序操作共享物件。
管理者特性:
- 它控制著管理共享物件的服務程序
- 它確保當某一程序修改了共享物件之後,所有的程序拿到的共享物件都得到了更新。
程式碼示例:
import multiprocessing def worker(dictionary, key, item): dictionary[key] = item print("key = %d value = %d" %(key, item)) if __name__ == "__main__": mgr = multiprocessing.Manager() dictionary = mgr.dict() jobs = [multiprocessing.Process(target=worker, args=(dictionary, i, i*2)) for i in range(10)] for j in jobs: j.start() for j in jobs: j.join() print("Results:",dictionary)
執行結果:
上述程式碼建立了一個管理者字典dictionary,在n個job之間共享,每個job都會更新字典的某一個index,所有的job完成之後,最後列印該字典,所有資料均存在。