31、互斥鎖與進程間通信
我們之前做了多進程並發,那麽你們有沒有發現問題。如果說多個進程共享同一個數據,比如搶火車票大家同時在客戶端查看同時購買會出現什麽問題呢?今天我們將講述進程鎖還有進程間通信,進程之間彼此隔離,他們需要一個第三方聯系起來。
一、互斥鎖
進程之間數據隔離,但是共享一套文件系統,因而可以通過文件來實現進程直接的通信,但問題是必須自己加鎖處理
註意:加鎖的目的是為了保證多個進程修改同一塊數據時,同一時間只能有一個修改,即串行的修改,沒錯,速度是慢了,犧牲了速度而保證了數據安全。
1、上廁所
先舉個通俗易懂的例子,家裏的廁所,你要上廁所進去後會先鎖門,廁所門就相當於一個互斥鎖,當你在裏面的時候別人過來上廁所就只能在門口等。
from multiprocessing import Process,Lock import os import time def work(mutex): mutex.acquire() #上鎖 print(‘task[%s] 上廁所‘ %os.getpid()) time.sleep(3) print(‘task[%s] 上完廁所‘ %os.getpid()) mutex.release() #開鎖 if __name__ == ‘__main__‘: mutex=Lock() #實例化(互斥鎖) p1=Process(target=work,args=(mutex,)) p2=Process(target=work,args=(mutex,)) p3=Process(target=work,args=(mutex,)) p1.start() p2.start() p3.start() print(‘start...‘)
2、模擬搶票
#文件db的內容為:{"count":1} #票數可以自己定義 #註意一定要用雙引號,不然json無法識別 from multiprocessing import Process,Lock import json import time import random import os defsearch() : #查看票數 dic=json.load(open(‘db.txt‘,)) print(‘剩余票數%s‘ %dic[‘count‘]) def get_ticket() : #購票 dic = json.load(open(‘db.txt‘,)) if dic[‘count‘] > 0 : dic[‘count‘] -= 1 json.dump(dic,open(‘db.txt‘,‘w‘)) print(‘%s 購票成功‘ %os.getpid()) def task(mutex) : #購票流程 search() time.sleep(random.randint(1, 3)) #模擬購票一系列繁瑣的過程所花費的時間 mutex.acquire() get_ticket() mutex.release() if __name__ == ‘__main__‘ : mutex = Lock() for i in range(50): p = Process(target=task,args=(mutex,)) p.start()
二、Process對象其他屬性使用案例補充
1、deamon守護進程
p.daemon:默認值為False,如果設為True,代表p為後臺運行的守護進程,當p的父進程終止時,p也隨之終止,並且設定為True後,p不能創建自己的新進程,必須在p.start()之前設置
ps:
from multiprocessing import Process import os import time def work(): print(‘%s is working‘ %os.getpid()) time.sleep(10) print(‘%s is ending‘ % os.getpid()) if __name__ == ‘__main__‘: p1=Process(target=work) p2=Process(target=work) p3=Process(target=work) p1.daemon=True p2.daemon=True p3.daemon=True p1.start() p2.start() p3.start() time.sleep(2) print(‘start。。。‘)
2、join等待子進程
p.join([timeout]):主線程等待p終止(強調:是主線程處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間,需要強調的是,p.join只能join住start開啟的進程,而不能join住run開啟的進程
ps:
from multiprocessing import Process import os import time def work(): print(‘%s is working‘ %os.getpid()) time.sleep(3) if __name__ == ‘__main__‘: p1=Process(target=work) p2=Process(target=work) p3=Process(target=work) p1.daemon=True p2.daemon=True p3.daemon=True p1.start() #初始化1 p2.start() #初始化2 p3.start() #初始化3 p3.join() p1.join() p2.join() print(‘基於初始化的結果來繼續運行‘)
3、terminate,is_alive,name,pid
p.terminate():強制終止進程p,不會進行任何清理操作,如果p創建了子進程,該子進程就成了僵屍進程,使用該方法需要特別小心這種情況。如果p還保存了一個鎖那麽也將不會被釋放,進而導致死鎖
p.is_alive():如果p仍然運行,返回True
p.name:進程的名稱
p.pid:進程的pid
ps:
from multiprocessing import Process import os import time def work(): print(‘%s is working‘ %os.getpid()) time.sleep(3) if __name__ == ‘__main__‘: p1=Process(target=work) p2=Process(target=work) p3=Process(target=work) p1.start() #初始化1 p2.start() #初始化2 p3.start() #初始化3 p1.terminate() #不建議使用 print(p1.is_alive()) #雖然已經強制終止進程了但是操作系統終止進程也需要時間所以此時還是True print(p1.name) #如果沒有起名默認Process-1後面的數字按子進程順序排 print(p2.name) print(p1.pid) # p1.pid == os.getpid() print(‘基於初始化的結果來繼續運行‘)
三、進程間通信
我們學習了通過使用共享的文件的方式,實現進程直接的共享,即共享數據的方式,這種方式必須考慮周全同步、鎖等問題。而且文件是操作系統提供的抽象,可以作為進程直接通信的介質,與mutiprocess模塊無關
但其實mutiprocessing模塊為我們提供了基於消息的IPC通信機制:隊列和管道。IPC機制中的隊列又是基於(管道+鎖)實現的,可以讓我們從復雜的鎖問題中解脫出來,我們應該盡量避免使用共享數據,盡可能使用消息傳遞和隊列,避免處理復雜的同步和鎖問題,而且在進程數目增多時,往往可以獲得更好的可獲展性。
1、進程間通信(IPC)方式一:隊列(推薦使用)
進程彼此之間互相隔離,要實現進程間通信(IPC),multiprocessing模塊支持兩種形式:隊列和管道,這兩種方式都是使用消息傳遞的
可以往隊列裏放任意類型的數據
隊列:先進先出
1)導入
from multiprocessing import Queue
2)實例化
q=Queue(3) #3是隊列中規定允許的最大項數,省略即不限大小
3)主要方法
q.put方法用以插入數據到隊列中,put方法還有兩個可選參數:blocked和timeout。如果blocked為True(默認值),並且timeout為正值,該方法會阻塞timeout指定的時間,直到該隊列有剩余的空間。如果超時,會拋出Queue.Full異常。如果blocked為False,但該Queue已滿,會立即拋出Queue.Full異常。
q.get方法可以從隊列讀取並且刪除一個元素。同樣,get方法有兩個可選參數:blocked和timeout。如果blocked為True(默認值),並且timeout為正值,那麽在等待時間內沒有取到任何元素,會拋出Queue.Empty異常。如果blocked為False,有兩種情況存在,如果Queue有一個值可用,則立即返回該值,否則,如果隊列為空,則立即拋出Queue.Empty異常.
q.get_nowait():同q.get(False)
q.put_nowait():同q.put(False)
q.empty():調用此方法時q為空則返回True,該結果不可靠,比如在返回True的過程中,如果隊列中又加入了項目。
q.full():調用此方法時q已滿則返回True,該結果不可靠,比如在返回True的過程中,如果隊列中的項目被取走。
q.qsize():返回隊列中目前項目的正確數量,結果也不可靠,理由同q.empty()和q.full()一樣
4)其他方法(了解)
q.cancel_join_thread():不會在進程退出時自動連接後臺線程。可以防止join_thread()方法阻塞
q.close():關閉隊列,防止隊列中加入更多數據。調用此方法,後臺線程將繼續寫入那些已經入隊列但尚未寫入的數據,但將在此方法完成時馬上關閉。如果q被垃圾收集,將調用此方法。關閉隊列不會在隊列使用者中產生任何類型的數據結束信號或異常。例如,如果某個使用者正在被阻塞在get()操作上,關閉生產者中的隊列不會導致get()方法返回錯誤。
q.join_thread():連接隊列的後臺線程。此方法用於在調用q.close()方法之後,等待所有隊列項被消耗。默認情況下,此方法由不是q的原始創建者的所有進程調用。調用q.cancel_join_thread方法可以禁止這種行為
5)應用
from multiprocessing import Process,Queue #1:可以往隊列裏放任意類型的數據 2 隊列:先進先出 q=Queue(3) q.put(‘first‘) q.put(‘second‘) q.put(‘third‘) # q.put(‘fourht‘) print(q.full()) #滿了 print(q.get()) print(q.get()) print(q.get()) # print(q.get()) print(q.empty()) #空了 # q=Queue(3) # q.put(‘first‘,block=False) # q.put(‘second‘,block=False) # q.put(‘third‘,block=False) # q.put(‘fourth‘,block=True,timeout=3) # q.get(block=False) # q.get(block=True,timeout=3) # q.get_nowait() #q.get(block=False)
6)生產消費者模型
在並發編程中使用生產者和消費者模式能夠解決絕大多數並發問題。該模式通過平衡生產線程和消費線程的工作能力來提高程序的整體處理數據的速度。
在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那麽生產者就必須等待消費者處理完,才能繼續生產數據。同樣的道理,如果消費者的處理能力大於生產者,那麽消費者就必須等待生產者。為了解決這個問題於是引入了生產者和消費者模式。
生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。
基於隊列實現生產者消費者模型
from multiprocessing import Process,Queue import time import random import os def consumer(q): while True: res=q.get() if res is None: break time.sleep(random.randint(1,3)) print(‘%s 吃了 %s‘ % (os.getpid(), res)) def producer(q): for i in range(5): time.sleep(2) res=‘包子%s‘ %i q.put(res) print(‘%s 制造了 %s‘ %(os.getpid(),res)) q.put(None) if __name__ == ‘__main__‘: q=Queue() #生產者們:廚師們 p1=Process(target=producer,args=(q,)) #消費者們:吃貨們 p2=Process(target=consumer,args=(q,)) p1.start() p2.start() p1.join() p2.join() print(‘主‘)生產者消費者模型
7)創建隊列的另外一個類
JoinableQueue([maxsize]):這就像是一個Queue對象,但隊列允許項目的使用者通知生成者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。
maxsize是隊列中允許最大項數,省略則無大小限制。
JoinableQueue的實例p除了與Queue對象相同的方法之外還具有:
q.task_done():使用者使用此方法發出信號,表示q.get()的返回項目已經被處理。如果調用此方法的次數大於從隊列中刪除項目的數量,將引發ValueError異常
q.join():生產者調用此方法進行阻塞,直到隊列中所有的項目均被處理。阻塞將持續到隊列中的每個項目均調用q.task_done()方法為止
from multiprocessing import Process,JoinableQueue import time import random import os def consumer(q): while True: res=q.get() time.sleep(random.randint(1,3)) print(‘%s 吃了 %s‘ % (os.getpid(), res)) q.task_done() def product_baozi(q): for i in range(5): time.sleep(2) res=‘包子%s‘ %i q.put(res) print(‘%s 制造了 %s‘ %(os.getpid(),res)) q.join() if __name__ == ‘__main__‘: q=JoinableQueue() #生產者們:廚師們 p1=Process(target=product_baozi,args=(q,)) #消費者們:吃貨們 p4=Process(target=consumer,args=(q,)) p4.daemon=True p1.start() p4.start() p1.join() print(‘主‘) #p2結束了生產者消費者模型2
from multiprocessing import Process,JoinableQueue import time import random import os def consumer(q): while True: res=q.get() time.sleep(random.randint(1,3)) print(‘%s 吃了 %s‘ % (os.getpid(), res)) q.task_done() def product_baozi(q): for i in range(3): time.sleep(2) res=‘包子%s‘ %i q.put(res) print(‘%s 制造了 %s‘ %(os.getpid(),res)) q.join() def product_gutou(q): for i in range(3): time.sleep(2) res=‘骨頭%s‘ %i q.put(res) print(‘%s 制造了 %s‘ %(os.getpid(),res)) q.join() def product_ganshui(q): for i in range(3): time.sleep(2) res=‘泔水%s‘ %i q.put(res) print(‘%s 制造了 %s‘ %(os.getpid(),res)) q.join() if __name__ == ‘__main__‘: q=JoinableQueue() #生產者們:廚師們 p1=Process(target=product_baozi,args=(q,)) p2=Process(target=product_gutou,args=(q,)) p3=Process(target=product_ganshui,args=(q,)) #消費者們:吃貨們 p4=Process(target=consumer,args=(q,)) p5=Process(target=consumer,args=(q,)) p4.daemon=True p5.daemon=True #設置為守護進程,在主線程停止時p也停止,但是不用擔心,producer內調用q.join保證了consumer已經處理完隊列中的所有元素 p_l=[p1,p2,p3,p4,p5] for p in p_l: p.start() p1.join() p2.join() p3.join() print(‘主‘)生產者消費者模型3
2、進程間通信(IPC)方式二:管道(不推薦使用,了解即可)
3、 進程間通信方式三:共享數據(不推薦使用,了解即可)
(對這兩種方式了解不深,有興趣的可以自己搜索相關文章)
31、互斥鎖與進程間通信