網絡編程——進程同步
阿新 • • 發佈:2018-05-11
通信 發送 行修改 信號量 targe 判斷 error main lock
鎖——multiprocess.Lock:
加鎖可以保證多個進程修改同一塊數據時,同一時間只能有一個任務可以進行修改,即串行的修改,會犧牲了速度卻保證了數據安全。
雖然可以用文件共享數據實現進程間通信,但問題是:
1,效率低。2,需要自己加鎖處理。
multiprocess模塊為我們提供的基於消息的IPC通信機制:隊列和管道。
隊列和管道都是將數據存放於內存中:
隊列又是基於(管道+鎖)實現的,可以讓我們從復雜的鎖問題中解脫出來,我們應該避免使用共享數據,盡可能使用消息傳遞和隊列,避免處理復雜的同步和鎖問題,而且在進程數目增多時,往往可以獲得更好的可擴展性。
importos import time import random from multiprocessing import Lock from multiprocessing import Process def work(n,lock): lock.acquire() print(‘%s:%s is running‘ % (n,os.getpid())) time.sleep(random.random()) print(‘%s:%s is done‘ % (n,os.getpid())) lock.release() if __name__ == ‘__main__‘: lock = Lock() for i in range(10): p = Process(target=work,args=(i,lock)) p.start() # 同步控制 # 只要用到了鎖,鎖之內的代碼就會變成同步的了 # 鎖:控制一段代碼,同一時間 只能被一個進程執行
import json import time import random from multiprocessing import Lock from multiprocessing import Process def check_ticket(i): with open(模擬搶票程序‘ticket‘) as f: ticket_count = json.load(f) # 通過json獲取文件中的信息 print(‘person%s查詢當前余票:‘% i, ticket_count[‘count‘]) def buy_ticket(i,lock): check_ticket(i) # 先進行查票操作 lock.acquire() # 得到鑰匙,進入程序 with open(‘ticket‘) as f: ticket_count = json.load(f) # 這一步是為了再次判斷是否還有余票 time.sleep(random.random()) if ticket_count[‘count‘]>0: print(‘person%s購票成功‘% i) ticket_count -= 1 # 字典的賦值 else: print(‘余票不足,person%s購票失敗‘% i) time.sleep(random.random()) with open(‘ticket‘,‘w‘)as f: json.dump(ticket_count,f) # 通過json.dump將字典轉化成字符串形式,然後寫入文件。 lock.release() # 歸還鑰匙 if __name__ == ‘__main__‘: lock = Lock() for i in range(10): Process(target=buy_ticket,args = (i,lock)).start()
信號量——multiprocess.Semaphore(了解)
互斥鎖同時只允許一個線程更愛數據,而信號量Semaphore是同時允許一定數量的線程更改數據。
信號量同步基於內部計數器,每調用一次acquire(),計數器減1,每調用一次release(),計數器加1,當計數器為0時,acquire()調用被阻塞,這是迪科斯徹信號量概念p()和v()的python實現,信號量同步機制適用於訪問像服務器這樣的有限資源。
信號量與進程池的概念很像,但是要區分開,信號量涉及到枷鎖的概念。
信號量介紹
import time import random from multiprocessing import Process from multiprocessing import Semaphore def ktv(i,sema): sema.acquire() print(‘person%s 進來唱歌了‘% i) time.sleep(random.randint(1,5)) print(‘person%s 從ktv出去了‘% i) sema.release() if __name__ == ‘__main__‘: sema = Semaphore(3) for i in range(5): Process(target=ktv,args=(i,sema)).start() # Semaphore 就是鎖+計數器 # acquire() 計數器-1 # release() 計數器+1 # 當計數器為0,acquire()就會阻塞實例
事件——multiprocess.Event (了解)
python線程的時間用於主線程控制其他線程的執行,事件主要提供了三個方法:set(),wait(),clear().
事件處理的機制,全局定義了一個flag,如果flag值為False,那麽當程序執行,event.wait()方法時就會阻塞,如果flag值為True,那麽event.wait 方法時便不再阻塞。
clear:將flag設置為False.
set:將flag設置為True.
Event介紹
import time import json import random from multiprocessing import Event from multiprocessing import Process def car(i,e): if not e.is_set(): print(‘car%s正在等待‘% i) e.wait() print(‘car%s正在通過‘% i) def traffic_light(e): print(‘\033[1;31m紅燈亮了\033[0m‘) time.sleep(2) while True: if not e.is_set(): print(‘\033[1;32m綠燈亮了\033[0m‘) e.set() elif e.is_set(): print(‘\033[1;31m紅燈亮了\033[0m‘) e.clear() time.sleep(2) if __name__ == ‘__main__‘: e = Event() Process(target=traffic_light,args=(e,)).start() for i in range(10): time.sleep(random.randrange(1,5,3)) Process(target=car,args=(i,e)).start()紅綠燈實例
進程之間的通信——隊列和管道: (multiprocess.Queue,multiprocess.Pipe)
進程間的通信:IPC(Inter-Process Communication)
隊列:創建共享的進程隊列,Queue時多進程安全的隊列,可以使用Queue實現多進程之間的數據傳遞。
Queue([maxsize]) 創建共享的進程隊列。 參數:maxsize是隊列中允許的最大項數,如果省略此參數,則無大小限制。 底層隊列使用管道和鎖定實現。
Queue的實例q具有以下方法:
q.get( [ block [ ,timeout ] ] )
返回q中的一個項目。如果q為空,此方法將阻塞,直到隊列中有項目可用為止。block用於控制阻塞行為,默認為True. 如果設置為False,將引發Queue.Empty異常(定義在Queue模塊中)。timeout是可選超時時間,用在阻塞模式中。如果在制定的時間間隔內沒有項目變為可用,將引發Queue.Empty異常。
q.get_nowait( )
同q.get(False)方法。
q.put(item [, block [,timeout ] ] )
將item放入隊列。如果隊列已滿,此方法將阻塞至有空間可用為止。block控制阻塞行為,默認為True。如果設置為False,將引發Queue.Empty異常(定義在Queue庫模塊中)。timeout指定在阻塞模式中等待可用空間的時間長短。超時後將引發Queue.Full異常。
q.qsize()
返回隊列中目前項目的正確數量。此函數的結果並不可靠,因為在返回結果和在稍後程序中使用結果之間,隊列中可能添加或刪除了項目。在某些系統上,此方法可能引發NotImplementedError異常。
q.empty()
如果調用此方法時 q為空,返回True。如果其他進程或線程正在往隊列中添加項目,結果是不可靠的。也就是說,在返回和使用結果之間,隊列中可能已經加入新的項目。
q.full()
如果q已滿,返回為True. 由於線程的存在,結果也可能是不可靠的(參考q.empty()方法)。。
Queue的方法介紹
q.close()
關閉隊列,防止隊列中加入更多數據。調用此方法時,後臺線程將繼續寫入那些已入隊列但尚未寫入的數據,但將在此方法完成時馬上關閉。如果q被垃圾收集,將自動調用此方法。關閉隊列不會在隊列使用者中生成任何類型的數據結束信號或異常。例如,如果某個使用者正被阻塞在get()操作上,關閉生產者中的隊列不會導致get()方法返回錯誤。
q.cancel_join_thread()
不會再進程退出時自動連接後臺線程。這可以防止join_thread()方法阻塞。
q.join_thread()
連接隊列的後臺線程。此方法用於在調用q.close()方法後,等待所有隊列項被消耗。默認情況下,此方法由不是q的原始創建者的所有進程調用。調用q.cancel_join_thread()方法可以禁止這種行為。
Queue的其他方法
from multiprocessing import Queue q = Queue(3) # 限定隊列中只能存在3項數據。 q.put(1) q.put(2) q.put(3) # q.put(4) # 如果隊列已經滿了,程序就會停在這裏,等待之前的數據被取走,再將數據放入q隊列中 # 如果數據沒有被取走,那麽程序將會一直停在這裏。 try: q.put_nowait(3) # 使用put_nowait()方法,如果隊列滿了不會阻塞而是會報錯。 except: print(‘隊列已經滿了‘) # 報錯則會打印 print(q.full()) # 判斷隊列是否滿了,返回bool值 print(q.get()) print(q.full()) print(q.get()) print(q.get()) # print(q.get()) 同放入一樣,如果隊列已經空了,繼續取值,就會出現阻塞。 try: print(q.get_nowait()) # 用get_nowait()方法取值,如果隊列為空不會阻塞則會報錯 except: print(‘隊列已經空了‘) print(q.empty()) # 判斷隊列是否為空,返回bool值Queue例子
import time from multiprocessing import Process,Queue def f(q): q.put([time.asctime(),‘from Eva‘,‘hello‘]) # 調用主函數中p進程傳遞過來的進程參數put向隊列中添加數據。 if __name__ == ‘__main__‘: q = Queue() # 創建一個Queue對象 p = Process(target=f,args=(q,)) # 創建一個進程 p.start() print(q.get()) # [‘Fri May 11 17:28:43 2018‘, ‘from Eva‘, ‘hello‘] p.join()子進程發送數據給父進程
import os import time import multiprocessing # 向queue中輸入數據的函數 def inputQ(queue): info = str(os.getpid()) + ‘(put):‘ + str(time.asctime()) queue.put(info) # 向queue中輸出數據的函數 def outputQ(queue): info = queue.get() print(‘%s%s\033[32m%s\033[0m‘%(str(os.getpid()),‘(get):‘,info)) if __name__ == ‘__main__‘: multiprocessing.freeze_support() record1 = [] record2 = [] queue = multiprocessing.Queue(3) # 輸入進程 for i in range(10): process = multiprocessing.Process(target=inputQ,args=(queue,)) process.start() record1.append(process) # 輸出進程 for i in range(10): process = multiprocessing.Process(target=outputQ, args=(queue,)) process.start() record2.append(process) for p in record1: p.join() for p in record2: p.join()批量生產數據放入隊列在批量獲得結果
網絡編程——進程同步