1. 程式人生 > 其它 >自己動手從0開始實現一個分散式RPC框架

自己動手從0開始實現一個分散式RPC框架

簡介:如果一個程式設計師能清楚的瞭解RPC框架所具備的要素,掌握RPC框架中涉及的服務註冊發現、負載均衡、序列化協議、RPC通訊協議、Socket通訊、非同步呼叫、熔斷降級等技術,可以全方位的提升基本素質。雖然也有相關原始碼,但是隻看原始碼容易眼高手低,動手寫一個才是自己真正掌握這門技術的最優路徑。

作者 | 麓行
來源 | 阿里技術公眾號

前言

為什麼要自己寫一個RPC框架,我覺得從個人成長上說,如果一個程式設計師能清楚的瞭解RPC框架所具備的要素,掌握RPC框架中涉及的服務註冊發現、負載均衡、序列化協議、RPC通訊協議、Socket通訊、非同步呼叫、熔斷降級等技術,可以全方位的提升基本素質。雖然也有相關原始碼,但是隻看原始碼容易眼高手低,動手寫一個才是自己真正掌握這門技術的最優路徑。

一 什麼是RPC

RPC(Remote Procedure Call)遠端過程呼叫,簡言之就是像呼叫本地方法一樣呼叫遠端服務。目前外界使用較多的有gRPC、Dubbo、Spring Cloud等。相信大家對RPC的概念都已經很熟悉了,這裡不做過多介紹。

二 分散式RPC框架要素

一款分散式RPC框架離不開三個基本要素:

  • 服務提供方 Serivce Provider
  • 服務消費方 Servce Consumer
  • 註冊中心 Registery

圍繞上面三個基本要素可以進一步擴充套件服務路由、負載均衡、服務熔斷降級、序列化協議、通訊協議等等。

1 註冊中心

主要是用來完成服務註冊和發現的工作。雖然服務呼叫是服務消費方直接發向服務提供方的,但是現在服務都是叢集部署,服務的提供者數量也是動態變化的,所以服務的地址也就無法預先確定。因此如何發現這些服務就需要一個統一註冊中心來承載。

2 服務提供方(RPC服務端)

其需要對外提供服務介面,它需要在應用啟動時連線註冊中心,將服務名及其服務元資料發往註冊中心。同時需要提供服務服務下線的機制。需要維護服務名和真正服務地址對映。服務端還需要啟動Socket服務監聽客戶端請求。

3 服務消費方(RPC客戶端)

客戶端需要有從註冊中心獲取服務的基本能力,它需要在應用啟動時,掃描依賴的RPC服務,併為其生成代理呼叫物件,同時從註冊中心拉取服務元資料存入本地快取,然後發起監聽各服務的變動做到及時更新快取。在發起服務呼叫時,通過代理呼叫物件,從本地快取中獲取服務地址列表,然後選擇一種負載均衡策略篩選出一個目標地址發起呼叫。呼叫時會對請求資料進行序列化,並採用一種約定的通訊協議進行socket通訊。

三 技術選型

1 註冊中心

目前成熟的註冊中心有Zookeeper,Nacos,Consul,Eureka,它們的主要比較如下:

本實現中支援了兩種註冊中心Nacos和Zookeeper,可根據配置進行切換。

2 IO通訊框架

本實現採用Netty作為底層通訊框架,Netty是一個高效能事件驅動型的非阻塞的IO(NIO)框架。

3 通訊協議

TCP通訊過程中會根據TCP緩衝區的實際情況進行包的劃分,所以在業務上認為一個完整的包可能會被TCP拆分成多個包進行傳送,也有可能把多個小的包封裝成一個大的資料包傳送,這就是所謂的TCP粘包和拆包問題。所以需要對傳送的資料包封裝到一種通訊協議裡。

業界的主流協議的解決方案可以歸納如下:

  1. 訊息定長,例如每個報文的大小為固定長度100位元組,如果不夠用空格補足。
  2. 在包尾特殊結束符進行分割。
  3. 將訊息分為訊息頭和訊息體,訊息頭中包含表示訊息總長度(或者訊息體長度)的欄位。

很明顯1,2都有些侷限性,本實現採用方案3,具體協議設計如下:

+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+ 
|  BYTE  |        |        |        |        |        |        |             ........ 
+--------------------------------------------+--------+-----------------+--------+--------+--------+--------+--------+--------+-----------------+ 
|  magic | version|  type  |           content lenth           |                   content byte[]                                        |        | 
+--------+-----------------------------------------------------------------------------------------+--------------------------------------------+
  • 第一個位元組是魔法數,比如我定義為0X35。
  • 第二個位元組代表協議版本號,以便對協議進行擴充套件,使用不同的協議解析器。
  • 第三個位元組是請求型別,如0代表請求1代表響應。
  • 第四個位元組表示訊息長度,即此四個位元組後面此長度的內容是訊息content。

4 序列化協議

本實現支援3種序列化協議,JavaSerializer、Protobuf及Hessian可以根據配置靈活選擇。建議選用Protobuf,其序列化後碼流小效能高,非常適合RPC呼叫,Google自家的gRPC也是用其作為通訊協議。

