1. 程式人生 > >一文帶你實現RPC框架

一文帶你實現RPC框架

想要獲取更多文章可以訪問我的部落格 - 程式碼無止境。

現在大部分的網際網路公司都會採用微服務架構,但具體實現微服務架構的方式有所不同,主流上分為兩種,一種是基於Http協議的遠端呼叫,另外一種是基於RPC方式的呼叫。兩種方式都有自己的代表框架,前者是著名的Spring Cloud,後者則是有阿里巴巴開源的Dubbo,二者都被廣泛的採用。今天這篇文章,我們就一起來了解一下RPC,並且和大家一起動手實現一個簡單的RPC框架的Demo。

什麼是RPC

RPC是一種遠端呼叫過程,是一種通過網路遠端呼叫其他服務的協議。通俗的說就是,A通過打電話的方式讓B幫忙辦一件事,B辦完事後將結果告知A。 我們下面通過一張圖來大概瞭解一下在一個完整的RPC框架中存在的角色以及整個遠端呼叫的過程。

通過上面的圖可以看出來,在RPC框架中主要有以下4個角色:

  • registry - 註冊中心,當服務提供者啟動時會向註冊中心註冊,然後註冊中心會告知所有的消費者有新的服務提供者。
  • provider - 服務提供者,遠端呼叫過程中的被消費方。
  • consumer - 服務消費者,遠端呼叫過程中的消費方。
  • monitor - 監視器,它主要負責統計服務的消費和呼叫情況。

啟動服務提供者後,服務提供者會以非同步的方式向註冊中心註冊。然後啟動服務消費者,它會訂閱註冊中心中服務提供者列表,當有服務提供者的資訊發生改變時,註冊中心會通知所有的消費者。當消費者發起遠端呼叫時,會通過動態代理將需要請求的引數以及方法簽名等資訊通過Netty傳送給服務提供者,服務提供者收到呼叫的資訊後呼叫對應的方法並將產生的結果返回給消費者,這樣就完成了一個完整的遠端呼叫。當然了這個過程中可能還會將呼叫資訊非同步傳送給monitor用於監控和統計。

閱讀過上面的內容後,你應該對RPC框架有了一個大概的認識。為了更好更深入的瞭解RPC框架的原理,下面我們就一起來動手實現一個簡單的RPC框架吧。

框架核心部分

首先我們要實現的是整個RPC框架的核心部分,這部分的主要包含以下內容:

  1. RPC服務的註解的實現。
  2. 服務提供者初始化、註冊、以及響應遠端呼叫的實現。
  3. 服務消費者訂閱註冊中心、監聽服務提供者的變化的實現。
  4. 動態代理的實現。

整個核心部分將以一個Spring Boot Starter的形式實現,這樣我們可以很方便的在Spring Boot專案中使用它。

註解

我們需要使用一個註解來標識服務提供者所提供服務的實現類,方便在初始化的時候將其交由Spring管理,也只有這樣我們才可以在遠端呼叫發生時可以找到它們。

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface RpcService {

    Class<?> value();

}

value屬性用來標記這個服務的實現類對應的介面,RPC框架中服務提供者和消費者之間會共同引用一個服務介面的包,當我們需要遠端呼叫的時候實際上只需要呼叫介面中定義的方法即可。
除了一個標識服務實現類的註解之外,我們還需要一個標識服務消費者注入服務實現的註解@RpcConsumer,被其修飾的屬性在初始化的時候都會被我們設定上動態代理,這一點在後面會詳細講到,我們先來看下它的具體實現吧。

