1. 程式人生 > >萬物互聯之~RPC專欄

萬物互聯之~RPC專欄

通話 防火 而已 練習 res utf 開放 man 簡化

其他專欄最新篇:協程加強之~兼容答疑篇 | 聊聊數據庫~SQL環境篇

上篇回顧:萬物互聯之~深入篇

3.RPC引入

Code:https://github.com/lotapp/BaseCode/tree/master/python/6.net/6.rpc/

3.1.概念

RPC(Remote Procedure Call):分布式系統常見的一種通信方法(遠程過程調用),通俗講:可以一臺計算機的程序調用另一臺計算機的子程序(可以把它看成之前我們說的進程間通信,只不過這一次的進程不在同一臺PC上了)

PS:RPC的設計思想是力圖使遠程調用中的通訊細節對於使用者透明,調用雙方無需關心網絡通訊的具體實現

引用一張網上的圖:

技術分享圖片

HTTP有點相似,你可以這樣理解:

  1. 老版本的HTTP/1.0是短鏈接,而RPC是長連接進行通信
    • HTTP協議(header、body),RPC可以采取HTTP協議,也可以自定義二進制格式
  2. 後來HTTP/1.1支持了長連接(Connection:keep-alive),基本上和RPC差不多了
    • keep-alive一般都限制有最長時間,或者最多處理的請求數,而RPC是基於長連接的,基本上沒有這個限制
  3. 後來谷歌直接基於HTTP/2.0建立了gRPC,它們之間的基本上也就差不多了
    • 如果硬是要區分就是:HTTP-普通話RPC-方言的區別了
    • RPC高效而小眾,HTTP效率沒RPC高,但更通用
  4. PS:RPCHTTP調用不用經過中間件,而是端到端的直接數據交互
    • 網絡交互可以理解為基於Socket實現的(RPCHTTP都是Socket的讀寫操作)