5 負載均衡

本實現支援兩種主要負載均衡策略,隨機和輪詢,其中他們都支援帶權重的隨機和輪詢,其實也就是四種策略。

四 整體架構

五 實現

專案總體結構:

1 服務註冊發現

Zookeeper

Zookeeper採用節點樹的資料模型,類似linux檔案系統,/,/node1,/node2 比較簡單。

Zookeeper節點型別是Zookeeper實現很多功能的核心原理,分為持久節點臨時節點、順序節點三種類型的節點。

我們採用的是對每個服務名建立一個持久節點,服務註冊時實際上就是在zookeeper中該持久節點下建立了一個臨時節點,該臨時節點儲存了服務的IP、埠、序列化方式等。

客戶端獲取服務時通過獲取持久節點下的臨時節點列表,解析服務地址資料:

客戶端監聽服務變化:

Nacos

Nacos是阿里開源的微服務管理中介軟體,用來完成服務之間的註冊發現和配置中心,相當於Spring Cloud的Eureka+Config。

不像Zookeeper需要利用提供的建立節點特性來實現註冊發現,Nacos專門提供了註冊發現功能,所以其使用更加方便簡單。主要關注NamingService介面提供的三個方法registerInstance、getAllInstances、subscribe;registerInstance用來完成服務端服務註冊,getAllInstances用來完成客戶端服務獲取,subscribe用來完成客戶端服務變動監聽,這裡就不多做介紹,具體可參照實現原始碼。

2 服務提供方 Serivce Provider

在自動配置類OrcRpcAutoConfiguration完成註冊中心和RPC啟動類(RpcBootStarter)的初始化:

服務端的啟動流程如下:

RPC啟動(RpcBootStarter):

上面監聽Spring容器初始化事件時注意,由於Spring包含多個容器,如web容器和核心容器,他們還有父子關係,為了避免重複執行註冊,只處理頂層的容器即可。

3 服務消費方 Servce Consumer

服務消費方需要在應用啟動完成前為依賴的服務建立好代理物件,這裡有很多種方法,常見的有兩種:

  • 一是在應用的Spring Context初始化完成事件時觸發,掃描所有的Bean,將Bean中帶有OrcRpcConsumer註解的field獲取到,然後建立field型別的代理物件,建立完成後,將代理物件set給此field。後續就通過該代理物件建立服務端連線,併發起呼叫。
  • 二是通過Spring的BeanFactoryPostProcessor,其可以對bean的定義BeanDefinition(配置元資料)進行處理;Spring IOC會在容器例項化任何其他bean之前執行BeanFactoryPostProcessor讀取BeanDefinition,可以修改這些BeanDefinition,也可以新增一些BeanDefinition。

本實現也採用第二種方式,處理流程如下:

BeanFactoryPostProcessor的主要實現:

    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory)
        throws BeansException {
        this.beanFactory = beanFactory;
        postProcessRpcConsumerBeanFactory(beanFactory, (BeanDefinitionRegistry)beanFactory);
    }

    private void postProcessRpcConsumerBeanFactory(ConfigurableListableBeanFactory beanFactory, BeanDefinitionRegistry beanDefinitionRegistry) {
        String[] beanDefinitionNames = beanFactory.getBeanDefinitionNames();
        int len = beanDefinitionNames.length;
        for (int i = 0; i < len; i++) {
            String beanDefinitionName = beanDefinitionNames[i];
            BeanDefinition beanDefinition = beanFactory.getBeanDefinition(beanDefinitionName);
            String beanClassName = beanDefinition.getBeanClassName();
            if (beanClassName != null) {
                Class<?> clazz = ClassUtils.resolveClassName(beanClassName, classLoader);
                ReflectionUtils.doWithFields(clazz, new FieldCallback() {
                    @Override
                    public void doWith(Field field) throws IllegalArgumentException, IllegalAccessException {
                        parseField(field);
                    }
                });
            }

        }

        Iterator<Entry<String, BeanDefinition>> it = beanDefinitions.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, BeanDefinition> entry = it.next();
            if (context.containsBean(entry.getKey())) {
                throw new IllegalArgumentException("Spring context already has a bean named " + entry.getKey());
            }
            beanDefinitionRegistry.registerBeanDefinition(entry.getKey(), entry.getValue());
            log.info("register OrcRpcConsumerBean definition: {}", entry.getKey());
        }

    }

    private void parseField(Field field) {
        // 獲取所有OrcRpcConsumer註解
        OrcRpcConsumer orcRpcConsumer = field.getAnnotation(OrcRpcConsumer.class);
        if (orcRpcConsumer != null) {
            // 使用field的型別和OrcRpcConsumer註解一起生成BeanDefinition
            OrcRpcConsumerBeanDefinitionBuilder beanDefinitionBuilder = new OrcRpcConsumerBeanDefinitionBuilder(field.getType(), orcRpcConsumer);
            BeanDefinition beanDefinition = beanDefinitionBuilder.build();
            beanDefinitions.put(field.getName(), beanDefinition);
        }
    }

