從零寫分散式RPC框架 系列 1.0 (3)RPC-Server模組設計實現
RPC-Server模組負責(1)將@RpcService註解標記的服務和自身資訊註冊到ZK叢集,(2)對外提供RPC服務實現,處理來自RPC-Client的請求。該模組整體的核心類為 RpcServer ,而真正處理請求的核心類是 RpcServerHandler 。另外還有一個 ZKServiceRegistry 負責和 ZK叢集互動。
系列文章:
從零寫分散式RPC框架 系列 1.0 (1)架構設計
從零寫分散式RPC框架 系列 1.0 (2)RPC-Common模組設計實現
從零寫分散式RPC框架 系列 1.0 (3)RPC-Server模組設計實現
從零寫分散式RPC框架 系列 1.0 (4)RPC-Client模組設計實現
從零寫分散式RPC框架 系列 1.0 (5)整合測試
使用gpg外掛釋出jar包到Maven中央倉庫 完整實踐
文章目錄
一 介紹
1 整體結構
2 模組介紹
注意,因為最終是以 spring-boot-starter 的形式對外提供,所以我把過程命名為 spring-boot-autoconfigure 的格式,再用一個spring-boot-starter對其包裝。
整體結構如下
- @RpcService註解
用於標註 Rpc 服務實現類,其value為 Class<?> 型別,RpcSever類啟動的時候將掃描所有@RpcService標記類,並根據其value獲取其 Rpc實現。 - RpcServerHandler
Rpc服務端處理器,將嵌入Netty 分配的管道流中,並利用反射技術處理來自客戶端的RpcRequest生成結果封裝成RpcResponse返回給客戶端。 - RpcServerProperties和ZKProperties
這兩個類是屬性注入類,都使用了@ConfigurationProperties註解。 - ZKServiceRegistry
主要負責 連線ZK叢集 和 將服務資訊和自身服務地址註冊到ZK叢集 中。 - RpcServer
RPC-Server模組核心類,負責 管理@RpcService標記類 和 啟動RPC服務。 - RpcServerAutoConfiguration和spring.factories檔案
封裝成spring-boot-starter所需配置,開啟以上各類基於spring的自動裝配。
二 pom檔案
spring-boot-configuration-processor用於注入配置屬性
zkclient提供和ZK叢集互動的能力
netty-all結合rpc-netty-common中的元件即可提供Netty服務。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>rpc-netty-server-spring-boot-autoconfigure</artifactId>
<parent>
<groupId>com.github.linshenkx</groupId>
<artifactId>rpc-netty-spring-boot-starter</artifactId>
<version>1.0.5.RELEASE</version>
<relativePath>../</relativePath>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
</dependency>
<dependency>
<groupId>com.github.linshenkx</groupId>
<artifactId>rpc-netty-common</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
</dependency>
</dependencies>
</project>
三 簡單元件:@RpcService和屬性注入類
@RpcService的實現比較簡單,需要注意的是 利用@Service 組合註解來將標記類收歸Spring管理,藉此在RpcServer可以方便實現掃描獲取。注意使用時其value應該是對應的服務介面類而不是當前被標記的服務實現類。因為服務介面類才代表契約,而本地服務實現類的命名等則不受限制。
/**
* @version V1.0
* @author: lin_shen
* @date: 2018/10/31
* @Description:
* RPC服務註解(標註在rpc服務實現類上)
* 使用@Service註解使被@RpcService標註的類都能被Spring管理
*/
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Service
public @interface RpcService {
Class<?> value();
}
屬性注入類的實現比較簡單,在這裡可以給各個引數配置預設值。
注意這裡並沒有使用@Component將其收歸Spring管理,而是在需要使用對應屬性注入類的時候在類上使用@EnableConfigurationProperties(RpcServerProperties.class)和再用@Autowired將其引入。這樣可以由Spring確保其先於使用類例項化。
@Data
@ConfigurationProperties(prefix = "rpc.server")
public class RpcServerProperties {
private int port=9000;
}
@Data
@ConfigurationProperties(prefix = "zk")
public class ZKProperties {
private List<String> addressList = new ArrayList<>();
private int sessionTimeOut=5000;
private int connectTimeOut=1000;
private String registryPath="/defaultRegistry";
}
四 ZKServiceRegistry
init方法
init()方法將於ZKServiceRegistry構造完成後執行,將使用使用者提供的zk地址列表隨機挑選一地址進行連線。後續需進行改進,對於有多個地址的情況,不應該只嘗試一次,如果隨機選擇到的地址剛好由於網路問題無法及時連線,則會影響專案啟動,此時應該選擇其他地址進行嘗試。
register方法
register(String serviceName,String serviceAddress)方法將根據 (1)配置檔案的registryPath(預設為 /defaultRegistry)+(2)服務名ServiceName 在zk叢集生成 永久service節點,再在永久節點下生成 臨時address節點(格式為/address-遞增數字),其臨時節點內容為 serviceAddress。最終的節點路徑形式如 ·/defaultRegistry/com.github.linshenkx.rpclib.HelloService/address-0000000033
。在連線與ZK叢集斷開後,臨時節點會自動移除。
由此,當有多個RPC-Server提供同一Service的時候,將在同一永久service節點下生成包含各自地址資訊的臨時address節點。這樣,RPC-Client就可以提供查詢Service節點下的子節點,獲取能提供對應服務實現的RPC-Server列表,實現服務發現。
/**
* @version V1.0
* @author: lin_shen
* @date: 2018/10/31
* @Description: zookeeper服務註冊中心
*/
@Log4j2
@EnableConfigurationProperties(ZKProperties.class)
public class ZKServiceRegistry {
@Autowired
private ZKProperties zkProperties;
private ZkClient zkClient;
@PostConstruct
public void init() {
// 建立 ZooKeeper 客戶端
zkClient = new ZkClient(getAddress(zkProperties.getAddressList()), zkProperties.getSessionTimeOut(), zkProperties.getConnectTimeOut());
log.info("connect to zookeeper");
}
public String getAddress(List<String> addressList){
if(CollectionUtils.isEmpty(addressList)){
String defaultAddress="localhost:2181";
log.error("addressList is empty,using defaultAddress:"+defaultAddress);
return defaultAddress;
}
//待改進策略
String address= getRandomAddress(addressList);
log.info("using address:"+address);
return address;
}
private String getRandomAddress(List<String> addressList){
return addressList.get(ThreadLocalRandom.current().nextInt(addressList.size()));
}
/**
* 為服務端提供註冊
* 將服務地址註冊到對應服務名下
* 斷開連線後地址自動清除
* @param serviceName
* @param serviceAddress
*/
public void register(String serviceName, String serviceAddress) {
// 建立 registry 節點(持久)
String registryPath = zkProperties.getRegistryPath();
if (!zkClient.exists(registryPath)) {
zkClient.createPersistent(registryPath);
log.info("create registry node: {}", registryPath);
}
// 建立 service 節點(持久)
String servicePath = registryPath + "/" + serviceName;
if (!zkClient.exists(servicePath)) {
zkClient.createPersistent(servicePath);
log.info("create service node: {}", servicePath);
}
// 建立 address 節點(臨時)
String addressPath = servicePath + "/address-";
String addressNode = zkClient.createEphemeralSequential(addressPath, serviceAddress);
log.info("create address node: {}", addressNode);
}
}
五 RpcServer
setApplicationContext方法(服務掃描)
RpcServer實現 ApplicationContextAware 介面來獲取 ApplicationContext感知能力。並在ApplicationContextAware介面帶來的 setApplicationContext 方法中完成 將RpcService收歸管理 的任務。需要注意的是這個方法將在類初始化完成後執行。
前文已經介紹到 @RpcService 註解提供組合@Service註解將標記類收歸Spring管理,所以這裡可以利用 ApplicationContext 獲取 所有標記類。再以@Service的value的服務介面類的類名作為key,標記類(即服務實現類)為value存入handlerMap中,收歸RpcServer管理。
afterPropertiesSet方法(服務啟動、服務註冊)
RpcServer還實現了InitializingBean 介面來獲取使用 afterPropertiesSet 方法的能力,該方法將在類初始化完成後執行,但晚於setApplicationContext方法。故執行該方法時RpcServer已完成 服務掃描,已在handlerMap中管理著服務實現類。
afterPropertiesSet方法的主要任務有:
- 服務啟動:啟動RPC伺服器(提供Netty連線服務)
其核心實現由 RpcHandler 提供 - 服務註冊:將服務資訊和自身地址註冊到註冊中心(ZK叢集)
其核心實現由 RpcServiceRegistry 提供
注意啟動過程如果丟擲異常將執行優雅關閉。
/**
* @version V1.0
* @author: lin_shen
* @date: 2018/10/31
* @Description: TODO
*/
@Log4j2
@AutoConfigureAfter({ZKServiceRegistry.class})
@EnableConfigurationProperties(RpcServerProperties.class)
public class RpcServer implements ApplicationContextAware, InitializingBean {
private Map<String,Object> handlerMap=new HashMap<>();
@Autowired
private RpcServerProperties rpcProperties;
@Autowired
private ZKServiceRegistry rpcServiceRegistry;
/**
* 在類初始化時執行,將所有被@RpcService標記的類納入管理
* @param applicationContext
* @throws BeansException
*/
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
//獲取帶有@RpcService註解的類
Map<String,Object> rpcServiceMap=applicationContext.getBeansWithAnnotation(RpcService.class);
//以@RpcService註解的value的類的類名為鍵將該標記類存入handlerMap
if(!CollectionUtils.isEmpty(rpcServiceMap)){
for(Object object:rpcServiceMap.values()){
RpcService rpcService=object.getClass().getAnnotation(RpcService.class);
String serviceName=rpcService.value().getName();
handlerMap.put(serviceName,object);
}
}
}
/**
* 在所有屬性值設定完成後執行,負責啟動RPC服務
* @throws Exception
*/
@Override
public void afterPropertiesSet() throws Exception {
//管理相關childGroup
EventLoopGroup bossGroup=new NioEventLoopGroup();
//處理相關RPC請求
EventLoopGroup childGroup=new NioEventLoopGroup();
try {
//啟動RPC服務
ServerBootstrap bootstrap=new ServerBootstrap();
bootstrap.group(bossGroup,childGroup);
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline pipeline=channel.pipeline();
//解碼RPC請求
pipeline.addLast(new RpcDecoder(RpcRequest.class));
//編碼RPC請求
pipeline.addFirst(new RpcEncoder(RpcResponse.class));
//處理RPC請求
pipeline.addLast(new RpcServerHandler(handlerMap));
}
});
//同步啟動,RPC伺服器啟動完畢後才執行後續程式碼
ChannelFuture future=bootstrap.bind(rpcProperties.getPort()).sync();
log.info("server started,listening on {}",rpcProperties.getPort());
//註冊RPC服務地址
String serviceAddress= InetAddress.getLocalHost().getHostAddress()+":"+rpcProperties.getPort();
for(String interfaceName:handlerMap.keySet()){
rpcServiceRegistry.register(interfaceName,serviceAddress);
log.info("register service:{}=>{}",interfaceName,serviceAddress);
}
//釋放資源
future.channel().closeFuture().sync();
}catch (Exception e){
log.entry("server exception",e);
}finally {
//關閉RPC服務
childGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
六 RpcServerHandler
該類是處理請求的核心類。該類繼承自 Netty 的 SimpleChannelInboundHandler,其傳入泛型為RpcRequest。即處理物件為 來自 RPC-Client 的 RpcRequest。該類主要通過覆寫 channelRead0 來對請求進行處理。
該類在構造時即獲取管理服務實現類的能力。(通過在構造方法中傳入handlerMap實現)
channelRead0 方法
該方法先建立一個響應物件RpcResponse,並將處理的RpcRequest的請求Id設定給它,以形成一一對應關係。再執行handle方法來獲取處理結果(或異常)並設定給RpcResponse,然後將結果返回(實際上是進入下一步,由下一個ChannelHandler繼續處理,在這個專案中即RpcEncoder)。
handler方法
核心中的核心。但本身並不複雜,使用動態代理技術執行目標方法得到結果而已。
首先根據RpcRequest的InterfaceName欄位獲取對應的服務實現類,再從RpcRequest中獲取反射呼叫所需的變數,如方法名、引數型別、引數列表等,最後執行反射呼叫即可。
目前使用的是jdk的動態代理,以後應該加上cglib才更完整。
/**
* @version V1.0
* @author: lin_shen
* @date: 2018/10/31
* @Description: RPC服務端處理器(處理RpcRequest)
*/
@Log4j2
public class RpcServerHandler extends SimpleChannelInboundHandler<RpcRequest> {
/**
* 存放 服務名稱 與 服務例項 之間的對映關係
*/
private final Map<String, Object> handlerMap;
public RpcServerHandler(Map<String, Object> handlerMap) {
this.handlerMap = handlerMap;
}
@Override
public void channelRead0(ChannelHandlerContext ctx, RpcRequest request) throws Exception {
log.info("channelRead0 begin");
// 建立 RPC 響應物件
RpcResponse response = new RpcResponse();
response.setRequestId(request.getRequestId());
try {
// 處理 RPC 請求成功
Object result = handle(request);
response.setResult(result);
} catch (Exception e) {
// 處理 RPC 請求失敗
response.setException(e);
log.error("handle result failure", e);
}
// 寫入 RPC 響應物件(寫入完畢後立即關閉與客戶端的連線)
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
log.error("server caught exception", cause);
ctx.close();
}
private Object handle(RpcRequest request) throws Exception {
log.info("handle begin");
// 獲取服務例項
String serviceName = request.getInterfaceName();
Object serviceBean = handlerMap.get(serviceName);
if (serviceBean == null) {
throw new RuntimeException(String.format("can not find service bean by key: %s", serviceName));
}
// 獲取反射呼叫所需的變數
Class<?> serviceClass = serviceBean.getClass();
String methodName = request.getMethodName();
log.info(methodName);
Class<?>[] parameterTypes = request.getParameterTypes();
log.info(parameterTypes[0].getName());
Object[] parameters = request.getParameters();
// 執行反射呼叫
Method method = serviceClass.getMethod(methodName, parameterTypes);
method.setAccessible(true);
log.info(parameters[0].toString());
return method.invoke(serviceBean, parameters);
}
}
七 RpcServerAutoConfiguration 和 spring.factories
這兩個是封裝成spring-boot-starter所需的配置,因為spring-boot預設只會掃描啟動類同級目錄下的註解,對於外部依賴不會掃描,除非指定掃描,但這樣顯然不是我們的目的。所以需要使用這兩個檔案開啟基於spring的自動裝配。
1 spring.factories
在resources/META-INF下建立spring.factories檔案,指定自動裝配的類,書寫格式為
org.springframework.boot.autoconfigure.EnableAutoConfiguration=類名全稱A,類名全稱B
注意類名一定要全稱,多個類用逗號隔開,換行在末尾加 \
,而且通過這種方式裝配會有順序,順序與檔案中宣告一致。
通過這種方式實現自動注入在類多的時候基本不可行,因為可讀性太差了,而且裝配順序需要人工維護。
所以一般是在這裡裝配一個自動配置類,通過自動配置類再去注入其他類,並實現更高階功能。
# AutoConfiguration
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.github.linshenkx.rpcnettyserverspringbootautoconfigure.RpcServerAutoConfiguration
2 RpcServerAutoConfiguration
如下,可以利用 @ConditionalOnClass 避免錯誤裝配,通過 @ConditionalOnMissingBean 提供讓使用者注入實現的機會。也可以在這裡指定裝配順序。
/**
* @version V1.0
* @author: lin_shen
* @date: 2018/11/2
* @Description: TODO
*/
@Configuration
@ConditionalOnClass(RpcServer.class)
public class RpcServerAutoConfiguration {
@ConditionalOnMissingBean
@Bean