1. 程式人生 > 實用技巧 >python多程序併發程式設計之互斥鎖與程序間的通訊

python多程序併發程式設計之互斥鎖與程序間的通訊

一、互斥鎖

多個程序之間的記憶體空間是隔離的,但是硬碟,資料庫,列印終端都是共享的 。因此當多個程序同時修改硬碟中的同一個檔案,或者修改資料庫中的同一條記錄時,就存在資源競爭的問題,容易出錯。

加鎖的目的就是為了保證多個程序修改同一塊資料時,同一時間只能有一個修改,即序列的修改,沒錯,速度是慢了,犧牲了速度而保證了資料安全。

程序互斥鎖程式碼示例

import os
import json
import time
import random
from multiprocessing import Process,Lock
def buy_ticket(mutex):
    time.sleep(random.randint(1,3))  #模擬延遲
    ticket_info = json.load(open('ticket_info','r',encoding='utf-8'))
    mutex.acquire()
    if ticket_info["ticket_count"] > 0:
        ticket_info["ticket_count"] -= 1
        json.dump(ticket_info,open('ticket_info','w',encoding='utf-8'))
        print("PID %s 買到票了"% os.getpid())
    else:
        print("pid %s 沒買到票,票賣完了!" % os.getpid())
    mutex.release()


if __name__ == '__main__':
    mutex = Lock()
    for i in range(10):
        p = Process(target=buy_ticket,args=(mutex,))
        p.start()

執行結果:

PID 10632 買到票了
pid 3808 沒買到票,票賣完了!
pid 15488 沒買到票,票賣完了!
pid 556 沒買到票,票賣完了!
pid 10448 沒買到票,票賣完了!
pid 11672 沒買到票,票賣完了!
pid 8400 沒買到票,票賣完了!
pid 5340 沒買到票,票賣完了!
pid 11164 沒買到票,票賣完了!
pid 9140 沒買到票,票賣完了!

通過上面這個例子,我們知道互斥鎖可以一個任務中的某個子任務由並行改為序列,而不是將整個任務改成序列。

總結:

多個程序的記憶體是相互隔離的,但硬碟,資料庫等確實共享的。通過互斥鎖就可以實現多個程序之間的通訊,並且不會造成資料混亂,保證的資料的安全。但互斥鎖將併發改為了序列,降低了效率 ,而且需要我們自己加鎖,釋放鎖,容易出現問題。  

二、IPC機制  

程序間通訊(IPC,InterProcess Communication)

是指在不同程序之間傳播或交換資訊。是一組程式設計介面,讓程式設計師能夠協調不同的程序,使之能在一個作業系統裡同時執行,並相互傳遞、交換資訊。
這使得一個程式能夠在同一時間裡處理許多使用者的要求。因為即使只有一個使用者發出要求,也可能導致一個作業系統中多個程序的執行,程序之間必須互相通話。
IPC介面就提供了這種可能性。每個IPC方法均有它自己的優點和侷限性,一般,對於單個程式而言使用所有的IPC方法是不常見的。 IPC的方式通常有管道(PIPE)(包括無名管道和命名管道)、訊息佇列、訊號量、共享記憶體、套接字(Socket)、Streams、旗語等。其中 Socket和Streams支援不同主機上的兩個程序IPC。

程序間通訊(IPC)常見分類的特點

1.管道

管道,通常指無名管道,是 UNIX 系統IPC最古老的形式。  

特點:

它是半雙工的(即資料只能在一個方向上流動),具有固定的讀端和寫端。
它只能用於具有親緣關係的程序之間的通訊(也是父子程序或者兄弟程序之間)。
它可以看成是一種特殊的檔案,對於它的讀寫也可以使用普通的read、write 等函式。但是它不是普通的檔案,並不屬於其他任何檔案系統,並且只存在於記憶體中。

2.FIFO (命名管道)

FIFO,也稱為命名管道,它是一種檔案型別。

特點:

FIFO可以在無關的程序之間交換資料,與無名管道不同。
FIFO有路徑名與之相關聯,它以一種特殊裝置檔案形式存在於檔案系統中。

3.訊息佇列

訊息佇列,是訊息的連結表,存放在核心中。一個訊息佇列由一個識別符號(即佇列ID)來標識。

特點:

訊息佇列是面向記錄的,其中的訊息具有特定的格式以及特定的優先順序。
訊息佇列獨立於傳送與接收程序。程序終止時,訊息佇列及其內容並不會被刪除。
訊息佇列可以實現訊息的隨機查詢,訊息不一定要以先進先出的次序讀取,也可以按訊息的型別讀取。

