1. 程式人生 > >深入Python程序間通訊原理

深入Python程序間通訊原理

繼上節使用原生多程序並行執行,基於Redis作為訊息佇列完成了圓周率的計算,本節我們使用原生作業系統訊息佇列來替換Redis。

檔案

使用檔案進行通訊是最簡單的一種通訊方式,子程序將結果輸出到臨時檔案,父程序從檔案中讀出來。檔名使用子程序的程序id來命名。程序隨時都可以通過 os.getpid() 來獲取自己的程序id。

# coding: utf-8

import os
import sys
import math


def slice(mink, maxk):
    s = 0.0
    for k in range(mink, maxk):
        s += 1.0/(2*k+1)/(2
*k+1) return s def pi(n): pids = [] unit = n / 10 for i in range(10): # 分10個子程序 mink = unit * i maxk = mink + unit pid = os.fork() if pid > 0: pids.append(pid) else: s = slice(mink, maxk) # 子程序開始計算 with open("%d"
% os.getpid(), "w") as f: f.write(str(s)) sys.exit(0) # 子程序結束 sums = [] for pid in pids: os.waitpid(pid, 0) # 等待子程序結束 with open("%d" % pid, "r") as f: sums.append(float(f.read())) os.remove("%d" % pid) # 刪除通訊的檔案 return math.sqrt(sum(sums) * 8
) print pi(10000000)

輸出

3.14159262176

管道pipe

管道是Unix程序間通訊最常用的方法之一,它通過在父子程序之間開通讀寫通道來進行雙工交流。我們通過os.read()和os.write()來對檔案描述符進行讀寫操作,使用os.close()關閉描述符。

上圖為單程序的管道

上圖為父子程序分離後的管道

# coding: utf-8

import os
import sys
import math


def slice(mink, maxk):
    s = 0.0
    for k in range(mink, maxk):
        s += 1.0/(2*k+1)/(2*k+1)
    return s


def pi(n):
    childs = {}
    unit = n / 10
    for i in range(10):  # 分10個子程序
        mink = unit * i
        maxk = mink + unit
        r, w = os.pipe()
        pid = os.fork()
        if pid > 0:
            childs[pid] = r  # 將子程序的pid和讀描述符存起來
            os.close(w)  # 父程序關閉寫描述符,只讀
        else:
            os.close(r)  # 子程序關閉讀描述符,只寫
            s = slice(mink, maxk)  # 子程序開始計算
            os.write(w, str(s))
            os.close(w)  # 寫完了,關閉寫描述符
            sys.exit(0)  # 子程序結束
    sums = []
    for pid, r in childs.items():
        sums.append(float(os.read(r, 1024)))
        os.close(r)  # 讀完了,關閉讀描述符
        os.waitpid(pid, 0)  # 等待子程序結束
    return math.sqrt(sum(sums) * 8)


print pi(10000000)

輸出

3.14159262176

無名套接字socketpair

我們知道跨網路通訊免不了要通過套接字進行通訊,但是本例的多程序是在同一個機器上,用不著跨網路,使用普通套接字進行通訊有點浪費。

上圖為單程序的socketpair

上圖為父子程序分離後的socketpair

為了解決這個問題,Unix系統提供了無名套接字socketpair,不需要埠也可以建立套接字,父子程序通過socketpair來進行全雙工通訊。

socketpair返回兩個套接字物件,一個用於讀一個用於寫,它有點類似於pipe,只不過pipe返回的是兩個檔案描述符,都是整數。所以寫起程式碼形式上跟pipe幾乎沒有什麼區別。

我們使用sock.send()和sock.recv()來對套接字進行讀寫,通過sock.close()來關閉套接字物件。

# coding: utf-8

import os
import sys
import math
import socket


def slice(mink, maxk):
    s = 0.0
    for k in range(mink, maxk):
        s += 1.0/(2*k+1)/(2*k+1)
    return s


def pi(n):
    childs = {}
    unit = n / 10
    for i in range(10):  # 分10個子程序
        mink = unit * i
        maxk = mink + unit
        rsock, wsock = socket.socketpair()
        pid = os.fork()
        if pid > 0:
            childs[pid] = rsock
            wsock.close()
        else:
            rsock.close()
            s = slice(mink, maxk)  # 子程序開始計算
            wsock.send(str(s))
            wsock.close()
            sys.exit(0)  # 子程序結束
    sums = []
    for pid, rsock in childs.items():
        sums.append(float(rsock.recv(1024)))
        rsock.close()
        os.waitpid(pid, 0)  # 等待子程序結束
    return math.sqrt(sum(sums) * 8)


print pi(10000000)

輸出

3.14159262176

OS訊息佇列

作業系統也提供了跨程序的訊息佇列物件可以讓我們直接使用,只不過python沒有預設提供包裝好的api來直接使用。我們必須使用第三方擴充套件來完成OS訊息佇列通訊。第三方擴充套件是通過使用Python包裝的C實現來完成的。

OS訊息佇列有兩種形式,一種是posix訊息佇列,另一種是systemv訊息佇列,有些作業系統兩者都支援,有些只支援其中的一個,比如macos僅支援systemv訊息佇列,我本地的python的docker映象是debian linux,它僅支援posix訊息佇列。

posix訊息佇列我們先使用posix訊息佇列來完成圓周率的計算,posix訊息佇列需要提供一個唯一的名稱,它必須是 / 開頭。close()方法僅僅是減少核心訊息佇列物件的引用,而不是徹底關閉它。unlink()方法才能徹底銷燬它。O_CREAT選項表示如果不存在就建立。向佇列裡塞訊息使用send方法,收取訊息使用receive方法,receive方法返回一個tuple,tuple的第一個值是訊息的內容,第二個值是訊息的優先順序。之所以有優先順序,是因為posix訊息佇列支援訊息的排序,在send方法的第二個引數可以提供優先順序整數值,預設為0,越大優先順序越高。

