1. 程式人生 > >Python之進程同步控制(鎖信號量事件 )、進程間通信——隊列和管道

Python之進程同步控制(鎖信號量事件 )、進程間通信——隊列和管道

load 很快 容器 數據安全 全部 傳遞 幫我 之前 引入

進程同步(multiprocess.Lock、multiprocess.Semaphore、multiprocess.Event)

鎖 —— multiprocess.Lock

通過剛剛的學習,我們千方百計實現了程序的異步,讓多個任務可以同時在幾個進程中並發處理,他們之間的運行沒有順序,一旦開啟也不受我們控制。盡管並發編程讓我們能更加充分的利用IO資源,但是也給我們帶來了新的問題。

  當多個進程使用同一份數據資源的時候,就會引發數據安全或順序混亂問題。

#加鎖可以保證多個進程修改同一塊數據時,同一時間只能有一個任務可以進行修改,即串行的修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。
技術分享圖片
import json
import time
from multiprocessing import Process
from multiprocessing import Lock

def show(i):
    with open(ticket) as f:
        dic = json.load(f)
    print(余票: %s%dic[ticket])

def buy_ticket(i,lock):
    lock.acquire() #拿鑰匙進門
    with open(ticket) as f:
        dic = json.load(f)
        time.sleep(
0.1) if dic[ticket] > 0 : dic[ticket] -= 1 print(\033[32m%s買到票了\033[0m%i) else: print(\033[31m%s沒買到票\033[0m%i) time.sleep(0.1) with open(ticket,w) as f: json.dump(dic,f) lock.release() # 還鑰匙 if __name__ == __main__: for i in range(5): p
= Process(target=show,args=(i,)) p.start() lock = Lock() for i in range(5): p = Process(target=buy_ticket, args=(i,lock)) p.start()
火車票案例

#雖然可以用文件共享數據實現進程間通信,但問題是:
#1.效率低(共享數據基於文件,而文件是硬盤上的數據)
#2.需要自己加鎖處理

#因此我們最好找尋一種解決方案能夠兼顧:1、效率高(多個進程共享一塊內存的數據)2、幫我們處理好鎖問題。這就是mutiprocessing模塊為我們提供的基於消息的IPC通信機制:隊列和管道。
#隊列和管道都是將數據存放於內存中
#隊列又是基於(管道+鎖)實現的,可以讓我們從復雜的鎖問題中解脫出來,
#我們應該盡量避免使用共享數據,盡可能使用消息傳遞和隊列,避免處理復雜的同步和鎖問題,而且在進程數目增多時,往往可以獲得更好的可獲展性。

信號量 —— multiprocess.Semaphore(了解)

信號量介紹Semaphore

互斥鎖同時只允許一個線程更改數據,而信號量Semaphore是同時允許一定數量的線程更改數據 。
假設商場裏有4個迷你唱吧,所以同時可以進去4個人,如果來了第五個人就要在外面等待,等到有人出來才能再進去玩。
實現:
信號量同步基於內部計數器,每調用一次acquire(),計數器減1;每調用一次release(),計數器加1.當計數器為0時,acquire()調用被阻塞。這是迪科斯徹(Dijkstra)信號量概念P()和V()的Python實現。信號量同步機制適用於訪問像服務器這樣的有限資源。
信號量與進程池的概念很像,但是要區分開,信號量涉及到加鎖的概念
技術分享圖片
from multiprocessing import Process,Semaphore
import time,random

def go_ktv(sem,user):
    sem.acquire()
    print(%s 占到一間ktv小屋 %user)
    time.sleep(random.randint(0,3)) #模擬每個人在ktv中待的時間不同
    print(%s出來 %user)
    sem.release()

if __name__ == __main__:
    sem=Semaphore(4)
    p_l=[]
    for i in range(7):
        p=Process(target=go_ktv,args=(sem,user%s %i,))
        p.start()
        p_l.append(p)
    print(p_l)

    for i in p_l:
        i.join()
    print(============》)
ktv小屋

事件 —— multiprocess.Event(了解)

事件介紹

python線程的事件用於主線程控制其他線程的執行,事件主要提供了三個方法 set、wait、clear。

    事件處理的機制:全局定義了一個“Flag”,如果“Flag”值為 False,那麽當程序執行 event.wait 方法時就會阻塞,如果“Flag”值為True,那麽event.wait 方法時便不再阻塞。

clear:將“Flag”設置為False
set:將“Flag”設置為True
技術分享圖片
# 通過一個信號 來控制 多個進程 同時 執行或者阻塞
# 事件
# from multiprocessing import Event
# 一個信號可以使所有的進程都進入阻塞狀態
# 也可以控制所有的進程解除阻塞
# 一個事件被創建之後,默認是阻塞狀態
# e = Event()  # 創建了一個事件
# print(e.is_set())   # 查看一個事件的狀態,默認被設置成阻塞
# e.set()      # 將這個事件的狀態改為True
# print(e.is_set())
# e.wait()     # 是依據e.is_set()的值來決定是否阻塞的
# print(123456)
# e.clear()    # 將這個事件的狀態改為False
# print(e.is_set())
# e.wait()     # 等待 事件的信號被變成True
# print(‘*‘*10)


# set 和 clear
    #  分別用來修改一個事件的狀態 True或者False
# is_set 用來查看一個事件的狀態
# wait 是依據事件的狀態來決定自己是否在wait處阻塞
    #  False阻塞 True不阻塞


# 紅綠燈事件
import time#引入時間模塊
import random#引入隨機模塊
from multiprocessing import Event,Process#引入進程模塊和時間模塊
def cars(e,i):#定義一個函數
    if not e.is_set():#如果信號燈為真的時候
        print(car%i在等待%i)#打印內容
        e.wait()    # 阻塞 直到得到一個 事件狀態變成 True 的信號
    print(\033[0;32;40mcar%i通過\033[0m % i)#打印通過

def light(e):#定義一個燈
    while True:#循環為真
        if e.is_set():#如果事件狀態為真
            e.clear()#則清除信號燈
            print(\033[31m紅燈亮了\033[0m)#打印紅燈亮了
        else:#否則
            e.set()#設置狀態為真
            print(\033[32m綠燈亮了\033[0m)#打印綠燈亮了
        time.sleep(2)#睡2秒

if __name__ == __main__:#如果為真
    e = Event()#實例化一個事件
    traffic = Process(target=light,args=(e,))#定義一個燈的進程
    traffic.start()#開始進程
    for i in range(20):#循環20次
        car = Process(target=cars, args=(e,i))#創建20個汽車進程
        car.start()#啟動汽車進程
        time.sleep(random.random())#隨機睡,隨機出現0~1之間的小數
紅綠燈示例

進程間通信——隊列和管道(multiprocess.Queue、multiprocess.Pipe)

進程間通信

IPC(Inter-Process Communication)

隊列

創建共享的進程隊列,Queue是多進程安全的隊列,可以使用Queue實現多進程之間的數據傳遞。 
Queue([maxsize]) 
創建共享的進程隊列。
參數 :maxsize是隊列中允許的最大項數。如果省略此參數,則無大小限制。
底層隊列使用管道和鎖定實現。
技術分享圖片
Queue([maxsize]) 
創建共享的進程隊列。maxsize是隊列中允許的最大項數。如果省略此參數,則無大小限制。底層隊列使用管道和鎖定實現。另外,還需要運行支持線程以便隊列中的數據傳輸到底層管道中。 
Queue的實例q具有以下方法:

q.get( [ block [ ,timeout ] ] ) 
返回q中的一個項目。如果q為空,此方法將阻塞,直到隊列中有項目可用為止。block用於控制阻塞行為,默認為True. 如果設置為False,將引發Queue.Empty異常(定義在Queue模塊中)。timeout是可選超時時間,用在阻塞模式中。如果在制定的時間間隔內沒有項目變為可用,將引發Queue.Empty異常。

q.get_nowait( ) 
同q.get(False)方法。

q.put(item [, block [,timeout ] ] ) 
將item放入隊列。如果隊列已滿,此方法將阻塞至有空間可用為止。block控制阻塞行為,默認為True。如果設置為False,將引發Queue.Empty異常(定義在Queue庫模塊中)。timeout指定在阻塞模式中等待可用空間的時間長短。超時後將引發Queue.Full異常。

q.qsize() 
返回隊列中目前項目的正確數量。此函數的結果並不可靠,因為在返回結果和在稍後程序中使用結果之間,隊列中可能添加或刪除了項目。在某些系統上,此方法可能引發NotImplementedError異常。


q.empty() 
如果調用此方法時 q為空,返回True。如果其他進程或線程正在往隊列中添加項目,結果是不可靠的。也就是說,在返回和使用結果之間,隊列中可能已經加入新的項目。

q.full() 
如果q已滿,返回為True. 由於線程的存在,結果也可能是不可靠的(參考q.empty()方法)。。
方法介紹 技術分享圖片
q.close() 
關閉隊列,防止隊列中加入更多數據。調用此方法時,後臺線程將繼續寫入那些已入隊列但尚未寫入的數據,但將在此方法完成時馬上關閉。如果q被垃圾收集,將自動調用此方法。關閉隊列不會在隊列使用者中生成任何類型的數據結束信號或異常。例如,如果某個使用者正被阻塞在get()操作上,關閉生產者中的隊列不會導致get()方法返回錯誤。

q.cancel_join_thread() 
不會再進程退出時自動連接後臺線程。這可以防止join_thread()方法阻塞。

q.join_thread() 
連接隊列的後臺線程。此方法用於在調用q.close()方法後,等待所有隊列項被消耗。默認情況下,此方法由不是q的原始創建者的所有進程調用。調用q.cancel_join_thread()方法可以禁止這種行為。
其他方法(了解)

代碼實例

技術分享圖片
‘‘‘
multiprocessing模塊支持進程間通信的兩種主要形式:管道和隊列
都是基於消息傳遞實現的,但是隊列接口
‘‘‘

from multiprocessing import Queue
q=Queue(3)

#put ,get ,put_nowait,get_nowait,full,empty
q.put(3)
q.put(3)
q.put(3)
# q.put(3)   # 如果隊列已經滿了,程序就會停在這裏,等待數據被別人取走,再將數據放入隊列。
           # 如果隊列中的數據一直不被取走,程序就會永遠停在這裏。
try:
    q.put_nowait(3) # 可以使用put_nowait,如果隊列滿了不會阻塞,但是會因為隊列滿了而報錯。
except: # 因此我們可以用一個try語句來處理這個錯誤。這樣程序不會一直阻塞下去,但是會丟掉這個消息。
    print(隊列已經滿了)

# 因此,我們再放入數據之前,可以先看一下隊列的狀態,如果已經滿了,就不繼續put了。
print(q.full()) #滿了

print(q.get())
print(q.get())
print(q.get())
# print(q.get()) # 同put方法一樣,如果隊列已經空了,那麽繼續取就會出現阻塞。
try:
    q.get_nowait(3) # 可以使用get_nowait,如果隊列滿了不會阻塞,但是會因為沒取到值而報錯。
except: # 因此我們可以用一個try語句來處理這個錯誤。這樣程序不會一直阻塞下去。
    print(隊列已經空了)

print(q.empty()) #空了
單看隊列用法

上面這個例子還沒有加入進程通信,只是先來看看隊列為我們提供的方法,以及這些方法的使用和現象。

技術分享圖片
import time
from multiprocessing import Process, Queue

def f(q):
    q.put([time.asctime(), from Eva, hello])  #調用主函數中p進程傳遞過來的進程參數 put函數為向隊列中添加一條數據。

if __name__ == __main__:
    q = Queue() #創建一個Queue對象
    p = Process(target=f, args=(q,)) #創建一個進程
    p.start()
    print(q.get())
    p.join()
子進程發送數據給父進程

上面是一個queue的簡單應用,使用隊列q對象調用get函數來取得隊列中最先進入的數據。 接下來看一個稍微復雜一些的例子:

技術分享圖片
import os
import time
import multiprocessing

# 向queue中輸入數據的函數
def inputQ(queue):
    info = str(os.getpid()) + (put): + str(time.asctime())
    queue.put(info)

# 向queue中輸出數據的函數
def outputQ(queue):
    info = queue.get()
    print (%s%s\033[32m%s\033[0m%(str(os.getpid()), (get):,info))

# Main
if __name__ == __main__:
    multiprocessing.freeze_support()
    record1 = []   # store input processes
    record2 = []   # store output processes
    queue = multiprocessing.Queue(3)

    # 輸入進程
    for i in range(10):
        process = multiprocessing.Process(target=inputQ,args=(queue,))
        process.start()
        record1.append(process)

    # 輸出進程
    for i in range(10):
        process = multiprocessing.Process(target=outputQ,args=(queue,))
        process.start()
        record2.append(process)

    for p in record1:
        p.join()

    for p in record2:
        p.join()
批量生產數據放入隊列再批量獲取結果 x

生產者消費者模型

在並發編程中使用生產者和消費者模式能夠解決絕大多數並發問題。該模式通過平衡生產線程和消費線程的工作能力來提高程序的整體處理數據的速度。

為什麽要使用生產者和消費者模式

在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那麽生產者就必須等待消費者處理完,才能繼續生產數據。同樣的道理,如果消費者的處理能力大於生產者,那麽消費者就必須等待生產者。為了解決這個問題於是引入了生產者和消費者模式。

什麽是生產者消費者模式

生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就相當於一個緩沖區,平衡了生產者和消費者的處理能力。

基於隊列實現生產者消費者模型
技術分享圖片
# 隊列
# 生產者消費者模型

# 生產者 進程
# 消費者 進程
import time#引入時間模塊
import random#引入隨機數
from multiprocessing import Process,Queue#引入進程模塊和隊列模塊
def consumer(q,name):#定義一個消費者函數
    while True:#循環為真
        food = q.get()#拿出食物
        if food is None:#如果食物為空
            print(%s獲取到了一個空%name)#打印胡去到一個空
            break#打斷
        print(\033[31m%s消費了%s\033[0m % (name,food))#打印誰消費了什麽食物
        time.sleep(random.randint(1,3))#隨機睡1~3秒

def producer(name,food,q):#定義一個生產者函數
    for i in range(4):#循環4次
        time.sleep(random.randint(1,3))#隨機睡1~3秒
        f = %s生產了%s%s%(name,food,i)#誰生產了什麽食物
        print(f)#打印內容
        q.put(f)#把食物放到隊列裏

if __name__  == __main__:#如果名稱是當前名稱
    q = Queue(20)#實例化一個隊列20
    p1 = Process(target=producer,args=(Egon,包子,q))#創建一個進程
    p2 = Process(target=producer, args=(wusir,泔水, q))#創建一個進程
    c1 = Process(target=consumer, args=(q,alex))#創建一個進程
    c2 = Process(target=consumer, args=(q,jinboss))#創建一個進程
    p1.start()#啟動一個進程
    p2.start()#啟動一個進程
    c1.start()#啟動一個進程
    c2.start()#啟動一個進程
    p1.join()#感知p1進程結
    p2.join()#感知p2進程結束
    q.put(None)#往隊列中添加一個None
    q.put(None)#往隊列中添加一個None
基於隊列實現生產者消費者模型

此時的問題是主進程永遠不會結束,原因是:生產者p在生產完後就結束了,但是消費者c在取空了q之後,則一直處於死循環中且卡在q.get()這一步。

解決方式無非是讓生產者在生產完畢後,往隊列中再發一個結束信號,這樣消費者在接收到結束信號後就可以break出死循環。

註意:結束信號None,不一定要由生產者發,主進程裏同樣可以發,但主進程需要等生產者結束後才應該發送該信號

JoinableQueue([maxsize])
創建可連接的共享進程隊列。這就像是一個Queue對象,但隊列允許項目的使用者通知生產者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。

#方法介紹
#JoinableQueue的實例p除了與Queue對象相同的方法之外,還具有以下方法:

#q.task_done() 
#使用者使用此方法發出信號,表示q.get()返回的項目已經被處理。如果調用此方法的次數大於從隊列中刪除的項目數量,將引發ValueError異常。

#q.join() 
#生產者將使用此方法進行阻塞,直到隊列中所有項目均被處理。阻塞將持續到為隊列中的每個項目均調用q.task_done()方法為止。 
#下面的例子說明如何建立永遠運行的進程,使用和處理隊列上的項目。生產者將項目放入隊列,並等待它們被處理。
技術分享圖片
import time#引入一個時間模塊
import random#引入一個隨機數模塊
from multiprocessing import Process,JoinableQueue#引入進程模塊和隊列模塊
def consumer(q,name):#定義一個消費者函數
    while True:#循環為真
        food = q.get()#從隊列中拿出食物
        print(\033[31m%s消費了%s\033[0m % (name,food))#打印內容
        time.sleep(random.randint(1,3))#隨機睡1~3秒
        q.task_done()     # count - 1#

def producer(name,food,q):#生產者
    for i in range(4):#循環4次
        time.sleep(random.randint(1,3))#隨機睡1~3秒
        f = %s生產了%s%s%(name,food,i)#誰生產了食物
        print(f)#打印這個內容
        q.put(f)#放入到隊列裏
    q.join() # 阻塞  直到一個隊列中的所有數據 全部被處理完畢

if __name__  == __main__:#如果文件名為當前名稱
    q = JoinableQueue(20)#實例化一個隊列對象
    p1 = Process(target=producer,args=(Egon,包子,q))#創建一個生產者進程
    p2 = Process(target=producer, args=(wusir,泔水, q))#創建一個生產著進程
    c1 = Process(target=consumer, args=(q,alex))#創建一個消費者
    c2 = Process(target=consumer, args=(q,jinboss))#創建一個消費者進程
    p1.start()#開啟一個生產者進程
    p2.start()#開啟一個生產者進程
    c1.daemon = True   # 設置為守護進程 主進程中的代碼執行完畢之後,子進程自動結束
    c2.daemon = True   #設置守護進程
    c1.start()  #開啟一個消費者進程
    c2.start()  #開啟一個消費者進程
    p1.join()    #感知一個生產者進程結束
    p2.join()      # 感知一個進程的結束

#  在消費者這一端:
    # 每次獲取一個數據
    # 處理一個數據
    # 發送一個記號 : 標誌一個數據被處理成功

# 在生產者這一端:
    # 每一次生產一個數據,
    # 且每一次生產的數據都放在隊列中
    # 在隊列中刻上一個記號
    # 當生產者全部生產完畢之後,
    # join信號 : 已經停止生產數據了
                # 且要等待之前被刻上的記號都被消費完
                # 當數據都被處理完時,join阻塞結束

# consumer 中把所有的任務消耗完
# producer 端 的 join感知到,停止阻塞
# 所有的producer進程結束
# 主進程中的p.join結束
# 主進程中代碼結束
# 守護進程(消費者的進程)結束
JoinableQueue隊列實現消費之生產者模型

Python之進程同步控制(鎖\信號量\事件 )、進程間通信——隊列和管道