4.訊號量

訊號量(semaphore)與已經介紹過的 IPC 結構不同,它是一個計數器。訊號量用於實現程序間的互斥與同步,而不是用於儲存程序間通訊資料。  

特點:

訊號量用於程序間同步,若要在程序間傳遞資料需要結合共享記憶體。
訊號量基於作業系統的 PV 操作,程式對訊號量的操作都是原子操作。
每次對訊號量的 PV 操作不僅限於對訊號量值加 1 或減 1,而且可以加減任意正整數。
支援訊號量組。

5.共享記憶體

共享記憶體(Shared Memory),指兩個或多個程序共享一個給定的儲存區。  

特點:

共享記憶體是最快的一種 IPC,因為程序是直接對記憶體進行存取。
因為多個程序可以同時操作,所以需要進行同步。
訊號量+共享記憶體通常結合在一起使用,訊號量用來同步對共享記憶體的訪問。  

參考文件:https://blog.csdn.net/python_jw/article/details/79506702  

三、使用佇列實現程序間通訊  

1.Queue通訊機制

首先講解 一下Queue通訊方式。Queue是多程序安全的佇列,可以使用Queue實現多進 程之間的資料傳遞,

通訊原理:在記憶體中建立佇列資料結構模型。多個程序都可以通過佇列存入內容,取出內容的順序和存入的順序保持一致

有兩個方法: Put和Get可以進行Queue操作:

put:

Put方法用以插入資料到佇列中,它還有兩個可選引數:blocked和timeout。如果blocked為True (預設值),
並且timeout為正值,該方法會阻塞timeout指定的時間, 直到該佇列有剩餘的空間。如果超時,會丟擲Queue.Full異常。
如果blocked為False,但該 Queue已滿,會立即丟擲Queue.Full異常。(即如果blocked為True時,如果佇列滿,會有一定的等待時間)

get:  

Get方法可以從 佇列讀取並且刪除一個元素 。同樣,Get方法有兩個可選引數:blocked 和timeout。如果blocked為True (預設值),
並且timeout為正值,那麼在等待時間內沒有取到任何元素,會丟擲Queue.Empty異常。如果blocked為False,
分兩種情況:如果Queue 有 一個值可用, 則立即返回該值;否則,如果佇列為空, 則立即丟擲 Queue.Empty異常。(形式上與Put相同,不同的僅僅是寫入和獲取的區別)

2.程式碼示例:

from multiprocessing import Process,Lock,Queue


def put(queue):
    for i in range(5):
        queue.put(i)

def get(queue):
    for i in range(5):
        print("獲取了 %s" % queue.get())


if __name__ == '__main__':
    queue = Queue(5)
    p1 = Process(target=put,args=(queue,))
    p2 = Process(target=get,args=(queue,))

    p1.start()
    p2.start()

四、生產者消費者模型介紹

什麼是生產者和消費者模式

生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞佇列來進行通訊,所以生產者生產完資料之後不用等待消費者處理,直接扔給阻塞佇列,

消費者不找生產者要資料,而是直接從阻塞佇列裡取,阻塞佇列就相當於一個緩衝區,平衡了生產者和消費者的處理能力。

這個阻塞佇列就是用來給生產者和消費者解耦的

為什麼要使用生產者消費者模型

生產者指的是生產資料的任務,消費者指的是處理資料的任務,在併發程式設計中,如果生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產資料。

同樣的道理,如果消費者的處理能力大於生產者,那麼消費者就必須等待生產者。為了解決這個問題於是引入了生產者和消費者模式。

程式碼示例:

from multiprocessing import Process, Queue
import time


def producer(q, name, food):
    count = 0
    while True:
        count +=1
        res = '%s,%s' % (food, count)
        time.sleep(1)  # 生產food得有個過程,就先讓睡一會
        print('生產者[%s] 生產了 [%s]' % (name, res))
        q.put(res)


def consumer(q, name):
    while True:
        res = q.get()
        if res is None: break
        time.sleep(1)
        print('消費者[%s]吃了[%s]' % (name, res))


if __name__ == '__main__':
    # 容器
    q = Queue()
    p = Process(target=producer, args=(q, 'egon', '包子'))
    c = Process(target=consumer, args=(q, 'alex',))

    p.start()
    c.start()

執行結果:

生產者[egon] 生產了 [包子,0]
生產者[egon] 生產了 [包子,1]
消費者[alex]吃了[包子,0]
生產者[egon] 生產了 [包子,2]
消費者[alex]吃了[包子,1]
消費者[alex]吃了[包子,2]