ProxyFactory的主要實現:

public class JdkProxyFactory implements ProxyFactory{

    @Override
    public Object getProxy(ServiceMetadata serviceMetadata) {
        return Proxy
            .newProxyInstance(serviceMetadata.getClazz().getClassLoader(), new Class[] {serviceMetadata.getClazz()},
                new ClientInvocationHandler(serviceMetadata));
    }

    private class ClientInvocationHandler implements InvocationHandler {

        private ServiceMetadata serviceMetadata;

        public ClientInvocationHandler(ServiceMetadata serviceMetadata) {
            this.serviceMetadata = serviceMetadata;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            String serviceId = ServiceUtils.getServiceId(serviceMetadata);
            // 通過負載均衡器選取一個服務提供方地址
            ServiceURL service = InvocationServiceSelector.select(serviceMetadata);

            OrcRpcRequest request = new OrcRpcRequest();
            request.setMethod(method.getName());
            request.setParameterTypes(method.getParameterTypes());
            request.setParameters(args);
            request.setRequestId(UUID.randomUUID().toString());
            request.setServiceId(serviceId);

            OrcRpcResponse response = InvocationClientContainer.getInvocationClient(service.getServerNet()).invoke(request, service);
            if (response.getStatus() == RpcStatusEnum.SUCCESS) {
                return response.getData();
            } else if (response.getException() != null) {
                throw new OrcRpcException(response.getException().getMessage());
            } else {
                throw new OrcRpcException(response.getStatus().name());
            }
        }
    }
}

本實現只使用JDK動態代理,也可以使用cglib或Javassist實現以獲得更好的效能,JdkProxyFactory中。

4 IO模組

UML圖如下:

結構比較清晰,分三大模組:客戶端呼叫適配模組、服務端請求響應適配模組和Netty IO服務模組。

客戶端呼叫適配模組

此模組比較簡單,主要是為客戶端呼叫時建立服務端接,並將連線存入快取,避免後續同服務呼叫重複建立連線,連線建立成功後發起呼叫。下面是DefaultInvocationClient的實現:

服務端請求響應適配模組

服務請求響應模組也比較簡單,是根據請求中的服務名,從快取中獲取服務元資料,然後從請求中獲取呼叫的方法和引數型別資訊,反射獲取呼叫方法資訊。然後從spring context中獲取bean進行反射呼叫。

Netty IO服務模組

Netty IO服務模組是核心,稍複雜一些,客戶端和服務端主要處理流程如下:

其中,重點是這四個類的實現:NettyNetClient、NettyNetServer、NettyClientChannelRequestHandler和NettyServerChannelRequestHandler,上面的UML圖和下面流程圖基本上講清楚了它們的關係和一次請求的處理流程,這裡就不再展開了。

下面重點講一下編碼解碼器。

在技術選型章節中,提及了採用的通訊協議,定義了私有的RPC協議:

+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+ 
|  BYTE  |        |        |        |        |        |        |             ........ 
+--------------------------------------------+--------+-----------------+--------+--------+--------+--------+--------+--------+-----------------+ 
|  magic | version|  type  |           content lenth           |                   content byte[]                                        |        | 
+--------+-----------------------------------------------------------------------------------------+--------------------------------------------+
  • 第一個位元組是魔法數定義為0X35。
  • 第二個位元組代表協議版本號。
  • 第三個位元組是請求型別,0代表請求1代表響應。
  • 第四個位元組表示訊息長度,即此四個位元組後面此長度的內容是訊息content。

編碼器的實現如下:

@Override
protected void encode(ChannelHandlerContext channelHandlerContext, ProtocolMsg protocolMsg, ByteBuf byteBuf)
    throws Exception {
    // 寫入協議頭
    byteBuf.writeByte(ProtocolConstant.MAGIC);
    // 寫入版本
    byteBuf.writeByte(ProtocolConstant.DEFAULT_VERSION);
    // 寫入請求型別
    byteBuf.writeByte(protocolMsg.getMsgType());
    // 寫入訊息長度
    byteBuf.writeInt(protocolMsg.getContent().length);
    // 寫入訊息內容
    byteBuf.writeBytes(protocolMsg.getContent());
}

解碼器的實現如下:

六 測試

在本人MacBook Pro 13寸,4核I5,16g記憶體,使用Nacos註冊中心,啟動一個伺服器,一個客戶端情況下,採用輪詢負載均衡策略的情況下,使用Apache ab測試。

在啟用8個執行緒發起10000個請求的情況下,可以做到 18秒完成所有請求,qps550:

在啟用100個執行緒發起10000個請求的情況下,可以做到 13.8秒完成所有請求,qps724:

七 總結

在實現這個RPC框架的過程中,我也重新學習了很多知識,比如通訊協議、IO框架等。也橫向學習了當前最熱的gRPC,藉此又看了很多相關的原始碼,收穫很大。後續我也會繼續維護升級這個框架,比如引入熔斷降級等機制,做到持續學習持續進步。

原文連結

本文為阿里雲原創內容,未經允許不得轉載。