@Target({ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface RpcConsumer {

    /**
     * 服務名稱
     * @return
     */
    String providerName();

}

服務提供者

服務提供者啟動的時候,我們RPC框架需要做以下幾件事情:

  1. 掃描服務提供者中所有提供服務的類(被@RpcService修飾的類),並將其交由BeanFactory管理。
  2. 啟動Netty服務端,用來收到消費者的呼叫訊息,並且返回呼叫結果。
  3. 向註冊中心註冊,本例中使用的註冊中心是Zookeeper。

這部分我們定義了一個ProviderAutoConfiguration類來實現這幾個步驟,

@PostConstruct
public void  init() {
    logger.info("rpc server start scanning provider service...");
    Map<String, Object> beanMap = this.applicationContext.getBeansWithAnnotation(RpcService.class);
    if (null != beanMap && !beanMap.isEmpty()) {
        beanMap.entrySet().forEach(one -> {
            initProviderBean(one.getKey(), one.getValue());
        });
    }
    logger.info("rpc server scan over...");
    // 如果有服務的話才啟動netty server
    if (!beanMap.isEmpty()) {
        startNetty(rpcProperties.getPort());
    }
}

看上面的程式碼,首先我們獲取到了所有被@RpcService註解修飾的實體,並且呼叫了initProviderBean方法逐一對其處理,然後我們啟動了Netty。那麼我們需要在initProviderBean方法中做些什麼呢?其實很簡單,就是逐一將其交由BeanFactory管理。

private void initProviderBean(String beanName, Object bean) {
    RpcService rpcService = this.applicationContext
                .findAnnotationOnBean(beanName, RpcService.class);
    BeanFactory.addBean(rpcService.value(), bean);
}

將服務實現類交由Spring管理之後,我們還需要啟動Netty用來接收遠端呼叫資訊,啟動Netty的程式碼在這裡我就不全部粘出來了,大家可以在原始碼中檢視。在Netty啟動成功之後,其實我們還執行了下面的程式碼,用來向ZK註冊。

new RegistryServer(rpcProperties.getRegisterAddress(),
                    rpcProperties.getTimeout(), rpcProperties.getServerName(),
                    rpcProperties.getHost(), port)
                    .register();

整個註冊的過程也非常容易理解,首先是建立了一個ZK連線,然後是判斷是否有/rpc的根節點,如果沒有的話就建立一個,最後就是在根節點下建立一個EPHEMERAL_SEQUENTIAL型別的節點,這種型別的節點在ZK重啟之後會自動清除,這樣可以保證註冊中心重啟後會自動清除服務提供者的資訊。而在節點中會儲存服務提供者的名稱,IP地址以及埠號的資訊,這樣RPC框架就可以根據這些資訊順利的定位到服務提供者。

public void register() throws ZkConnectException {
    try {
        // 獲取zk連線
        ZooKeeper zooKeeper = new ZooKeeper(addr, timeout, event -> {
            logger.info("registry zk connect success...");
        });
        if (zooKeeper.exists(Constants.ZK_ROOT_DIR, false) == null) {
            zooKeeper.create(Constants.ZK_ROOT_DIR, Constants.ZK_ROOT_DIR.getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.PERSISTENT);
        }
        zooKeeper.create(Constants.ZK_ROOT_DIR + "/" + serverName,
                (serverName + ","+ host + ":" + port).getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        logger.info("provider register success {}", serverName);
    } catch (Exception e) {
        throw new ZkConnectException("register to zk exception," + e.getMessage(), e.getCaus());
    }
}

就這樣我們RPC框架與服務提供者相關的內容就完成了,接下來要完成的是服務消費者部分。

服務消費者

對於服務消費者,我們框架需要對它的處理就是,為所有的RPC服務(被@RpcConsumer修飾的屬性)設定上動態代理。具體的設定程式碼如下所示(PS:這段程式碼寫在ConsumerAutoConfiguration類中哦):

@Bean
public BeanPostProcessor beanPostProcessor() {
    return new BeanPostProcessor() {
        @Override
        public Object postProcessBeforeInitialization(Object bean, String beanName)
                throws BeansException {
            Class<?> objClz = bean.getClass();
            for (Field field : objClz.getDeclaredFields()) {
                RpcConsumer rpcConsumer = field.getAnnotation(RpcConsumer.class);
                if (null != rpcConsumer) {
                    Class<?> type = field.getType();
                    field.setAccessible(true);
                    try {
                        field.set(bean, rpcProxy.create(type, rpcConsumer.providerName()));
                    } catch (IllegalAccessException e) {
                        e.printStackTrace();
                    } finally {
                        field.setAccessible(false);
                    }
                }
            }
            return bean;
        }
    };
}

BeanPostProcessor也稱為Bean後置處理器,它是Spring中定義的介面,在Spring容器的建立過程中(具體為Bean初始化前後)會回撥BeanPostProcessor中定義的兩個方法。上面實現的postProcessBeforeInitialization是在Bean初始化之前呼叫的,還有一個postProcessAfterInitialization方法是在Bean初始化之後呼叫的。
如上面程式碼所示,我們會在每一個帶有@RpcConsumer的例項初始化之前利用反射機制為其設定一個RpcProxy的代理,可以看到我們在建立這個動態代理的時候還需要服務提供者的名稱,這是因為在動態代理的實現裡面需要使用服務提供者的名稱來查詢服務提供者的地址資訊。那麼這個動態代理的實現又是怎樣的呢?這就是我們下一步需要做的事情。

動態代理

在這個RPC框架裡面動態代理主要實現的內容就是,當服務消費者呼叫服務提供者提供的介面時,將呼叫資訊通過Netty傳送給對應的服務呼叫者,然後由服務提供者完成相關的處理並且將處理結果返回給服務消費者。下面我們就一起來看一下RpcProxy的是如何實現這部分功能的。

@Component
public class RpcProxy {

    @Autowired
    private ServiceDiscovery serviceDiscovery;

    public <T> T create(Class<?> interfaceClass, String providerName) {
        return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[]{interfaceClass},
                (proxy, method, args) -> {
            // 通過netty向Rpc服務傳送請求。
            // 構建一個請求。
            RpcRequest request = new RpcRequest();
            request.setRequestId(UUID.randomUUID().toString())
                    .setClassName(method.getDeclaringClass().getName())
                    .setMethodName(method.getName())
                    .setParamTypes(method.getParameterTypes())
                    .setParams(args);
            // 獲取一個服務提供者。
            ProviderInfo providerInfo = serviceDiscovery.discover(providerName);
            // 解析服務提供者的地址資訊,陣列第一個元素為ip地址,第二個元素為埠號。
           String[] addrInfo = providerInfo.getAddr().split(":");
            String host = addrInfo[0];
            int port = Integer.parseInt(addrInfo[1]);
            RpcClient rpcClient = new RpcClient(host, port);
            // 使用Netty向服務提供者傳送呼叫訊息,並接收請求結果。
            RpcResponse response = rpcClient.send(request);
            if (response.isError()) {
                throw response.getError();
            } else {
                return response.getResult();
            }
        });
    }
}

其實在代理裡面首先我們會構造請求資訊實體,然後會根據服務提供者的名稱獲取一個服務提供者的地址,最後再將請求資訊傳送給服務提供者並接收呼叫結果。獲取服務提供者的方法會在後面消費者和提供者的通用配置裡面講解。我們在這裡重點來看一下發送呼叫資訊並接收呼叫結果的實現。

public class RpcClient extends SimpleChannelInboundHandler<RpcResponse> {
    
    ... 此處省略物件屬性資訊,可檢視原始碼。

    public RpcResponse send(RpcRequest request){
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ... 此處省略Netty相關配置,可檢視原始碼。
            // 連線伺服器
            ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
            channelFuture.channel().writeAndFlush(request).sync();
            future = new CompletableFuture<>();
            future.get();
            if (response != null) {
                // 關閉netty連線。
                channelFuture.channel().closeFuture().sync();
            }
            return response;
        } catch (Exception e) {
            logger.error("client send msg error,", e);
            return null;
        } finally {
            workerGroup.shutdownGracefully();
        }
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext,
                                RpcResponse rpcResponse) throws Exception {
        logger.info("client get request result,{}", rpcResponse);
        this.response = rpcResponse;
        future.complete("");
    }
}

通過上面的程式碼可以看出向服務提供者傳送訊息是非同步的,我們通過CompletableFutureget()方法阻塞當前執行緒,直到接收到呼叫結果(PS:我們在channelRead0方法中收到返回結果後會將其設定成完成狀態)。看到這裡,你可能會問服務提供者收到呼叫請求資訊後如何處理的呢?具體的處理邏輯我們寫在了ServerHandler這個類中,可以看出在channelRead0方法收到一條呼叫資訊之後,呼叫handle方法來處理具體的呼叫過程,在handle方法中會使用反射機制找到所呼叫方法的具體實現,然後執行呼叫過程並獲取結果,最後再使用Netty將結果返回給消費者服務。

public class ServerHandler extends SimpleChannelInboundHandler<RpcRequest> {

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext,
                                RpcRequest request) throws Exception {
        logger.info("provider accept request,{}", request);
        // 返回的物件。
        RpcResponse rpcResponse = new RpcResponse();
        // 將請求id原路帶回
        rpcResponse.setRequestId(request.getRequestId());
        try {
            Object result = handle(request);
            rpcResponse.setResult(result);
        } catch (Exception e) {
            rpcResponse.setError(e);
        }
        channelHandlerContext.writeAndFlush(rpcResponse).addListener(ChannelFutureListener.CLOSE);
    }

    private Object handle(RpcRequest request) throws Exception {
        String className = request.getClassName();
        Class<?> objClz = Class.forName(className);
        Object o = BeanFactory.getBean(objClz);
        // 獲取呼叫的方法名稱。
        String methodName = request.getMethodName();
        // 引數型別
        Class<?>[] paramsTypes = request.getParamTypes();
        // 具體引數。
        Object[] params = request.getParams();
        // 呼叫實現類的指定的方法並返回結果。
        Method method = objClz.getMethod(methodName, paramsTypes);
        Object res = method.invoke(o, params);
        return res;
    }
}