# coding: utf-8

import os
import sys
import math
from posix_ipc import MessageQueue as Queue


def slice(mink, maxk):
    s = 0.0
    for k in range(mink, maxk):
        s += 1.0/(2*k+1)/(2*k+1)
    return s


def pi(n):
    pids = []
    unit = n / 10
    q = Queue("/pi", flags=os.O_CREAT)
    for i in range(10):  # 分10個子程序
        mink = unit * i
        maxk = mink + unit
        pid = os.fork()
        if pid > 0:
            pids.append(pid)
        else:
            s = slice(mink, maxk)  # 子程序開始計算
            q.send(str(s))
            q.close()
            sys.exit(0)  # 子程序結束
    sums = []
    for pid in pids:
        sums.append(float(q.receive()[0]))
        os.waitpid(pid, 0)  # 等待子程序結束
    q.close()
    q.unlink()  # 徹底銷燬佇列
    return math.sqrt(sum(sums) * 8)


print pi(10000000)

輸出

3.14159262176

systemv訊息佇列systemv訊息佇列和posix訊息佇列用起來有所不同。systemv的訊息佇列是以整數key作為名稱,如果不指定,它就建立一個唯一的未佔用的整數key。它還提供訊息型別的整數引數,但是不支援訊息優先順序。

# coding: utf-8

import os
import sys
import math
import sysv_ipc
from sysv_ipc import MessageQueue as Queue


def slice(mink, maxk):
    s = 0.0
    for k in range(mink, maxk):
        s += 1.0/(2*k+1)/(2*k+1)
    return s


def pi(n):
    pids = []
    unit = n / 10
    q = Queue(key=None, flags=sysv_ipc.IPC_CREX)
    for i in range(10):  # 分10個子程序
        mink = unit * i
        maxk = mink + unit
        pid = os.fork()
        if pid > 0:
            pids.append(pid)
        else:
            s = slice(mink, maxk)  # 子程序開始計算
            q.send(str(s))
            sys.exit(0)  # 子程序結束
    sums = []
    for pid in pids:
        sums.append(float(q.receive()[0]))
        os.waitpid(pid, 0)  # 等待子程序結束
    q.remove()  # 銷燬訊息佇列
    return math.sqrt(sum(sums) * 8)


print pi(10000000)

輸出

3.14159262176

共享記憶體

共享記憶體也是非常常見的多程序通訊方式,作業系統負責將同一份實體地址的記憶體對映到多個程序的不同的虛擬地址空間中。進而每個程序都可以操作這份記憶體。考慮到實體記憶體的唯一性,它屬於臨界區資源,需要在程序訪問時搞好併發控制,比如使用訊號量。我們通過一個訊號量來控制所有子程序的順序讀寫共享記憶體。我們分配一個8位元組double型別的共享記憶體用來儲存極限的和,每次從共享記憶體中讀出來時,要使用struct進行反序列化(unpack),將新的值寫進去之前也要使用struct進行序列化(pack)。每次讀寫操作都需要將讀寫指標移動到記憶體開頭位置(lseek)。

# coding: utf-8

import os
import sys
import math
import struct
import posix_ipc
from posix_ipc import Semaphore
from posix_ipc import SharedMemory as Memory


def slice(mink, maxk):
    s = 0.0
    for k in range(mink, maxk):
        s += 1.0/(2*k+1)/(2*k+1)
    return s


def pi(n):
    pids = []
    unit = n / 10
    sem_lock = Semaphore("/pi_sem_lock", flags=posix_ipc.O_CREX, initial_value=1)  # 使用一個訊號量控制多個程序互斥訪問共享記憶體
    memory = Memory("/pi_rw", size=8, flags=posix_ipc.O_CREX)
    os.lseek(memory.fd, 0, os.SEEK_SET)  # 初始化和為0.0的double值
    os.write(memory.fd, struct.pack('d', 0.0))
    for i in range(10):  # 分10個子程序
        mink = unit * i
        maxk = mink + unit
        pid = os.fork()
        if pid > 0:
            pids.append(pid)
        else:
            s = slice(mink, maxk)  # 子程序開始計算
            sem_lock.acquire()
            try:
                os.lseek(memory.fd, 0, os.SEEK_SET)
                bs = os.read(memory.fd, 8)  # 從共享記憶體讀出來當前值
                cur_val, = struct.unpack('d', bs)  # 反序列化,逗號不能少
                cur_val += s  # 加上當前程序的計算結果
                bs = struct.pack('d', cur_val) # 序列化
                os.lseek(memory.fd, 0, os.SEEK_SET)
                os.write(memory.fd, bs)  # 寫進共享記憶體
                memory.close_fd()
            finally:
                sem_lock.release()
            sys.exit(0)  # 子程序結束
    sums = []
    for pid in pids:
        os.waitpid(pid, 0)  # 等待子程序結束
    os.lseek(memory.fd, 0, os.SEEK_SET)
    bs = os.read(memory.fd, 8)  # 讀出最終這結果
    sums, = struct.unpack('d', bs)  # 反序列化
    memory.close_fd()  # 關閉共享記憶體
    memory.unlink()  # 銷燬共享記憶體
    sem_lock.unlink()  #  銷燬訊號量
    return math.sqrt(sums * 8)


print pi(10000000)

輸出

3.14159262176

python學習交流群:125240963

轉載至https://zhuanlan.zhihu.com/p/37370601