從零寫分散式RPC框架 系列 2.0 (3)RPC-Server和RPC-Client模組改造
2.0版本RPC-Server改動不大,主要變化在於RPC-Client使用了服務地址快取,並引入監控機制,第一時間獲取zk叢集中服務地址資訊變化並重新整理本地快取。另外,RPC-Client還使用了RpcClientProperties開放對負載均衡策略和序列化策略的選擇。
系列文章:
專欄:從零開始寫分散式RPC框架
專案GitHub地址:https://github.com/linshenkx/rpc-netty-spring-boot-starter
手寫通用型別負載均衡路由引擎(含隨機、輪詢、雜湊等及其帶權形式)
實現 序列化引擎(支援 JDK預設、Hessian、Json、Protostuff、Xml、Avro、ProtocolBuffer、Thrift等序列化方式)
從零寫分散式RPC框架 系列 2.0 (1)架構升級
從零寫分散式RPC框架 系列 2.0 (2)RPC-Common模組設計實現
從零寫分散式RPC框架 系列 2.0 (3)RPC-Server和RPC-Client模組改造
從零寫分散式RPC框架 系列 2.0 (4)使用BeanPostProcessor實現自定義@RpcReference註解注入
文章目錄
RPC-Server
1 結構圖
注意,RpcService註解移動到了RPC-Common模組下,另外新加了ServiceInfo代表將存到註冊中心的服務資訊(也在RPC-Common模組下),其他的除了RpcServer基本沒有變化
2 RpcService註解
主要是多了weight和workerThreads,分別代表權重和最大工作執行緒數。
/**
* @version V1.0
* @author: lin_shen
* @date: 2018/10/31
* @Description:
* RPC服務註解(標註在rpc服務實現類上)
* 使用@Service註解使被@RpcService標註的類都能被Spring管理
*/
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Service
public @interface RpcService {
Class<?> value();
int weight() default 1;
int workerThreads() default 10;
}
3 ServiceInfo
/**
* @version V1.0
* @author: lin_shen
* @date: 18-11-13
* @Description: 服務資訊,用於儲存到註冊中心
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ServiceInfo implements WeightGetAble {
private String host;
private int port;
/**
* 權重資訊
*/
private int weight;
/**
* 最大工作執行緒數
*/
private int workerThreads;
public ServiceInfo (ServiceInfo serviceInfo){
this.host = serviceInfo.host;
this.port = serviceInfo.port;
this.weight = serviceInfo.weight;
this.workerThreads = serviceInfo.workerThreads;
}
@Override
public int getWeightFactors() {
return getWeight();
}
}
4 RpcServer
RpcServer主要是多了對 serviceSemaphoreMap 和 serviceRpcServiceMap的管理。其中serviceSemaphoreMap 將作為引數傳入RpcServerHandler提供限流資訊,而serviceRpcServiceMap將註冊到ZK叢集。
/**
* @version V1.0
* @author: lin_shen
* @date: 2018/10/31
* @Description: TODO
*/
@Log4j2
@AutoConfigureAfter({ZKServiceRegistry.class})
@EnableConfigurationProperties(RpcServerProperties.class)
public class RpcServer implements ApplicationContextAware, InitializingBean {
/**
* 存放 服務名稱 與 服務例項 之間的對映關係
*/
private Map<String,Object> handlerMap=new HashMap<>();
/**
* 存放 服務名稱 與 訊號量 之間的對映關係
* 用於限制每個服務的工作執行緒數
*/
private Map<String, Semaphore> serviceSemaphoreMap=new HashMap<>();
/**
* 存放 服務名稱 與 服務資訊 之間的對映關係
*/
private Map<String, RpcService> serviceRpcServiceMap=new HashMap<>();
@Autowired
private RpcServerProperties rpcProperties;
@Autowired
private ZKServiceRegistry rpcServiceRegistry;
/**
* 在類初始化時執行,將所有被@RpcService標記的類納入管理
* @param applicationContext
* @throws BeansException
*/
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
//獲取帶有@RpcService註解的類
Map<String,Object> rpcServiceMap=applicationContext.getBeansWithAnnotation(RpcService.class);
//以@RpcService註解的value的類的類名為鍵將該標記類存入handlerMap和serviceSemaphoreMap
if(!CollectionUtils.isEmpty(rpcServiceMap)){
for(Object object:rpcServiceMap.values()){
RpcService rpcService=object.getClass().getAnnotation(RpcService.class);
String serviceName=rpcService.value().getName();
handlerMap.put(serviceName,object);
serviceSemaphoreMap.put(serviceName,new Semaphore(rpcService.workerThreads()));
serviceRpcServiceMap.put(serviceName,rpcService);
}
}
}
/**
* 在所有屬性值設定完成後執行,負責啟動RPC服務
* @throws Exception
*/
@Override
public void afterPropertiesSet() throws Exception {
//管理相關childGroup
EventLoopGroup bossGroup=new NioEventLoopGroup();
//處理相關RPC請求
EventLoopGroup childGroup=new NioEventLoopGroup();
try {
//啟動RPC服務
ServerBootstrap bootstrap=new ServerBootstrap();
bootstrap.group(bossGroup,childGroup);
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.option(ChannelOption.SO_BACKLOG,1024)
.childOption(ChannelOption.TCP_NODELAY,true)
.handler(new LoggingHandler(LogLevel.INFO));
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline pipeline=channel.pipeline();
//解碼RPC請求
pipeline.addLast(new RemotingTransporterDecoder());
//編碼RPC請求
pipeline.addFirst(new RemotingTransporterEncoder());
//處理RPC請求
pipeline.addLast(new RpcServerHandler(handlerMap,serviceSemaphoreMap));
}
});
//同步啟動,RPC伺服器啟動完畢後才執行後續程式碼
ChannelFuture future=bootstrap.bind(rpcProperties.getPort()).sync();
log.info("server started,listening on {}",rpcProperties.getPort());
//啟動後註冊服務
registry();
//釋放資源
future.channel().closeFuture().sync();
}catch (Exception e){
log.entry("server exception",e);
}finally {
//關閉RPC服務
childGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
private void registry() throws UnknownHostException {
//註冊RPC服務地址
String hostAddress=InetAddress.getLocalHost().getHostAddress();
int port=rpcProperties.getPort();
for(String interfaceName:handlerMap.keySet()){
ServiceInfo serviceInfo=
new ServiceInfo(hostAddress,port,serviceRpcServiceMap.get(interfaceName).weight(),serviceRpcServiceMap.get(interfaceName).workerThreads());
String serviceInfoString= JSON.toJSONString(serviceInfo);
rpcServiceRegistry.register(interfaceName,serviceInfoString);
log.info("register service:{}=>{}",interfaceName,serviceInfoString);
}
}
}
RPC-Client
1 結構圖
新增RpcClientProperties提供配置屬性讀入(路由策略和序列化方式),ZKServiceDiscovery增加ConcurrentMap<String,List> servicePathsMap來管理服務地址列表。RpcClient相應作出調整。
2 RpcClientProperties
注意這裡屬性用的是列舉型別而不是字串,另外預設路由策略是隨機,預設序列化策略是json
@Data
@ConfigurationProperties(prefix = "rpc.client")
public class RpcClientProperties {
private RouteStrategyEnum routeStrategy= RouteStrategyEnum.Random;
private SerializeTypeEnum serializeType=SerializeTypeEnum.JSON;
}
3 ZKServiceDiscovery
這裡使用了IZkChildListener 來對目標路徑下子節點變化進行監控,如果發生變化(新增或刪減)則重新執行discover方法拉取最新服務地址列表。
zkChildListenerMap的作用是管理服務和對應的服務地址列表監聽器,避免重複註冊監聽器。
/**
* @version V1.0
* @author: lin_shen
* @date: 2018/10/31
* @Description: zookeeper服務註冊中心
*/
@Component
@Log4j2
@EnableConfigurationProperties(ZKProperties.class)
public class ZKServiceDiscovery {
@Autowired
private ZKProperties zkProperties;
/**
* 服務名和服務地址列表的Map
*/
private ConcurrentMap<String,List<String>> servicePathsMap=new ConcurrentHashMap<>();
/**
* 服務監聽器 Map,監聽子節點服務資訊
*/
private ConcurrentMap<String, IZkChildListener> zkChildListenerMap=new ConcurrentHashMap<>();
private ZkClient zkClient;
@PostConstruct
public void init() {
// 建立 ZooKeeper 客戶端
zkClient = new ZkClient(zkProperties.getAddress(), zkProperties.getSessionTimeOut(), zkProperties.getConnectTimeOut());
log.info("connect to zookeeper");
}
/**
*
* 根據服務名獲取服務地址並保持監控
* @param serviceName
* @return
*/
public void discover(String serviceName){
log.info("discovering:"+serviceName);
String servicePath=zkProperties.getRegistryPath()+"/"+serviceName;
//找不到對應服務
if(!zkClient.exists(servicePath)){
throw new RuntimeException("can not find any service node on path: "+servicePath);
}
//獲取服務地址列表
List<String> addressList=zkClient.getChildren(servicePath);
if(CollectionUtils.isEmpty(addressList)){
throw new RuntimeException("can not find any address node on path: "+servicePath);
}
//儲存地址列表
List<String> paths=new ArrayList<>(addressList.size());
for(String address:addressList){
paths.add(zkClient.readData(servicePath+"/"+address));
}
servicePathsMap.put(serviceName,paths);
//保持監控
if(!zkChildListenerMap.containsKey(serviceName)){
IZkChildListener iZkChildListener= (parentPath, currentChilds) -> {
//當子節點列表變化時重新discover
discover(serviceName);
log.info("子節點列表發生變化 ");
};
zkClient.subscribeChildChanges(servicePath, iZkChildListener);
zkChildListenerMap.put(serviceName,iZkChildListener);
}
}
public List<String> getAddressList(String serviceName){
List<String> addressList=servicePathsMap.get(serviceName);
if(addressList==null||addressList.isEmpty()){
discover(serviceName);
return servicePathsMap.get(serviceName);
}
return addressList;
}
}
4 RpcClient
主要是配合RemotingTransporter做了調整和升級,整體變化不大。
另外一個要注意的就是 ConcurrentMap<String,RouteStrategy> serviceRouteStrategyMap,用於在使用輪詢策略時,為不同的服務呼叫保管對應的輪詢器(輪詢器內部儲存index記錄,是有狀態的)。
@Log4j2
@Component
@AutoConfigureAfter(ZKServiceDiscovery.class)
@EnableConfigurationProperties(RpcClientProperties.class)
public class RpcClient {
@Autowired
private ZKServiceDiscovery zkServiceDiscovery;
@Autowired
private RpcClientProperties rpcClientProperties;
/**
* 維持服務的 輪詢 路由狀態
* 不同服務狀態不同(服務列表也不同)
* 非輪詢無需維持狀態
*/
private ConcurrentMap<String,RouteStrategy> serviceRouteStrategyMap=new ConcurrentHashMap<>();
/**
* 存放請求編號與響應物件的對映關係
*/
private ConcurrentMap<Long, RemotingTransporter> remotingTransporterMap=new ConcurrentHashMap<>();
@SuppressWarnings("unchecked")
public <T> T create(final Class<?> interfaceClass){
//建立動態代理物件
return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),
new Class<?>[]{interfaceClass},
(proxy, method, args) -> {
//建立RPC請求物件
RpcRequest rpcRequest=new RpcRequest();
rpcRequest.setInterfaceName(method.getDeclaringClass().getName());
rpcRequest.setMethodName(method.getName());
rpcRequest.setParameterTypes(method.getParameterTypes());
rpcRequest.setParameters(args);
//獲取RPC服務資訊列表
String serviceName=interfaceClass.getName();
List<String> addressList=zkServiceDiscovery.getAddressList(serviceName);
List<ServiceInfo> serviceInfoList=new ArrayList<>(addressList.size());
for(String serviceInfoString:addressList){
serviceInfoList.add(JSON.parseObject(serviceInfoString,ServiceInfo.class));
}
//根據配置檔案獲取路由策略
log.info("使用負載均衡策略:"+rpcClientProperties.getRouteStrategy());
log.info("使用序列化策略:"+rpcClientProperties.getSerializeType());
RouteStrategy routeStrategy ;
//如果使用輪詢,則需要儲存狀態(按服務名儲存)
if(RouteStrategyEnum.Polling==rpcClientProperties.getRouteStrategy()){
routeStrategy=serviceRouteStrategyMap.getOrDefault(serviceName,RouteEngine.queryClusterStrategy(RouteStrategyEnum.Polling));
serviceRouteStrategyMap.put(serviceName,routeStrategy);
}else {
routeStrategy= RouteEngine.queryClusterStrategy(rpcClientProperties.getRouteStrategy());
}
//根據路由策略選取服務提供方
ServiceInfo serviceInfo = routeStrategy.select(serviceInfoList);
RemotingTransporter remotingTransporter=new RemotingTransporter();
//設定flag為請求,雙路,非ping,非其他,序列化方式為 配置檔案中SerializeTypeEnum對應的code
remotingTransporter.setFlag(new RemotingTransporter.Flag(true,true,false,false,rpcClientProperties.getSerializeType().getCode()));
remotingTransporter.setBodyContent(rpcRequest);
log.info("get serviceInfo:"+serviceInfo);
//從RPC服務地址中解析主機名與埠號
//傳送RPC請求
RpcResponse rpcResponse=send(remotingTransporter,serviceInfo.getHost(),serviceInfo.getPort());
//獲取響應結果
if(rpcResponse==null){
log.error("send request failure",new IllegalStateException("response is null"));
return null;
}
if(rpcResponse.getException()!=null){
log.error("response has exception",rpcResponse.getException());
return null;
}
return rpcResponse.getResult();
}
);
}
private RpcResponse send(RemotingTransporter remotingTransporter,String host,int port){
log.info("sen