消費者和提供者的通用配置

除了ProviderAutoConfigurationConsumerAutoConfiguration兩個配置類,我們還定義了一個RpcAutoConfiguration類來配置一些其他的東西,如下所示。

public class RpcAutoConfiguration {
    ...

    @Bean
    @ConditionalOnMissingBean
    public ServiceDiscovery serviceDiscovery() {
        ServiceDiscovery serviceDiscovery =
                null;
        try {
            serviceDiscovery = new ServiceDiscovery(rpcProperties.getRegisterAddress());
        } catch (ZkConnectException e) {
            logger.error("zk connect failed:", e);
        }
        return serviceDiscovery;
    }

    @Bean
    @ConditionalOnMissingBean
    public RpcProxy rpcProxy() {
        RpcProxy rpcProxy = new RpcProxy();
        rpcProxy.setServiceDiscovery(serviceDiscovery());
        return rpcProxy;
    }
}

在這個配置類裡面,主要初始化了一個ServiceDiscovery的物件以及一個RpcProxy的物件。其中RpcProxy是動態代理,在上面我們已經詳細瞭解過了。那麼這裡就來著重瞭解一下ServiceDiscovery是幹啥的吧。
大家還記得我們在文章開始的時候貼出來的那張圖片嗎?在服務消費者初始化的時候會去訂閱服務提供者內容的變化,ServiceDiscovery的主要功能就是這個,其主要程式碼如下所示(如果你需要完整的程式碼,可以檢視本文原始碼)。

