1. 程式人生 > >互斥鎖,IPC隊列

互斥鎖,IPC隊列

處理 pro multi rgs 則無 傳輸介質 模擬 ret 後臺

進程同步(鎖)

進程之間數據不共享,但是共享同一套文件系統,所以訪問同一個文件,或同一個打印終端,是沒有問題的,
part1:共享同一打印終端,發現會有多行內容打印到一行的現象(多個進程共享並搶占同一個打印終端,亂了)


技術分享
#多進程共享一個打印終端(用python2測試看兩個進程同時往一個終端打印,出現打印到一行的錯誤)
from multiprocessing import Process
import time
class Logger(Process):
    def __init__(self):
        super(Logger,self).__init__()
    def run(self):
        print(self.name)


for i in range(1000000): l=Logger() l.start()
View Code

part2:共享同一個文件,有的同學會想到,既然可以用文件共享數據,那麽進程間通信用文件作為數據傳輸介質就可以了啊,可以,但是有問題:1.效率 2.需要自己加鎖處理

技術分享
#多進程共享一套文件系統
from multiprocessing import Process
import time,random

def work(f,msg):
    f.write(msg)
    f.flush()


f=open(a.txt,w) #在windows上無法把f當做參數傳入,可以傳入一個文件名,然後在work內用a+的方式打開文件,進行寫入測試
for i in range(5): p=Process(target=work,args=(f,str(i))) p.start()
View Code

需知:加鎖的目的是為了保證多個進程修改同一塊數據時,同一時間只能有一個修改,即串行的修改,沒錯,速度是慢了,犧牲了速度而保證了數據安全。

進程之間數據隔離,但是共享一套文件系統,因而可以通過文件來實現進程直接的通信,但問題是必須自己加鎖處理

所以,就讓我們幫文件當做數據庫,模擬搶票(Lock互斥鎖)

技術分享
from multiprocessing import Process,Lock
import time,random,os
import json

def tar(mul):
    dic1 
= json.load(open(db.txt,)) print("還剩%s張票"%dic1[count],os.getpid()) if dic1[count]>0: time.sleep(random.randint(1,10)) mul.acquire() dic = json.load(open(db.txt, )) if dic[count] > 0: dic[count] -= 1 json.dump(dic,open(db.txt,w)) print(購票成功,os.getpid()) mul.release() if __name__==__main__: mul= Lock() for i in range(20): p = Process(target=tar,args=(mul,)) p.start()
View Code

進程間通信(IPC)方式一:隊列(推薦使用)

進程彼此之間互相隔離,要實現進程間通信(IPC),multiprocessing模塊支持兩種形式:隊列和管道,這兩種方式都是使用消息傳遞的

創建隊列的類(底層就是以管道和鎖定的方式實現)

1 Queue([maxsize]):創建共享的進程隊列,Queue是多進程安全的隊列,可以使用Queue實現多進程之間的數據傳遞。 

參數介紹:

1 maxsize是隊列中允許最大項數,省略則無大小限制。    

  方法介紹:

主要方法:
1 q.put方法用以插入數據到隊列中,put方法還有兩個可選參數:blocked和timeout。如果blocked為True(默認值),並且timeout為正值,該方法會阻塞timeout指定的時間,直到該隊列有剩余的空間。
如果超時,會拋出Queue.Full異常。如果blocked為False,但該Queue已滿,會立即拋出Queue.Full異常。 2 q.get方法可以從隊列讀取並且刪除一個元素。同樣,get方法有兩個可選參數:blocked和timeout。如果blocked為True(默認值),並且timeout為正值,那麽在等待時間內沒有取到任何元素,會拋出Queue.Empty異常。
如果blocked為False,有兩種情況存在,如果Queue有一個值可用,則立即返回該值,否則,如果隊列為空,則立即拋出Queue.Empty異常. 3 4 q.get_nowait():同q.get(False) 5 q.put_nowait():同q.put(False) 6 7 q.empty():調用此方法時q為空則返回True,該結果不可靠,比如在返回True的過程中,如果隊列中又加入了項目。 8 q.full():調用此方法時q已滿則返回True,該結果不可靠,比如在返回True的過程中,如果隊列中的項目被取走。 9 q.qsize():返回隊列中目前項目的正確數量,結果也不可靠,理由同q.empty()和q.full()一樣
了解:

1 q.cancel_join_thread():不會在進程退出時自動連接後臺線程。可以防止join_thread()方法阻塞
2 q.close():關閉隊列,防止隊列中加入更多數據。調用此方法,後臺線程將繼續寫入那些已經入隊列但尚未寫入的數據,但將在此方法完成時馬上關閉。如果q被垃圾收集,將調用此方法。
關閉隊列不會在隊列使用者中產生任何類型的數據結束信號或異常。例如,如果某個使用者正在被阻塞在get()操作上,關閉生產者中的隊列不會導致get()方法返回錯誤。 3 q.join_thread():連接隊列的後臺線程。此方法用於在調用q.close()方法之後,等待所有隊列項被消耗。默認情況下,此方法由不是q的原始創建者的所有進程調用。調用q.cancel_join_thread方法可以禁止這種行為

生產者消費者模型

在並發編程中使用生產者和消費者模式能夠解決絕大多數並發問題。該模式通過平衡生產線程和消費線程的工作能力來提高程序的整體處理數據的速度。

為什麽要使用生產者和消費者模式

在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那麽生產者就必須等待消費者處理完,才能繼續生產數據。同樣的道理,如果消費者的處理能力大於生產者,那麽消費者就必須等待生產者。為了解決這個問題於是引入了生產者和消費者模式。

什麽是生產者消費者模式

生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就相當於一個緩沖區,平衡了生產者和消費者的處理能力。

基於隊列實現生產者消費者模型

創建隊列的另外一個類:

JoinableQueue([maxsize]):這就像是一個Queue對象,但隊列允許項目的使用者通知生成者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。

參數介紹:

maxsize是隊列中允許最大項數,省略則無大小限制。

  方法介紹:

JoinableQueue的實例p除了與Queue對象相同的方法之外還具有: q.task_done():使用者使用此方法發出信號,表示q.get()的返回項目已經被處理。如果調用此方法的次數大於從隊列中刪除項目的數量,將引發ValueError異常 q.join():生產者調用此方法進行阻塞,直到隊列中所有的項目均被處理。阻塞將持續到隊列中的每個項目均調用q.task_done()方法為止
import os,time,random
from multiprocessing import  Process,JoinableQueue

def producer_dup(q):
    for i in range(10):
        time.sleep(2)
        print(os.getpid(),‘制造了包子\033[45m%s\033[0m‘%i)
        q.put(‘包子%s‘%i)
    q.join()
def producer_gu(q):
    for i in range(3):
        time.sleep(2)
        print(os.getpid(),‘制造了骨頭\033[45m%s\033[0m‘%i)
        q.put(‘骨頭%s‘%i)
    q.join()
def producer_sh(q):
    for i in range(3):
        time.sleep(2)
        print(os.getpid(),‘制造了泔水\033[45m%s\033[0m‘%i)
        q.put(‘泔水%s‘%i)
    q.join()
def constumer(q):
    while 1:
        time.sleep(random.randint(1,3))
        ret = q.get()
        print(os.getpid(),‘吃了\033[44m%s\033[0m‘%ret)
        q.task_done()


if __name__ == ‘__main__‘:
    q = JoinableQueue()
    p1 = Process(target=producer_dup,args=(q,))
    p2 = Process(target=producer_gu,args=(q,))
    p3 = Process(target=producer_sh,args=(q,))
    p4 = Process(target=constumer,args=(q,))
    p5 = Process(target=constumer,args=(q,))
    p4.daemon=True
    p5.daemon=True
    p1.start()
    p2.start()
    p3.start()
    p4.start()
    p5.start()
    p1.join()
    p2.join()
    p3.join()
    print(‘end‘)

  

 

互斥鎖,IPC隊列