1. 程式人生 > 實用技巧 >RPC基本原理

RPC基本原理

RPC基本原理

RPC簡介

  • 定義

遠端過程呼叫(英語:Remote Procedure Call,縮寫為 RPC,也叫遠端程式呼叫)是一個計算機通訊協議。該協議允許運行於一臺計算機的程式呼叫另一臺計算機的子程式,而程式設計師無需額外地為這個互動作用程式設計。如果涉及的軟體採用面向物件程式設計,那麼遠端過程呼叫亦可稱作遠端呼叫遠端方法呼叫

  • 背景用途

在單臺計算機中,我們可以通過程式呼叫來傳遞控制和資料;或者說通過程式呼叫,我們可以將多個程式組成一個整體來實現某個功能。

如果將這種呼叫機制推廣到多臺彼此間可以進行網路通訊的計算機,由多臺計算機中的多個程式組成一個整體來實現某個功能,這也是可以的。呼叫的一方(發起遠端過程呼叫,然後呼叫這方的環境掛起,引數通過網路傳遞給被呼叫方,被呼叫的一方執行程式,當程式執行完成後,產生的結果再通過網路回傳給呼叫的一方,呼叫的一方恢復繼續執行。這樣一種原型思想,就是我們所說的RPC遠端過程呼叫。

RPC這種思想最早可以追溯到1976年,RPC的發展到今天已經40年有餘了。

如今的計算機應用中,單機效能上很難承受住產品的壓力,需要不斷擴充多臺機器來提升整體的效能。同時為了充分利用這些叢集裡的計算機,需要對其從架構上進行劃分,以提供不同的服務,服務間相互呼叫完成整個產品的功能。RPC就能幫助我們解決這些服務間的資訊傳遞和呼叫。

  • 概念說明

廣義

我們可以將所有通過網路來進行通訊呼叫的實現統稱為RPC。

按照這樣來理解的話,那我們發現HTTP其實也算是一種RPC實現。

狹義

區別於HTTP的實現方式,在傳輸的資料格式上和傳輸的控制上獨立實現。比如在機器間通訊傳輸的資料不採用HTTP協議的方式(分為起始行、header、body三部份),而是使用自定義格式的二進位制方式。

我們更多時候談到的RPC都是指代這種狹義上的理解。

  • 優缺點

相比於傳統HTTP的實現而言:

優點

- 效率高
- 發起RPC呼叫的一方,在編寫程式碼時可忽略RPC的具體實現,如同編寫本地函式呼叫一樣

缺點

- 通用性不如HTTP好 因為傳輸的資料不是HTTP協議格式,所以呼叫雙方需要專門實現的通訊庫,對於不同的程式設計開發語言,都要有相關實現。而HTTP作為一個標準協議,大部分的語言都已有相關的實現,通用性更好。

HTTP更多的面向使用者與產品伺服器的通訊。

RPC更多的面向產品內部伺服器間的通訊。

RPC結構

RPC的設計思想是力圖使遠端呼叫中的通訊細節對於使用者透明,呼叫雙方無需關心網路通訊的具體實現。因而實現RPC要進行一定的封裝。

RPC原理上是按如下結構流程進行實現的。

流程

1. 呼叫者(Caller, 也叫客戶端、Client)以本地呼叫的方式發起呼叫;
2. Client stub(客戶端存根,可理解為輔助助手)收到呼叫後,負責將被呼叫的方法名、引數等打包編碼成特定格式的能進行網路傳輸的訊息體;
3. Client stub將訊息體通過網路傳送給對端(服務端)
4. Server stub(服務端存根,同樣可理解為輔助助手)收到通過網路接收到訊息後按照相應格式進行拆包解碼,獲取方法名和引數;
5. Server stub根據方法名和引數進行本地呼叫;
6. 被呼叫者(Callee,也叫Server)本地呼叫執行後將結果返回給server stub;
7. Server stub將返回值打包編碼成訊息,並通過網路傳送給對端(客戶端);
8. Client stub收到訊息後,進行拆包解碼,返回給Client;
9. Client得到本次RPC呼叫的最終結果。

RPC的目標就是要2~8這些步驟都封裝起來,讓使用者對這些細節透明。

在瞭解了RPC流程之後,為了實現RPC,我們還需要關注兩點:

訊息協議

客戶端呼叫的引數和服務端的返回值這些在網路上傳輸的資料以何種方式打包編碼和拆包解碼。

我們可以使用HTTP協議中關於報文格式的規定(如此一來,就程式設計了HTTP通訊),也可以自己定義某種格式,讓客戶端與服務端雙方都遵循此種格式。

傳輸控制

在網路中資料的收發傳輸控制具體如何實現。

訊息協議

在實現RPC呼叫時,通訊雙方傳輸的資料(訊息)如何表達描述,設計時一般會考慮兩個目標

效能高

效能高包括兩點:

  1. 將原始資料轉換為訊息資料的速度快
  2. 轉換後的訊息資料體積小

跨語言

RPC呼叫沒有要求呼叫雙方的程式語言必須相同,如果能做到跨語言呼叫是最好,這會方便產品開發中不同的功能服務以最合適的語言實現,然後使用RPC實現彼此呼叫。因此RPC呼叫中傳輸的訊息資料應該儘量能讓跟多的語言支援。

本課程中僅以Python為例進行講解。

  • 邊界

在網路傳輸中,一方可能連續向另一方多次傳送訊息,收到資料的一方如何界定資料中包含幾條訊息,這便是訊息邊界問題。

考慮TCP傳輸控制協議,在一條TCP連結中可以多次傳送資料,如果傳送的資料過大,就會被TCP底層實現拆解為多個數據包依次傳送;而如果傳送的資料過小,又可能會將幾條資料組裝成一個數據包進行傳送。

為了解決訊息邊界的問題,有兩種較為常用的方法:分割符法長度宣告法

分隔符法

顧名思義,就是在每條訊息的結尾放置一種特殊的分割符(一種常用的分割符是\r\n),表示已到達本條訊息的末尾。

長度宣告法

長度宣告法是在訊息的起始位置,用一個固定長度的整數值(通常為4位元組)宣告本訊息的長度,接收者先讀取出長度宣告,再按照宣告的長度讀取出相應大小的資料即可。

例如,HTTP協議同時運用了這兩種方法:

HTTP/1.0 200 OK\r\n
Server: Nginx\r\n
Content-Type: text/html; charset=utf-8\r\n
Content-Length: 5096\r\n
\r\n
//  此處為5096位元組的資料
  • 內容

在具體訊息內容的表現形式上,可以使用文字,也可以使用二進位制。

文字

我們可以將資料轉換為具備某種格式的字串(如 JSON),將字串作為訊息內容傳送。

比如一個RPC呼叫請求,方法名為divide,引數為200和100,我們可以用JSON字串的形式來表示這個訊息內容:

{
    "name": "divide",
    "params": {
        "num1": 200,
        "num2": 100
    }
}

採用JSON這種方式,大多數程式語言都已有JSON轉換的工具,實現起來相對便捷。但是我們能夠看到,形成的訊息資料不夠精簡,資料中有較為無意義的,如"、{、}、,、空白字元等,在網路傳輸中會造成浪費。

二進位制

二進位制方式就是將資料在記憶體中的一些列原始二進位制位或位元組直接在網路中傳送,而無需轉換為字串再傳送。

我們能夠看到,採用原始二進位制傳遞,可以省去中間轉換的環節,而且資料量也會大大減少,效率更高。

如果使用二進位制的方式來傳遞上面舉例的RPC呼叫請求,該如何組織資料呢?這就需要實現RPC機制的設計人員來制訂一個呼叫雙方都遵守的協議規則,不同的設計人員可能有不同的想法。

  • 壓縮

如果訊息資料過大,為了減輕網路頻寬的壓力,可以考慮對訊息資料進行壓縮處理。

就如同我們平時對一些檔案、視訊等使用壓縮軟體進行壓縮來減小大小一樣,我們可以在構造好資料準備傳送前,先用演算法將資料進行壓縮處理,然後通過網路傳送到對端,對端收到資料後,先進行解壓縮處理,然後得到原體積資料後再進行解析。

即使是比文字資料小的二進位制資料,我們仍然可以進行壓縮處理。

但是需要注意的是,壓縮處理是一把雙刃劍,雖然能減少資料量減輕頻寬壓力,但是同時額外增加了壓縮和解壓縮的過程,壓縮和解壓縮在處理的時候會有時間的消耗,會導致作業系統的負擔加重。有時壓縮的成本可能比減少資料量帶來的收益還高,就得不償失了。

所以是否採用壓縮處理,要根據具體情況權衡利弊。

二進位制訊息協議實現

  • 概述

我們現在實現一個RPC的服務介面定義,這個RPC呼叫可以完成除法操作。

# 實現本地呼叫的話,很容易,定義一個函式divide
class InvalidOperation(Exception):
    """
    自定義非法操作異常
    """
    def __init__(self, message=None):
        self.message = message or 'invalid operation'

def divide(num1, num2=1):
     """
     除法
     :param num1: int
     :param num2: int, 預設值為1
     :return: float 商 或 InvalidOperation異常
     """
     if num2 == 0:
         raise InvalidOperation()
     val = num1 / num2
     return val
    
# 呼叫的時候,呼叫divde函式即可:

try:
    val = divide(200, 100)
except InvalidOperation as e:
    print(e.message)
else:
    print(val)

但是如果變成RPC呼叫的話,呼叫雙方該以什麼樣的方式傳遞哪些訊息資料呢?

我們使用二進位制方式來實現訊息協議。為了突出訊息協議本身,我們不再進行額外壓縮處理。

我們將上面的過程抽象成介面

float divide(1:int num1, 2:int num2=1) => InvalidOperation

訊息協議分為兩部分說明:

呼叫請求訊息

方法名為divide
第1個呼叫引數為整型int,名為num1
第2個呼叫引數為整型int,名為num2,預設值為1

呼叫返回訊息

正常返回float型別
錯誤會丟擲InvalidOperation異常
  • 實現

divide訊息協議實現

import struct
from io import BytesIO

class DivideProtocol(object):
    """
    float divide(1:int num1, 2:int num2=1)
    """
    def _read_all(self, size):
        """
        讀取指定長度的位元組
        :param size: 長度
        :return: 讀取出的二進位制資料
        """
        if isinstance(self.conn, BytesIO):
            # BytesIO型別,用於演示
            buff = b''
            have = 0
            while have < size:
                chunk = self.conn.read(size - have)
                have += len(chunk)
                buff += chunk
            return buff

        else:
            # socket型別
            buff = b''
            have = 0
            while have < size:
                chunk = self.conn.recv(size - have)
                have += len(chunk)
                buff += chunk
                # 客戶端關閉了連線
                if len(chunk) == 0:
                    raise EOFError()
            return buff

    def args_encode(self, num1, num2=1):
        """
        對呼叫引數進行編碼
        :param num1: int
        :param num2: int
        :return: 編碼後的二進位制資料
        """
        # 處理引數num1, 4位元組整型
        buff = struct.pack('!B', 1)
        buff += struct.pack('!i', num1)

        # 處理引數num2, 4位元組整型,如為預設值1,則不再放到訊息中
        if num2 != 1:
            buff += struct.pack('!B', 2)
            buff += struct.pack('!i', num2)

        # 處理訊息總長度,4位元組無符號整型
        length = len(buff)

        # 處理方法名,字串型別
        name = 'divide'
        # 字串長度,4位元組無符號整型
        msg = struct.pack('!I', len(name))
        msg += name.encode()

        msg += struct.pack('!I', length) + buff

        return msg

    def args_decode(self, connection):
        """
        獲取呼叫引數並進行解碼
        :param connection: 傳輸工具物件,如socket物件或者BytesIO物件,從中可以讀取訊息資料
        :return: 解碼後的引數字典
        """
        # 儲存到當前物件中,供_read_all方式使用
        self.conn = connection
        param_name_map = {
            1: 'num1',
            2: 'num2'
        }
        param_len_map = {
            1: 4,
            2: 4
        }
        # 用於儲存解碼後的引數字典
        args = dict()

        # 讀取訊息總長度,4字無節符號整數
        buff = self._read_all(4)
        length = struct.unpack('!I', buff)[0]

        # 記錄已讀取的長度
        have = 0

        # 讀取第一個引數,4位元組整型
        buff = self._read_all(1)
        have += 1
        param_seq = struct.unpack('!B', buff)[0]
        param_len = param_len_map[param_seq]
        buff = self._read_all(param_len)
        have += param_len
        args[param_name_map[param_seq]] = struct.unpack('!i', buff)[0]

        if have >= length:
            return args

        # 讀取第二個引數,4位元組整型
        buff = self._read_all(1)
        have += 1
        param_seq = struct.unpack('!B', buff)[0]
        param_len = param_len_map[param_seq]
        buff = self._read_all(param_len)
        have += param_len
        args[param_name_map[param_seq]] = struct.unpack('!i', buff)[0]

        return args

    def result_encode(self, result):
        """
        對呼叫的結果進行編碼
        :param result: float 或 InvalidOperation物件
        :return: 編碼後的二進位制資料
        """
        if isinstance(result, float):
            # 沒有異常,正常執行
            # 處理結果型別,1位元組無符號整數
            buff = struct.pack('!B', 1)

            # 處理結果值, 4位元組float
            buff += struct.pack('!f', result)
        else:
            # 發生了InvalidOperation異常
            # 處理結果型別,1位元組無符號整數
            buff = struct.pack('!B', 2)

            # 處理異常結果值, 字串
            # 處理字串長度, 4位元組無符號整數
            buff += struct.pack('!I', len(result.message))
            # 處理字串內容
            buff += result.message.encode()

        return buff

    def result_decode(self, connection):
        """
        對呼叫結果進行解碼
        :param connection: 傳輸工具物件,如socket物件或者BytesIO物件,從中可以讀取訊息資料
        :return: 結果資料
        """
        self.conn = connection

        # 取出結果型別, 1位元組無符號整數
        buff = self._read_all(1)
        result_type = struct.unpack('!B', buff)[0]
        if result_type == 1:
            # float的結果值, 4位元組float
            buff = self._read_all(4)
            result = struct.unpack('!f', buff)[0]
            return result
        else:
            # InvalidOperation物件
            # 取出字串長度, 4位元組無符號整數
            buff = self._read_all(4)
            str_len = struct.unpack('!I', buff)[0]
            buff = self._read_all(str_len)
            message = buff.decode()
            return InvalidOperation(message)

解析方法名實現

class MethodProtocol(object):
    def __init__(self, connection):
        self.conn = connection

    def _read_all(self, size):
        """
        讀取指定長度的位元組
        :param size: 長度
        :return: 讀取出的二進位制資料
        """
        if isinstance(self.conn, BytesIO):
            # BytesIO型別,用於演示
            buff = b''
            have = 0
            while have < size:
                chunk = self.conn.read(size - have)
                have += len(chunk)
                buff += chunk

            return buff

        else:
            # socket型別
            buff = b''
            have = 0
            while have < size:
                print('have=%d size=%d' % (have, size))
                chunk = self.conn.recv(size - have)
                have += len(chunk)
                buff += chunk

                if len(chunk) == 0:
                    raise EOFError()

            return buff

    def get_method_name(self):
        # 獲取方法名
        # 讀取字串長度,4位元組無符號整型
        buff = self._read_all(4)
        str_len = struct.unpack('!I', buff)[0]

        # 讀取字串
        buff = self._read_all(str_len)
        name = buff.decode()
        return name

測試程式碼

if __name__ == '__main__':
    proto = DivideProtocol()
    # 構造訊息
    buff = BytesIO()
    # buff.write(proto.args_encode(100, 200))
    buff.write(proto.args_encode(100))
    # 解讀訊息
    buff.seek(0)
    name = MethodProtocol(buff).get_method_name()
    print(name)
    args = proto.args_decode(buff)
    print(args)
    buff.close()

RPC傳輸

在構造好RPC訊息資料後,就可以通過網路在呼叫雙方進行傳遞了。

傳遞訊息資料常用的有兩種方式:TCPHTTP

  • TCP

由於TCP的可靠性,所以TCP是最常用的方式。我們可以直接藉助socket工具進行TCP開發。

TCP服務端編寫

sock = socket.socket()  # 建立一個套接字
sock.bind()  # 繫結埠
sock.listen()  # 監聽連線
sock.accept()  # 接受新連線
sock.close()  # 關閉伺服器套接字

TCP客戶端編寫

sock = socket.socket()  # 建立一個套接字
sock.connect()  # 連線遠端伺服器
sock.recv() # 讀
sock.send()  # 儘可能地寫
sock.sendall()  # 完全寫
sock.close()  # 關閉

用TCP來進行實現傳輸控制實現

import socket


class Channel(object):
    """
    連線通道
    """
    def __init__(self, host, port):
        self.host = host
        self.port = port

    def get_connection(self):
        """
        獲取一個tcp連線
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.connect((self.host, self.port))
        return sock


class Server(object):
    """
    伺服器
    """
    def __init__(self, host, port):
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.host = host
        self.port = port
        self.sock.bind((host, port))
  • HTTP

雖然HTTP屬於網路層級中應用層的協議,但是我們仍然可以藉助HTTP來幫助我們傳遞RPC訊息。

我們可以將構造好的RPC訊息資料嵌入到HTTP報文中的body部分,而對於HTTP的path路徑等都無需關心。

HTTP/1.0 POST /
Content-Type: binary
Content-Length: 5096

# 此處放置RPC訊息資料

使用HTTP作為傳輸工具的好處是可以直接利用開發語言中已有的http庫來進行通訊,無需自己操作socket的讀寫。但是通訊效率不如TCP高,所以不常用。

RPC完整實現

  • 訊息協議
import struct
from io import BytesIO
import socket


class InvalidOperation(Exception):
    def __init__(self, message=None):
        self.message = message or 'invalid operation'


class MethodProtocol(object):
    """
    解讀方法名字
    """
    def __init__(self, connection):
        self.conn = connection

    def _read_all(self, size):
        """
        幫助我們讀取二進位制資料
        :param size: 想要讀取的二進位制資料大小
        :return: 二進位制資料 bytes
        """
        # self.conn
        # 讀取二進位制資料
        # socket.recv(4) => ?4
        # BytesIO.read
        if isinstance(self.conn, BytesIO):
            buff = self.conn.read(size)
            return buff
        else:
            # socket
            have = 0
            buff = b''
            while have < size:
                chunk = self.conn.recv(size-have)
                buff += chunk
                l = len(chunk)
                have += l

                if l == 0:
                    # 表示客戶端socket關閉了
                    raise EOFError()
            return buff

    def get_method_name(self):
        """
        提供方法名
        :return: str 方法名
        """
        # 讀取字串長度
        buff = self._read_all(4)
        length = struct.unpack('!I', buff)[0]

        # 讀取字串
        buff = self._read_all(length)
        name = buff.decode()
        return name


class DivideProtocol(object):
    """
    divide過程訊息協議轉換工具
    """
    def args_encode(self, num1, num2=1):
        """
        將原始的呼叫請求引數轉換打包成二進位制訊息資料
        :param num1:  int
        :param num2:  int
        :return: bytes 二進位制訊息資料
        """
        name = 'divide'

        # 處理方法的名字 字串
        # 處理字串的長度
        buff = struct.pack('!I', 6)
        # 處理字元
        buff += name.encode()

        # 處理引數1
        # 處理序號
        buff2 = struct.pack('!B', 1)
        # 處理引數值
        buff2 += struct.pack('!i', num1)

        # 處理引數2
        if num2 != 1:
            # 處理序號
            buff2 += struct.pack('!B', 2)
            # 處理引數值
            buff2 += struct.pack('!i', num2)

        # 處理訊息長度,邊界設定
        length = len(buff2)
        buff += struct.pack('!I', length)

        buff += buff2

        return buff

    def _read_all(self, size):
        """
        幫助我們讀取二進位制資料
        :param size: 想要讀取的二進位制資料大小
        :return: 二進位制資料 bytes
        """
        # self.conn
        # 讀取二進位制資料
        # socket.recv(4) => ?4
        # BytesIO.read
        if isinstance(self.conn, BytesIO):
            buff = self.conn.read(size)
            return buff
        else:
            # socket
            have = 0
            buff = b''
            while have < size:
                chunk = self.conn.recv(size-have)
                buff += chunk
                l = len(chunk)
                have += l

                if l == 0:
                    # 表示客戶端socket關閉了
                    raise EOFError()
            return buff

    def args_decode(self, connection):
        """
        接收呼叫請求訊息資料並進行解析
        :param connection: 連線物件 socket BytesIO
        :return: dict 包含了解析之後的引數
        """
        param_len_map = {
            1: 4,
            2: 4
        }
        param_fmt_map = {
            1: '!i',
            2: '!i'
        }
        param_name_map = {
            1: 'num1',
            2: 'num2'
        }

        # 儲存用來返回的引數字典
        # args = {"num1": xxx, "num2": xxx}
        args = {}

        self.conn = connection
        # 處理方法名已經提前被處理(稍後實現)

        # 處理訊息邊界
        # 讀取二進位制資料
        # socket.recv(4) => ?4
        # BytesIO.read
        buff = self._read_all(4)
        # 將二進位制資料轉換為python的資料型別
        length = struct.unpack('!I', buff)[0]

        # 已經讀取處理的位元組數
        have = 0

        # 處理第一個引數
        # 處理引數序號
        buff = self._read_all(1)
        have += 1
        param_seq = struct.unpack('!B', buff)[0]

        # 處理引數值
        param_len = param_len_map[param_seq]
        buff = self._read_all(param_len)
        have += param_len
        param_fmt = param_fmt_map[param_seq]
        param = struct.unpack(param_fmt, buff)[0]

        param_name = param_name_map[param_seq]
        args[param_name] = param

        if have >= length:
            return args

        # 處理第二個引數
        # 處理引數序號
        buff = self._read_all(1)
        param_seq = struct.unpack('!B', buff)[0]

        # 處理引數值
        param_len = param_len_map[param_seq]
        buff = self._read_all(param_len)
        param_fmt = param_fmt_map[param_seq]
        param = struct.unpack(param_fmt, buff)[0]

        param_name = param_name_map[param_seq]
        args[param_name] = param

        return args

    def result_encode(self, result):
        """
        將原始結果資料轉換為訊息協議二進位制資料
        :param result: 原始結果資料 float InvalidOperation
        :return: bytes 訊息協議二進位制資料
        """
        # 正常
        if isinstance(result, float):
            pass
            # 處理返回值型別
            buff = struct.pack('!B', 1)
            buff += struct.pack('!f', result)
            return buff
        # 異常
        else:
            # 處理返回值型別
            buff = struct.pack('!B', 2)
            # 處理返回值
            length = len(result.message)
            # 處理字串長度
            buff += struct.pack('!I', length)
            # 處理字元
            buff += result.message.encode()
            return buff

    def result_decode(self, connection):
        """
        將返回值訊息資料轉換為原始返回值
        :param connection: socket BytesIO
        :return: float  InvalidOperation物件
        """
        self.conn = connection

        # 處理返回值型別
        buff = self._read_all(1)
        result_type = struct.unpack('!B', buff)[0]

        if result_type == 1:
            # 正常
            # 讀取float資料
            buff = self._read_all(4)
            val = struct.unpack('!f', buff)[0]
            return val
        else:
            # 異常
            # 讀取字串的長度
            buff = self._read_all(4)
            length = struct.unpack('!I', buff)[0]

            # 讀取字串
            buff = self._read_all(length)
            message = buff.decode()

            return InvalidOperation(message)


class Channel(object):
    """
    使用者客戶端建立網路連線
    """
    def __init__(self, host, port):
        """

        :param host: 伺服器地址
        :param port: 伺服器埠號
        """
        self.host = host
        self.port = port

    def get_connection(self):
        """
        獲取連線物件
        :return: 與伺服器通訊的socket
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.connect((self.host, self.port))
        return sock

  • Server
class Server(object):
    """
    RPC伺服器
    """
    def __init__(self, host, port, handlers):
        # 建立socket的工具物件
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

        # 設定socket 重用地址
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

        # 繫結地址
        sock.bind((host, port))
        self.host = host
        self.port = port
        self.sock = sock
        self.handlers = handlers

    def serve(self):
        """
        開啟伺服器執行,提供RPC服務
        :return:
        """
        # 開啟伺服器的監聽,等待客戶端的連線請求
        self.sock.listen(128)
        print("伺服器開始監聽")

        # 接受客戶端的連線請求
        while True:
            client_sock, client_addr = self.sock.accept()
            print('與客戶端%s建立了連線' % str(client_addr))

            # 交給ServerStub,完成客戶端的具體的RPC呼叫請求
            stub = ServerStub(client_sock, self.handlers)
            try:
                while True:
                    stub.process()
            except EOFError:
                # 表示客戶端關閉了連線
                print('客戶端關閉了連線')
                client_sock.close()
  • Client Stub 客戶端存根

使用者直接操作的物件,由client stub構造訊息資料藉助連線傳送給服務端,並接收解析服務端的返回訊息傳遞給使用者。

class ClientStub(object):
    """
    用來幫助客戶端完成遠端過程呼叫  RPC呼叫

    stub = ClientStub()
    stub.divide(200, 100)
    stub.add()
    """
    def __init__(self, channel):
        self.channel = channel
        self.conn = self.channel.get_connection()

    def divide(self, num1, num2=1):
        # 將呼叫的引數打包成訊息協議的資料
        proto = DivideProtocol()
        args = proto.args_encode(num1, num2)

        # 將訊息資料通過網路傳送給伺服器
        self.conn.sendall(args)

        # 接受伺服器返回的返回值訊息資料,並進行解析
        result = proto.result_decode(self.conn)

        # 將結果值(正常float 或 異常InvalidOperation)返回給客戶端
        if isinstance(result, float):
            # 正常
            return result
        else:
            # 異常
            raise result
  • Server Stub 服務端存根

幫助服務端接收呼叫訊息資料並解析,在本地呼叫程式後將結果構造返回值訊息返回給客戶端。

class ServerStub(object):
    """
    幫助服務端完成遠端過程呼叫
    """
    def __init__(self, connection, handlers):
        """

        :param connection: 與客戶端的連線
        :param handlers: 真正本地被呼叫的方法(函式 過程)
        class Handlers:

            @staticmethod
            def divide(num1, num2=1):
                pass

            def add():
                pass
        """
        self.conn = connection
        self.method_proto = MethodProtocol(self.conn)
        self.process_map = {
            'divide': self._process_divide,
        }
        self.handlers = handlers

    def process(self):
        """
        當服務端接受了一個客戶端的連線,建立好連線後,完成遠端呼叫處理
        :return:
        """
        # 接收訊息資料,並解析方法的名字
        name = self.method_proto.get_method_name()

        # 根據解析獲得的方法(過程)名,呼叫響應的過程協議,接受並解析訊息資料
        # self.process_map[name]()
        _process = self.process_map[name]
        _process()

    def _process_divide(self):
        """
        處理除法過程呼叫
        :return:
        """
        # 建立用於除法過程呼叫引數協議資料解析的工具
        proto = DivideProtocol()
        # 解析呼叫引數訊息資料
        args = proto.args_decode(self.conn)
        # args = {"num1": xxx, "num2": xxx}

        # 進行除法的本地過程呼叫
        # 將本地呼叫過程的返回值(包括可能的異常)打包成訊息協議資料,通過網路返回給客戶端
        try:
            val = self.handlers.divide(**args)
        except InvalidOperation as e:
            ret_message = proto.result_encode(e)
        else:
            ret_message = proto.result_encode(val)

        self.conn.sendall(ret_message)
  • 測試

服務端server.py

from services import Server
from services import InvalidOperation


class Handlers:
    @staticmethod
    def divide(num1, num2=1):
        """
        除法
        :param num1:
        :param num2:
        :return:
        """
        if num2 == 0:
            raise InvalidOperation()
        val = num1 / num2
        return val


if __name__ == '__main__':
    server = Server('127.0.0.1', 8000, Handlers)
    server.serve()

客戶端client.py

from services import ClientStub
from services import Channel
from services import InvalidOperation
import time

channel = Channel('127.0.0.1', '8000')
stub = ClientStub(channel)

for i in range(5):
    try:
        val = stub.divide(i*100, 10)
    except InvalidOperation as e:
        print(e.message)
    else:
        print(val)
    time.sleep(1)

RPC伺服器

在前面的實現中,我們只實現了RPC伺服器的最簡易模型——單程序單執行緒。

為了能讓RPC伺服器同時處理多個客戶端的請求,提升效能,我們可以改寫伺服器,採用多執行緒、多程序等。

在此我們僅實現多執行緒模型,其餘不再贅述。

import threading

class ThreadServer(object):
    """
    多執行緒RPC伺服器
    """
    def __init__(self, host, port, handlers):
        # 建立socket的工具物件
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

        # 設定socket 重用地址
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

        # 繫結地址
        sock.bind((host, port))
        self.host = host
        self.port = port
        self.sock = sock
        self.handlers = handlers

    def serve(self):
        """
        開啟伺服器執行,提供RPC服務
        :return:
        """
        # 開啟伺服器的監聽,等待客戶端的連線請求
        self.sock.listen(128)
        print("伺服器開始監聽")

        # 接受客戶端的連線請求
        while True:
            client_sock, client_addr = self.sock.accept()
            print('與客戶端%s建立了連線' % str(client_addr))

            # 建立子執行緒處理這個客戶端
            t = threading.Thread(target=self.handle, args=(client_sock,))
            # 開啟子執行緒執行
            t.start()

    def handle(self, client_sock):
        """
        子執行緒呼叫的方法,用來處理一個客戶端的請求
        :return:
        """
        # 交給ServerStub,完成客戶端的具體的RPC呼叫請求
        stub = ServerStub(client_sock, self.handlers)
        try:
            while True:
                stub.process()
        except EOFError:
            # 表示客戶端關閉了連線
            print('客戶端關閉了連線')
            client_sock.close()