public class ServiceDiscovery {

    // 儲存服務提供者的資訊。
    private volatile List<ProviderInfo> dataList = new ArrayList<>();

    public ServiceDiscovery(String registoryAddress) throws ZkConnectException {
        try {
            // 獲取zk連線。
            ZooKeeper zooKeeper = new ZooKeeper(registoryAddress, 2000, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    logger.info("consumer connect zk success!");
                }
            });
            watchNode(zooKeeper);
        } catch (Exception e) {
            throw new ZkConnectException("connect to zk exception," + e.getMessage(), e.getCause());
        }
    }

    /**
     * 監聽服務提供者的變化
     */
    public void watchNode(final ZooKeeper zk) {
        ...
    }

    /**
     * 獲取一個服務提供者
     */
    public ProviderInfo discover(String providerName) {
        ....
    }
}

在這個類的構造方法裡面,我們和ZK註冊中心建立了一個連線,並且在watchNode方法中監聽服務提供者節點的變化,當有服務提供者資訊有變化時會去修改dataList裡的內容,這樣可以保證在服務本地維持一份可用的服務提供者的資訊。而在遠端呼叫發生的時候我們會通過discover方法(PS:前面有見到過哦)去dataList裡面尋找一個可用的服務提供者來提供服務。

Starter的配置

我們還需要在resources目錄下新建一個META-INF目錄,然後在該目錄下新建一個spring.factories檔案,裡面的內容如下面程式碼所示。它主要是用來指定在Spring Boot專案啟動的時候需要載入的其他配置。如果你有不明白的地方可以查詢一下Spring Boot自定義Stater的相關內容。

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
cn.itweknow.sbrpccorestarter.config.RpcAutoConfiguration,\
cn.itweknow.sbrpccorestarter.config.ProviderAutoConfiguration,\
cn.itweknow.sbrpccorestarter.config.ConsumerAutoConfiguration

到這一步我們框架的核心部分就完成了,它將會以一個Spring Boot Stater的形式提供給服務提供者和服務消費者使用,接下來我們就將分別定義一個服務提供者和一個消費者來測試我們自己實現的RPC框架。

建立服務提供者

在建立服務提供者之前,我們需要新建一個與服務消費者之間共享的服務介面。因為前面提到過,在服務消費者眼裡的遠端呼叫實際上就是呼叫本地的介面方法而已。在這個專案裡我們就建立了一個HelloRpcService.java的介面,如下所示:

public interface HelloRpcService {
    String sayHello();
}

在介面定義完成之後,我們就來建立我們的服務提供者,並且實現上面定義的HelloRpcService介面。在服務提供者服務裡還需要依賴RPC框架的核心Starter以及服務介面包,我們需要在pom.xml中新增下面的依賴。

<dependency>
    <groupId>cn.itweknow</groupId>
    <artifactId>sb-rpc-core-starter</artifactId>
    <version>0.0.1-SNAPSHOT</version>
</dependency>

<dependency>
    <groupId>cn.itweknow</groupId>
    <artifactId>sb-rpc-api</artifactId>
    <version>0.0.1-SNAPSHOT</version>
</dependency>

新增完依賴後,我們就來看下HelloRpcService的具體實現吧:

@RpcService(HelloRpcService.class)
public class HelloRpcServiceImpl implements HelloRpcService {
    
    @Override
    public String sayHello() {
        return "Hello RPC!";
    }
}

其實現很簡單,主要是要需要在實現類上加上@RpcService註解,這樣在專案啟動的時候RPC框架才會掃描到它,並將其交給BeanFactory管理。接下來還需要配置的是一些RPC框架需要的配置項,包括服務名稱,ZK的地址以及Netty啟動的埠等資訊。這些資訊在框架是通過RpcProperties這個配置類來讀取的,有興趣的同學可以在原始碼中找到它。

spring.rpc.host=localhost
# netty服務的埠號
spring.rpc.port=21810
# zk地址
spring.rpc.register-address=localhost:2181
spring.rpc.server-name=provider
# 連線zk的超時時間
spring.rpc.timeout=2000

建立服務消費者

服務消費者同樣也需要RPC核心框架的Starter以及服務介面的依賴,和RPC框架的一些基礎配置項,和服務提供者類似,這裡就不粘出來了。這裡需要說明的一點是,為了方便測試,服務消費者是一個Web服務,所以它還添加了spring-boot-starter-web的依賴。下面我們就一起來看下服務消費者是如何呼叫遠端服務的吧。

@RestController
@RequestMapping("/hello-rpc")
public class HelloRpcController {


    @RpcConsumer(providerName = "provider")
    private HelloRpcService helloRpcService;

    @GetMapping("/hello")
    public String hello() {
        return helloRpcService.sayHello();
    }
}

我們在消費者服務中寫了一個hello的介面,在接口裡面呼叫了HelloRpcService接口裡的sayHello()方法,看過前面內容的同學應該知道,被@RpcConsumer修飾的helloRpcService屬性在初始化的時候會為其設定一個動態代理,當我們呼叫這個接口裡面的方法時,會通過Netty向服務提供者傳送呼叫資訊,然後由服務提供者呼叫相應方法並返回結果。
到這一步,我們可以說完成了一個簡單的RPC框架以及其使用,下面我們就一起來驗證一下結果吧。

測試

在測試之前我們需要在自己本地電腦上安裝Zookeeper,具體的安裝方式非常簡單。可以參考這篇文章。
安裝好Zookeeper後,我們需要完成以下幾個步驟:

  1. 啟動Zookeeper。
  2. 啟動服務提供者。
  3. 啟動服務消費者。

第一次啟動服務消費者的過程中,你的控制檯可以能會報一個找不到/rpc節點的錯誤,產生這個錯誤的原因是我們在第一次啟動的時候ZK裡面並不存在/rpc這個節點,但是如果你仔細研究原始碼的話,會發現當這個節點不存在的時候,我們會建立一個。所以直接忽略這個異常即可。完成以上幾步之後,我們只需要在瀏覽器中訪問http://127.0.0.1:8080/hello-rpc/hello,如果你看到了下面的結果,那麼恭喜你,整個RPC框架完美的執行成功了。

結束語

本文的主要內容是和大家一起完成了一個Demo版的RPC框架,其主要目的是讓大家更深刻的理解RPC的原理以及其呼叫過程。當然由於文章篇幅的原因,很多程式碼沒有直接在文中給出,您可以在Github上找到完整的實現。如果您有什麼問題可以在Github上提交Issue或者傳送郵件到我的郵箱([email protected]),如果您覺得這篇文章寫的還行的話,希望您能給我個Star,這是對我最好的鼓勵。

PS:學習不止,碼不停蹄!如果您喜歡我的文章,就關注我吧!