1. 程式人生 > >從零寫分散式RPC框架 系列 1.0 (3)RPC-Server模組設計實現

從零寫分散式RPC框架 系列 1.0 (3)RPC-Server模組設計實現

RPC-Server模組負責(1)將@RpcService註解標記的服務和自身資訊註冊到ZK叢集,(2)對外提供RPC服務實現,處理來自RPC-Client的請求。該模組整體的核心類為 RpcServer ,而真正處理請求的核心類是 RpcServerHandler 。另外還有一個 ZKServiceRegistry 負責和 ZK叢集互動。

系列文章:

從零寫分散式RPC框架 系列 1.0 (1)架構設計
從零寫分散式RPC框架 系列 1.0 (2)RPC-Common模組設計實現
從零寫分散式RPC框架 系列 1.0 (3)RPC-Server模組設計實現
從零寫分散式RPC框架 系列 1.0 (4)RPC-Client模組設計實現


從零寫分散式RPC框架 系列 1.0 (5)整合測試
使用gpg外掛釋出jar包到Maven中央倉庫 完整實踐

文章目錄

一 介紹

1 整體結構

專案結構

2 模組介紹

注意,因為最終是以 spring-boot-starter 的形式對外提供,所以我把過程命名為 spring-boot-autoconfigure 的格式,再用一個spring-boot-starter對其包裝。
整體結構如下

  1. @RpcService註解
    用於標註 Rpc 服務實現類,其value為 Class<?> 型別,RpcSever類啟動的時候將掃描所有@RpcService標記類,並根據其value獲取其 Rpc實現。
  2. RpcServerHandler
    Rpc服務端處理器,將嵌入Netty 分配的管道流中,並利用反射技術處理來自客戶端的RpcRequest生成結果封裝成RpcResponse返回給客戶端。
  3. RpcServerProperties和ZKProperties
    這兩個類是屬性注入類,都使用了@ConfigurationProperties註解。
  4. ZKServiceRegistry
    主要負責 連線ZK叢集 和 將服務資訊和自身服務地址註冊到ZK叢集 中。
  5. RpcServer
    RPC-Server模組核心類,負責 管理@RpcService標記類 和 啟動RPC服務。
  6. RpcServerAutoConfiguration和spring.factories檔案
    封裝成spring-boot-starter所需配置,開啟以上各類基於spring的自動裝配。

二 pom檔案

spring-boot-configuration-processor用於注入配置屬性
zkclient提供和ZK叢集互動的能力
netty-all結合rpc-netty-common中的元件即可提供Netty服務。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <artifactId>rpc-netty-server-spring-boot-autoconfigure</artifactId>

    <parent>
        <groupId>com.github.linshenkx</groupId>
        <artifactId>rpc-netty-spring-boot-starter</artifactId>
        <version>1.0.5.RELEASE</version>
        <relativePath>../</relativePath>
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
        </dependency>
        <dependency>
            <groupId>com.github.linshenkx</groupId>
            <artifactId>rpc-netty-common</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
        </dependency>
        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
        </dependency>
    </dependencies>

</project>

三 簡單元件:@RpcService和屬性注入類

@RpcService的實現比較簡單,需要注意的是 利用@Service 組合註解來將標記類收歸Spring管理,藉此在RpcServer可以方便實現掃描獲取。注意使用時其value應該是對應的服務介面類而不是當前被標記的服務實現類。因為服務介面類才代表契約,而本地服務實現類的命名等則不受限制。

/**
 * @version V1.0
 * @author: lin_shen
 * @date: 2018/10/31
 * @Description:
 * RPC服務註解(標註在rpc服務實現類上)
 * 使用@Service註解使被@RpcService標註的類都能被Spring管理
 */
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Service
public @interface RpcService {
    Class<?> value();
}

屬性注入類的實現比較簡單,在這裡可以給各個引數配置預設值。
注意這裡並沒有使用@Component將其收歸Spring管理,而是在需要使用對應屬性注入類的時候在類上使用@EnableConfigurationProperties(RpcServerProperties.class)和再用@Autowired將其引入。這樣可以由Spring確保其先於使用類例項化。

@Data
@ConfigurationProperties(prefix = "rpc.server")
public class RpcServerProperties {
    private int port=9000;
}