簡單概括一下RPC的優缺點就是:

  1. 優點:
    1. 效率更高(可以自定義二進制格式)
    2. 發起RPC調用的一方,在編寫代碼時可忽略RPC的具體實現(跟編寫本地函數調用一般
  2. 缺點:
    • 通用性不如HTTP(方言普及程度肯定不如普通話),如果傳輸協議不是HTTP協議格式,調用雙方就需要專門實現通信庫

PS:HTTP更多是ClientServer的通訊;RPC更多是內部服務器間的通訊

3.2.引入

上面說這麽多,可能還沒有來個案例實在,我們看個案例:

本地調用sum()

def sum(a, b):
    """return a+b"""
    return a + b

def main():
    result = sum(1, 2)
    print(f"1+2={result}")

if __name__ == "__main__":
    main()

輸出:(這個大家都知道)

1+2=3

1.xmlrpc案例

官方文檔:

https://docs.python.org/3/library/xmlrpc.client.html
https://docs.python.org/3/library/xmlrpc.server.html

都說RPC用起來就像本地調用一樣,那麽用起來啥樣呢?看個案例:

服務端:(CentOS7:192.168.36.123:50051)

from xmlrpc.server import SimpleXMLRPCServer

def sum(a, b):
    """return a+b"""
    return a + b

# PS:50051是gRPC默認端口
server = SimpleXMLRPCServer(('', 50051))
# 把函數註冊到RPC服務器中
server.register_function(sum)
print("Server啟動ing,Port:50051")
server.serve_forever()

客戶端:(Win10:192.168.36.144

from xmlrpc.client import ServerProxy

stub = ServerProxy("http://192.168.36.123:50051")
result = stub.sum(1, 2)
print(f"1+2={result}")

輸出:(Client用起來是不是和本地差不多?就是通過代理訪問了下RPCServer而已)

1+2=3

技術分享圖片

PS:CentOS服務器不是你綁定個端口就一定能訪問的,如果不能記讓防火墻開放對應的端口

這個之前在說MariaDB環境的時候有詳細說:https://www.cnblogs.com/dotnetcrazy/p/9887708.html#_map4

# 添加 --permanent永久生效(沒有此參數重啟後失效)
firewall-cmd --zone=public --add-port=80/tcp --permanent

2.ZeroRPC案例:

zeroRPC用起來和這個差不多,也簡單舉個例子吧:

把服務的某個方法註冊到RPCServer中,供外部服務調用

import zerorpc

class Test(object):
    def say_hi(self, name):
        return f"Hi,My Name is{name}"


# 註冊一個Test的實例
server = zerorpc.Server(Test())
server.bind("tcp://0.0.0.0:50051")
server.run()

調用服務端代碼

import zerorpc

client = zerorpc.Client("tcp://192.168.36.123:50051")
result = client.say_hi("RPC")
print(result)

3.3.簡單版自定義RPC

看了上面的引入案例,是不是感覺RPC不過如此?NoNoNo,要是真這麽簡單也就談不上RPC架構了,上面兩個是最簡單的RPC服務了,可以這麽說:生產環境基本上用不到,只能當案例練習罷了,對Python來說,最常用的RPC就兩個gRPC and Thrift

PS:國產最出名的是Dubbo and Tars,Net最常用的是gRPCThriftSurging

1.RPC服務的流程

要自己實現一個RPC Server那麽就得了解整個流程了:

  1. Client(調用者)以本地調用的方式發起調用
  2. 通過RPC服務進行遠程過程調用(RPC的目標就是要把這些步驟都封裝起來,讓使用者感覺不到這個過程)
    1. 客戶端的RPC Proxy組件收到調用後,負責將被調用的方法名、參數等打包編碼成自定義的協議
    2. 客戶端的RPC Proxy組件在打包完成後通過網絡把數據包發送給RPC Server
    3. 服務端的RPC Proxy組件把通過網絡接收到的數據包按照相應格式進行拆包解碼,獲取方法名和參數
    4. 服務端的RPC Proxy組件根據方法名和參數進行本地調用
    5. RPC Server(被調用者)本地執行後將結果返回給服務端的RPC Proxy
    6. 服務端的RPC Proxy組件將返回值打包編碼成自定義的協議數據包,並通過網絡發送給客戶端的RPC Proxy組件
    7. 客戶端的RPC Proxy組件收到數據包後,進行拆包解碼,把數據返回給Client
  3. Client(調用者)得到本次RPC調用的返回結果

用一張時序圖來描述下整個過程:
技術分享圖片

PS:RPC Proxy有時候也叫Stub(存根):(Client Stub,Server Stub)

為屏蔽客戶調用遠程主機上的對象,必須提供某種方式來模擬本地對象,這種本地對象稱為存根(stub),存根負責接收本地方法調用,並將它們委派給各自的具體實現對象

PRC服務實現的過程中其實就兩核心點:

  1. 消息協議:客戶端調用的參數和服務端的返回值這些在網絡上傳輸的數據以何種方式打包編碼和拆包解碼
    • 經典代表:Protocol Buffers
  2. 傳輸控制:在網絡中數據的收發傳輸控制具體如何實現(TCP/UDP/HTTP

2.手寫RPC

下面我們就根據上面的流程來手寫一個簡單的RPC:

1.Client調用:

# client.py
from client_stub import ClientStub

def main():
    stub = ClientStub(("192.168.36.144", 50051))

    result = stub.get("sum", (1, 2))
    print(f"1+2={result}")

    result = stub.get("sum", (1.1, 2))
    print(f"1.1+2={result}")

    time_str = stub.get("get_time")
    print(time_str)

if __name__ == "__main__":
    main()

輸出:

1+2=3
1.1+2.2=3.1
Wed Jan 16 22

2.Client Stub,客戶端存根:(主要有打包解包、和RPC服務器通信的方法)

# client_stub.py
import socket

class ClientStub(object):
    def __init__(self, address):
        """address ==> (ip,port)"""
        self.socket = socket.socket()
        self.socket.connect(address)

    def convert(self, obj):
        """根據類型轉換成對應的類型編號"""
        if isinstance(obj, int):
            return 1
        if isinstance(obj, float):
            return 2
        if isinstance(obj, str):
            return 3

    def pack(self, func, args):
        """打包:把方法和參數拼接成自定義的協議
        格式:func:函數名@params:類型-參數,類型2-參數2...
        """
        result = f"func:{func}"
        if args:
            params = ""
            # params:類型-參數,類型2-參數2...
            for item in args:
                params += f"{self.convert(item)}-{item},"
            # 去除最後一個,
            result += f"@params:{params[:-1]}"
        # print(result)  # log 輸出
        return result.encode("utf-8")

    def unpack(self, data):
        """解包:獲取返回結果"""
        msg = data.decode("utf-8")
        # 格式應該是"data:xxxx"
        params = msg.split(":")
        if len(params) > 1:
            return params[1]
        return None

    def get(self, func, args=None):
        """1.客戶端的RPC Proxy組件收到調用後,負責將被調用的方法名、參數等打包編碼成自定義的協議"""
        data = self.pack(func, args)
        # 2.客戶端的RPC Proxy組件在打包完成後通過網絡把數據包發送給RPC Server
        self.socket.send(data)
        # 等待服務端返回結果
        data = self.socket.recv(2048)
        if data:
            return self.unpack(data)
        return None

簡要說明下:(我根據流程在Code裏面標註了,看起來應該很輕松)

之前有說到核心其實就是消息協議and傳輸控制,我客戶端存根的消息協議是自定義的格式(後面會說簡化方案):func:函數名@params:類型-參數,類型2-參數2...,傳輸我是基於TCP進行了簡單的封裝


3.Server端:(實現很簡單)

# server.py
import socket
from server_stub import ServerStub

class RPCServer(object):
    def __init__(self, address, mycode):
        self.mycode = mycode
        # 服務端存根(RPC Proxy)
        self.server_stub = ServerStub(mycode)
        # TCP Socket
        self.socket = socket.socket()
        # 端口復用
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # 綁定端口
        self.socket.bind(address)

    def run(self):
        self.socket.listen()
        while True:
            # 等待客戶端連接
            client_socket, client_addr = self.socket.accept()
            print(f"來自{client_addr}的請求:\n")
            # 交給服務端存根(Server Proxy)處理
            self.server_stub.handle(client_socket, client_addr)

if __name__ == "__main__":
    from server_code import MyCode
    server = RPCServer(('', 50051), MyCode())
    print("Server啟動ing,Port:50051")
    server.run()

為了簡潔,服務端代碼我單獨放在了server_code.py中:

# 5.RPC Server(被調用者)本地執行後將結果返回給服務端的RPC Proxy
class MyCode(object):
    def sum(self, a, b):
        return a + b

    def get_time(self):
        import time
        return time.ctime()

4.然後再看看重頭戲Server Stub:

# server_stub.py
import socket

class ServerStub(object):
    def __init__(self, mycode):
        self.mycode = mycode

    def convert(self, num, obj):
        """根據類型編號轉換類型"""
        if num == "1":
            obj = int(obj)
        if num == "2":
            obj = float(obj)
        if num == "3":
            obj = str(obj)
        return obj

    def unpack(self, data):
        """3.服務端的RPC Proxy組件把通過網絡接收到的數據包按照相應格式進行拆包解碼,獲取方法名和參數"""
        msg = data.decode("utf-8")
        # 格式應該是"格式:func:函數名@params:類型編號-參數,類型編號2-參數2..."
        array = msg.split("@")
        func = array[0].split(":")[1]
        if len(array) > 1:
            args = list()
            for item in array[1].split(":")[1].split(","):
                temps = item.split("-")
                # 類型轉換
                args.append(self.convert(temps[0], temps[1]))
            return (func, tuple(args))  # (func,args)
        return (func, )

    def pack(self, result):
        """打包:把方法和參數拼接成自定義的協議"""
        # 格式:"data:返回值"
        return f"data:{result}".encode("utf-8")

    def exec(self, func, args=None):
        """4.服務端的RPC Proxy組件根據方法名和參數進行本地調用"""
        # 如果沒有這個方法則返回None
        func = getattr(self.mycode, func, None)
        if args:
            return func(*args)  # 解包
        else:
            return func()  # 無參函數

    def handle(self, client_socket, client_addr):
        while True:
            # 獲取客戶端發送的數據包
            data = client_socket.recv(2048)
            if data:
                try:
                    data = self.unpack(data)  # 解包
                    if len(data) == 1:
                        data = self.exec(data[0])  # 執行無參函數
                    elif len(data) > 1:
                        data = self.exec(data[0], data[1])  # 執行帶參函數
                    else:
                        data = "RPC Server Error Code:500"
                except Exception as ex:
                    data = "RPC Server Function Error"
                    print(ex)
                # 6.服務端的RPC Proxy組件將返回值打包編碼成自定義的協議數據包,並通過網絡發送給客戶端的RPC Proxy組件
                data = self.pack(data)  # 把函數執行結果按指定協議打包
                # 把處理過的數據發送給客戶端
                client_socket.send(data)
            else:
                print(f"客戶端:{client_addr}已斷開\n")
                break

再簡要說明一下:裏面方法其實主要就是解包執行函數返回值打包

輸出圖示:
技術分享圖片

再貼一下上面的時序圖:
技術分享圖片

課外拓展:

HTTP1.0、HTTP1.1 和 HTTP2.0 的區別
https://www.cnblogs.com/heluan/p/8620312.html

簡述分布式RPC框架
https://blog.csdn.net/jamebing/article/details/79610994

分布式基礎—RPC
http://www.dataguru.cn/article-14244-1.html

下節預估:RPC服務進一步簡化與演變手寫一個簡單的REST接口

萬物互聯之~RPC專欄