@Data
@ConfigurationProperties(prefix = "zk")
public class ZKProperties {
    private List<String> addressList = new ArrayList<>();
    private int sessionTimeOut=5000;
    private int connectTimeOut=1000;
    private String registryPath="/defaultRegistry";

}

四 ZKServiceRegistry

init方法

init()方法將於ZKServiceRegistry構造完成後執行,將使用使用者提供的zk地址列表隨機挑選一地址進行連線。後續需進行改進,對於有多個地址的情況,不應該只嘗試一次,如果隨機選擇到的地址剛好由於網路問題無法及時連線,則會影響專案啟動,此時應該選擇其他地址進行嘗試。

register方法

register(String serviceName,String serviceAddress)方法將根據 (1)配置檔案的registryPath(預設為 /defaultRegistry)+(2)服務名ServiceName 在zk叢集生成 永久service節點,再在永久節點下生成 臨時address節點(格式為/address-遞增數字),其臨時節點內容為 serviceAddress。最終的節點路徑形式如 ·/defaultRegistry/com.github.linshenkx.rpclib.HelloService/address-0000000033。在連線與ZK叢集斷開後,臨時節點會自動移除。

由此,當有多個RPC-Server提供同一Service的時候,將在同一永久service節點下生成包含各自地址資訊的臨時address節點。這樣,RPC-Client就可以提供查詢Service節點下的子節點,獲取能提供對應服務實現的RPC-Server列表,實現服務發現。

/**
 * @version V1.0
 * @author: lin_shen
 * @date: 2018/10/31
 * @Description: zookeeper服務註冊中心
 */
@Log4j2
@EnableConfigurationProperties(ZKProperties.class)
public class ZKServiceRegistry {


  @Autowired
  private ZKProperties zkProperties;

  private ZkClient zkClient;

  @PostConstruct
  public void init() {
    // 建立 ZooKeeper 客戶端
    zkClient = new ZkClient(getAddress(zkProperties.getAddressList()), zkProperties.getSessionTimeOut(), zkProperties.getConnectTimeOut());
    log.info("connect to zookeeper");
  }

  public String getAddress(List<String> addressList){
    if(CollectionUtils.isEmpty(addressList)){
      String defaultAddress="localhost:2181";
      log.error("addressList is empty,using defaultAddress:"+defaultAddress);
      return defaultAddress;
    }
    //待改進策略
    String address= getRandomAddress(addressList);
    log.info("using address:"+address);
    return address;
  }

  private String getRandomAddress(List<String> addressList){
    return addressList.get(ThreadLocalRandom.current().nextInt(addressList.size()));
  }

  /**
   * 為服務端提供註冊
   * 將服務地址註冊到對應服務名下
   * 斷開連線後地址自動清除
   * @param serviceName
   * @param serviceAddress
   */
  public void register(String serviceName, String serviceAddress) {
    // 建立 registry 節點(持久)
    String registryPath = zkProperties.getRegistryPath();
    if (!zkClient.exists(registryPath)) {
      zkClient.createPersistent(registryPath);
      log.info("create registry node: {}", registryPath);
    }
    // 建立 service 節點(持久)
    String servicePath = registryPath + "/" + serviceName;
    if (!zkClient.exists(servicePath)) {
      zkClient.createPersistent(servicePath);
      log.info("create service node: {}", servicePath);
    }
    // 建立 address 節點(臨時)
    String addressPath = servicePath + "/address-";
    String addressNode = zkClient.createEphemeralSequential(addressPath, serviceAddress);
    log.info("create address node: {}", addressNode);
  }

}

五 RpcServer

setApplicationContext方法(服務掃描)

RpcServer實現 ApplicationContextAware 介面來獲取 ApplicationContext感知能力。並在ApplicationContextAware介面帶來的 setApplicationContext 方法中完成 將RpcService收歸管理 的任務。需要注意的是這個方法將在類初始化完成後執行。

前文已經介紹到 @RpcService 註解提供組合@Service註解將標記類收歸Spring管理,所以這裡可以利用 ApplicationContext 獲取 所有標記類。再以@Service的value的服務介面類的類名作為key,標記類(即服務實現類)為value存入handlerMap中,收歸RpcServer管理。

afterPropertiesSet方法(服務啟動、服務註冊)

RpcServer還實現了InitializingBean 介面來獲取使用 afterPropertiesSet 方法的能力,該方法將在類初始化完成後執行,但晚於setApplicationContext方法。故執行該方法時RpcServer已完成 服務掃描,已在handlerMap中管理著服務實現類。

afterPropertiesSet方法的主要任務有:

  1. 服務啟動:啟動RPC伺服器(提供Netty連線服務)
    其核心實現由 RpcHandler 提供
  2. 服務註冊:將服務資訊和自身地址註冊到註冊中心(ZK叢集)
    其核心實現由 RpcServiceRegistry 提供

注意啟動過程如果丟擲異常將執行優雅關閉。

/**
 * @version V1.0
 * @author: lin_shen
 * @date: 2018/10/31
 * @Description: TODO
 */
@Log4j2
@AutoConfigureAfter({ZKServiceRegistry.class})
@EnableConfigurationProperties(RpcServerProperties.class)
public class RpcServer implements ApplicationContextAware, InitializingBean {

    private Map<String,Object> handlerMap=new HashMap<>();

    @Autowired
    private RpcServerProperties rpcProperties;

    @Autowired
    private ZKServiceRegistry rpcServiceRegistry;

    /**
     * 在類初始化時執行,將所有被@RpcService標記的類納入管理
     * @param applicationContext
     * @throws BeansException
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {

        //獲取帶有@RpcService註解的類
        Map<String,Object> rpcServiceMap=applicationContext.getBeansWithAnnotation(RpcService.class);
        //以@RpcService註解的value的類的類名為鍵將該標記類存入handlerMap
        if(!CollectionUtils.isEmpty(rpcServiceMap)){
            for(Object object:rpcServiceMap.values()){
                RpcService rpcService=object.getClass().getAnnotation(RpcService.class);
                String serviceName=rpcService.value().getName();
                handlerMap.put(serviceName,object);
            }
        }

    }


    /**
     * 在所有屬性值設定完成後執行,負責啟動RPC服務
     * @throws Exception
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        //管理相關childGroup
        EventLoopGroup bossGroup=new NioEventLoopGroup();
        //處理相關RPC請求
        EventLoopGroup childGroup=new NioEventLoopGroup();

        try {
            //啟動RPC服務
            ServerBootstrap bootstrap=new ServerBootstrap();
            bootstrap.group(bossGroup,childGroup);
            bootstrap.channel(NioServerSocketChannel.class);
            bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel channel) throws Exception {
                    ChannelPipeline pipeline=channel.pipeline();
                    //解碼RPC請求
                    pipeline.addLast(new RpcDecoder(RpcRequest.class));
                    //編碼RPC請求
                    pipeline.addFirst(new RpcEncoder(RpcResponse.class));
                    //處理RPC請求
                    pipeline.addLast(new RpcServerHandler(handlerMap));
                }
            });
            //同步啟動,RPC伺服器啟動完畢後才執行後續程式碼
            ChannelFuture future=bootstrap.bind(rpcProperties.getPort()).sync();
            log.info("server started,listening on {}",rpcProperties.getPort());
            //註冊RPC服務地址
            String serviceAddress= InetAddress.getLocalHost().getHostAddress()+":"+rpcProperties.getPort();
            for(String interfaceName:handlerMap.keySet()){
                rpcServiceRegistry.register(interfaceName,serviceAddress);
                log.info("register service:{}=>{}",interfaceName,serviceAddress);
            }
            //釋放資源
            future.channel().closeFuture().sync();
        }catch (Exception e){
            log.entry("server exception",e);
        }finally {
            //關閉RPC服務
            childGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }

    }
}

六 RpcServerHandler

該類是處理請求的核心類。該類繼承自 Netty 的 SimpleChannelInboundHandler,其傳入泛型為RpcRequest。即處理物件為 來自 RPC-Client 的 RpcRequest。該類主要通過覆寫 channelRead0 來對請求進行處理。

該類在構造時即獲取管理服務實現類的能力。(通過在構造方法中傳入handlerMap實現)

channelRead0 方法

該方法先建立一個響應物件RpcResponse,並將處理的RpcRequest的請求Id設定給它,以形成一一對應關係。再執行handle方法來獲取處理結果(或異常)並設定給RpcResponse,然後將結果返回(實際上是進入下一步,由下一個ChannelHandler繼續處理,在這個專案中即RpcEncoder)。

handler方法

核心中的核心。但本身並不複雜,使用動態代理技術執行目標方法得到結果而已。
首先根據RpcRequest的InterfaceName欄位獲取對應的服務實現類,再從RpcRequest中獲取反射呼叫所需的變數,如方法名、引數型別、引數列表等,最後執行反射呼叫即可。
目前使用的是jdk的動態代理,以後應該加上cglib才更完整。

/**
 * @version V1.0
 * @author: lin_shen
 * @date: 2018/10/31
 * @Description: RPC服務端處理器(處理RpcRequest)
 */
@Log4j2
public class RpcServerHandler extends SimpleChannelInboundHandler<RpcRequest> {


  /**
   * 存放 服務名稱 與 服務例項 之間的對映關係
   */
  private final Map<String, Object> handlerMap;

  public RpcServerHandler(Map<String, Object> handlerMap) {
    this.handlerMap = handlerMap;
  }

  @Override
  public void channelRead0(ChannelHandlerContext ctx, RpcRequest request) throws Exception {
    log.info("channelRead0 begin");
    // 建立 RPC 響應物件
    RpcResponse response = new RpcResponse();
    response.setRequestId(request.getRequestId());
    try {
      // 處理 RPC 請求成功
      Object result = handle(request);
      response.setResult(result);
    } catch (Exception e) {
      // 處理 RPC 請求失敗
      response.setException(e);
      log.error("handle result failure", e);
    }
    // 寫入 RPC 響應物件(寫入完畢後立即關閉與客戶端的連線)
    ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
  }

  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    log.error("server caught exception", cause);
    ctx.close();
  }

  private Object handle(RpcRequest request) throws Exception {
    log.info("handle begin");
    // 獲取服務例項
    String serviceName = request.getInterfaceName();
    Object serviceBean = handlerMap.get(serviceName);
    if (serviceBean == null) {
      throw new RuntimeException(String.format("can not find service bean by key: %s", serviceName));
    }
    // 獲取反射呼叫所需的變數
    Class<?> serviceClass = serviceBean.getClass();
    String methodName = request.getMethodName();
    log.info(methodName);
    Class<?>[] parameterTypes = request.getParameterTypes();
    log.info(parameterTypes[0].getName());
    Object[] parameters = request.getParameters();
    // 執行反射呼叫
    Method method = serviceClass.getMethod(methodName, parameterTypes);
    method.setAccessible(true);
    log.info(parameters[0].toString());
    return method.invoke(serviceBean, parameters);
  }
}

七 RpcServerAutoConfiguration 和 spring.factories

這兩個是封裝成spring-boot-starter所需的配置,因為spring-boot預設只會掃描啟動類同級目錄下的註解,對於外部依賴不會掃描,除非指定掃描,但這樣顯然不是我們的目的。所以需要使用這兩個檔案開啟基於spring的自動裝配。

1 spring.factories

在resources/META-INF下建立spring.factories檔案,指定自動裝配的類,書寫格式為
org.springframework.boot.autoconfigure.EnableAutoConfiguration=類名全稱A,類名全稱B
注意類名一定要全稱,多個類用逗號隔開,換行在末尾加 \ ,而且通過這種方式裝配會有順序,順序與檔案中宣告一致。
通過這種方式實現自動注入在類多的時候基本不可行,因為可讀性太差了,而且裝配順序需要人工維護。
所以一般是在這裡裝配一個自動配置類,通過自動配置類再去注入其他類,並實現更高階功能。

# AutoConfiguration
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.github.linshenkx.rpcnettyserverspringbootautoconfigure.RpcServerAutoConfiguration

2 RpcServerAutoConfiguration

如下,可以利用 @ConditionalOnClass 避免錯誤裝配,通過 @ConditionalOnMissingBean 提供讓使用者注入實現的機會。也可以在這裡指定裝配順序。

/**
 * @version V1.0
 * @author: lin_shen
 * @date: 2018/11/2
 * @Description: TODO
 */
@Configuration
@ConditionalOnClass(RpcServer.class)
public class RpcServerAutoConfiguration {
    @ConditionalOnMissingBean
    @Bean