輕量級分散式 RPC 框架 netty+protostuff+zk +Spring
RPC,即 Remote Procedure Call(遠端過程呼叫),說得通俗一點就是:呼叫遠端計算機上的服務,就像呼叫本地服務一樣。
RPC 可基於 HTTP 或 TCP 協議,Web Service 就是基於 HTTP 協議的 RPC,它具有良好的跨平臺性,但其效能卻不如基於 TCP 協議的 RPC。會兩方面會直接影響 RPC 的效能,一是傳輸方式,二是序列化。
眾所周知,TCP 是傳輸層協議,HTTP 是應用層協議,而傳輸層較應用層更加底層,在資料傳輸方面,越底層越快,因此,在一般情況下,TCP 一定比 HTTP 快。就序列化而言,Java 提供了預設的序列化方式,但在高併發的情況下,這種方式將會帶來一些效能上的瓶頸,於是市面上出現了一系列優秀的序列化框架,比如:Protobuf、Kryo、Hessian、Jackson 等,它們可以取代 Java 預設的序列化,從而提供更高效的效能。
為了支援高併發,傳統的阻塞式 IO 顯然不太合適,因此我們需要非同步的 IO,即 NIO。Java 提供了 NIO 的解決方案,Java 7 也提供了更優秀的 NIO.2 支援,用 Java 實現 NIO 並不是遙不可及的事情,只是需要我們熟悉 NIO 的技術細節。
我們需要將服務部署在分散式環境下的不同節點上,通過服務註冊的方式,讓客戶端來自動發現當前可用的服務,並呼叫這些服務。這需要一種服務登錄檔(Service Registry)的元件,讓它來註冊分散式環境下所有的服務地址(包括:主機名與埠號)。
應用、服務、服務登錄檔之間的關係見下圖:
每臺 Server 上可釋出多個 Service,這些 Service 共用一個 host 與 port,在分散式環境下會提供 Server 共同對外提供 Service。此外,為防止 Service Registry 出現單點故障,因此需要將其搭建為叢集環境。
本文將為您揭曉開發輕量級分散式 RPC 框架的具體過程,該框架基於 TCP 協議,提供了 NIO 特性,提供高效的序列化方式,同時也具備服務註冊與發現的能力。
根據以上技術需求,我們可使用如下技術選型:
- Spring:它是最強大的依賴注入框架,也是業界的權威標準。
- Netty:它使 NIO 程式設計更加容易,遮蔽了 Java 底層的 NIO 細節。
- Protostuff:它基於 Protobuf 序列化框架,面向 POJO,無需編寫 .proto 檔案。
- ZooKeeper:提供服務註冊與發現功能,開發分散式系統的必備選擇,同時它也具備天生的叢集能力。
相關 Maven 依賴請見附錄。
第一步:編寫服務介面
<!-- lang: java -->
public interface HelloService {
String hello(String name);
}
將該介面放在獨立的客戶端 jar 包中,以供應用使用。
第二步:編寫服務介面的實現類
<!-- lang: java -->
@RpcService(HelloService.class) // 指定遠端介面
publicclassHelloServiceImplimplementsHelloService {
@Override
public String hello(String name) {
return"Hello! " + name;
}
}
使用RpcService
註解定義在服務介面的實現類上,需要對該實現類指定遠端介面,因為實現類可能會實現多個介面,一定要告訴框架哪個才是遠端介面。
RpcService
程式碼如下:
<!-- lang: java -->
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Component // 表明可被 Spring 掃描
public @interface RpcService {
Class<?> value();
}
該註解具備 Spring 的Component
註解的特性,可被 Spring 掃描。
該實現類放在服務端 jar 包中,該 jar 包還提供了一些服務端的配置檔案與啟動服務的載入程式。
第三步:配置服務端
服務端 Spring 配置檔名為spring.xml
,內容如下:
<!-- lang: xml -->
<beans...>
<context:component-scanbase-package="com.xxx.rpc.sample.server"/>
<context:property-placeholderlocation="classpath:config.properties"/>
<!-- 配置服務註冊元件 -->
<beanid="serviceRegistry"class="com.xxx.rpc.registry.ServiceRegistry">
<constructor-argname="registryAddress"value="${registry.address}"/>
</bean>
<!-- 配置 RPC 伺服器 -->
<beanid="rpcServer"class="com.xxx.rpc.server.RpcServer">
<constructor-argname="serverAddress"value="${server.address}"/>
<constructor-argname="serviceRegistry"ref="serviceRegistry"/>
</bean>
</beans>
具體的配置引數在config.properties
檔案中,內容如下:
<!-- lang: java -->
# ZooKeeper 伺服器
registry.address=127.0.0.1:2181
# RPC 伺服器
server.address=127.0.0.1:8000
以上配置表明:連線本地的 ZooKeeper 伺服器,並在 8000 埠上釋出 RPC 服務。
第四步:啟動伺服器併發布服務
為了載入 Spring 配置檔案來發布服務,只需編寫一個載入程式即可:
<!-- lang: java -->
public class RpcBootstrap {
publicstaticvoidmain(String[] args) {
newClassPathXmlApplicationContext("spring.xml");
}
}
執行RpcBootstrap
類的main
方法即可啟動服務端,但還有兩個重要的元件尚未實現,它們分別是:ServiceRegistry
與RpcServer
,下文會給出具體實現細節。
第五步:實現服務註冊
使用 ZooKeeper 客戶端可輕鬆實現服務註冊功能,ServiceRegistry
程式碼如下:
<!-- lang: java -->
public class ServiceRegistry {
privatestaticfinal Logger LOGGER = LoggerFactory.getLogger(ServiceRegistry.class);
private CountDownLatch latch = new CountDownLatch(1);
private String registryAddress;
publicServiceRegistry(String registryAddress){
this.registryAddress = registryAddress;
}
publicvoidregister(String data){
if (data != null) {
ZooKeeper zk = connectServer();
if (zk != null) {
createNode(zk, data);
}
}
}
privateZooKeeper connectServer(){
ZooKeeper zk = null;
try {
zk = new ZooKeeper(registryAddress, Constant.ZK_SESSION_TIMEOUT, new Watcher() {
@Override
publicvoidprocess(WatchedEvent event){
if (event.getState() == Event.KeeperState.SyncConnected) {
latch.countDown();
}
}
});
latch.await();
} catch (IOException | InterruptedException e) {
LOGGER.error("", e);
}
return zk;
}
privatevoidcreateNode(ZooKeeper zk, String data){
try {
byte[] bytes = data.getBytes();
String path = zk.create(Constant.ZK_DATA_PATH, bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
LOGGER.debug("create zookeeper node ({} => {})", path, data);
} catch (KeeperException | InterruptedException e) {
LOGGER.error("", e);
}
}
}
其中,通過Constant
配置了所有的常量:
<!-- lang: java -->
public interface Constant {
intZK_SESSION_TIMEOUT = 5000;
StringZK_REGISTRY_PATH = "/registry";
StringZK_DATA_PATH = ZK_REGISTRY_PATH + "/data";
}
注意:首先需要使用 ZooKeeper 客戶端命令列建立/registry
永久節點,用於存放所有的服務臨時節點。
第六步:實現 RPC 伺服器
使用 Netty 可實現一個支援 NIO 的 RPC 伺服器,需要使用ServiceRegistry
註冊服務地址,RpcServer
程式碼如下:
<!-- lang: java -->
publicclassRpcServerimplementsApplicationContextAware, InitializingBean {
privatestaticfinal Logger LOGGER = LoggerFactory.getLogger(RpcServer.class);
private String serverAddress;
private ServiceRegistry serviceRegistry;
private Map<String, Object> handlerMap = new HashMap<>(); // 存放介面名與服務物件之間的對映關係
public RpcServer(String serverAddress) {
this.serverAddress = serverAddress;
}
public RpcServer(String serverAddress, ServiceRegistry serviceRegistry) {
this.serverAddress = serverAddress;
this.serviceRegistry = serviceRegistry;
}
@Override
publicvoid setApplicationContext(ApplicationContext ctx) throws BeansException {
Map<String, Object> serviceBeanMap = ctx.getBeansWithAnnotation(RpcService.class); // 獲取所有帶有 RpcService 註解的 Spring Bean
if (MapUtils.isNotEmpty(serviceBeanMap)) {
for (Object serviceBean : serviceBeanMap.values()) {
String interfaceName = serviceBean.getClass().getAnnotation(RpcService.class).value().getName();
handlerMap.put(interfaceName, serviceBean);
}
}
}
@Override
publicvoid afterPropertiesSet() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
publicvoid initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
.addLast(new RpcDecoder(RpcRequest.class)) // 將 RPC 請求進行解碼(為了處理請求)
.addLast(new RpcEncoder(RpcResponse.class)) // 將 RPC 響應進行編碼(為了返回響應)
.addLast(new RpcHandler(handlerMap)); // 處理 RPC 請求
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
String[] array = serverAddress.split(":");
String host = array[0];
int port = Integer.parseInt(array[1]);
ChannelFuture future = bootstrap.bind(host, port).sync();
LOGGER.debug("server started on port {}", port);
if (serviceRegistry != null) {
serviceRegistry.register(serverAddress); // 註冊服務地址
}
future.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
以上程式碼中,有兩個重要的 POJO 需要描述一下,它們分別是RpcRequest
與RpcResponse
。
使用RpcRequest
封裝 RPC 請求,程式碼如下:
<!-- lang: java -->
public class RpcRequest {
private String requestId;
private String className;
private String methodName;
private Class<?>[] parameterTypes;
private Object[] parameters;
// getter/setter...
}
使用RpcResponse
封裝 RPC 響應,程式碼如下:
<!-- lang: java -->
public class RpcResponse {
privateStringrequestId;
privateThrowableerror;
privateObjectresult;
// getter/setter...
}
使用RpcDecoder
提供 RPC 解碼,只需擴充套件 Netty 的ByteToMessageDecoder
抽象類的decode
方法即可,程式碼如下:
<!-- lang: java -->
publicclassRpcDecoderextendsByteToMessageDecoder {
private Class<?> genericClass;
publicRpcDecoder(Class<?> genericClass){
this.genericClass = genericClass;
}
@Override
publicvoiddecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if (in.readableBytes() < 4) {
return;
}
in.markReaderIndex();
int dataLength = in.readInt();
if (dataLength < 0) {
ctx.close();
}
if (in.readableBytes() < dataLength) {
in.resetReaderIndex();
return;
}
byte[] data = newbyte[dataLength];
in.readBytes(data);
Object obj = SerializationUtil.deserialize(data, genericClass);
out.add(obj);
}
}
使用RpcEncoder
提供 RPC 編碼,只需擴充套件 Netty 的MessageToByteEncoder
抽象類的encode
方法即可,程式碼如下:
<!-- lang: java -->
publicclass RpcEncoder extends MessageToByteEncoder {
privateClass<?> genericClass;
public RpcEncoder(Class<?> genericClass) {
this.genericClass = genericClass;
}
@Override
public void encode(ChannelHandlerContext ctx, Object in
相關推薦
輕量級分散式 RPC 框架 netty+protostuff+zk +Spring
RPC,即 Remote Procedure Call(遠端過程呼叫),說得通俗一點就是:呼叫遠端計算機上的服務,就像呼叫本地服務一樣。
RPC 可基於 HTTP 或 TCP 協議,Web Service 就是基於 HTTP 協議的 RPC,它具有良好的跨平臺性,但其效能卻不如基於 TCP 協議的 RP
輕量級分散式RPC框架實現(續)
1、背景
最近在搜尋Netty和Zookeeper方面的文章時,看到了這篇文章《輕量級分散式 RPC 框架》,作者用Zookeeper、Netty和Spring寫了一個輕量級的分散式RPC框架。花了一些時間看了下他的程式碼,寫的乾淨簡單,寫的RPC框架可以算是一個簡易版的
一個輕量級分散式RPC框架--NettyRpc
1、背景
最近在搜尋Netty和Zookeeper方面的文章時,看到了這篇文章《輕量級分散式 RPC 框架》,作者用Zookeeper、Netty和Spring寫了一個輕量級的分散式RPC框架。花了一些時間看了下他的程式碼,寫的乾淨簡單,寫的RPC框架可以算是一個簡易版的dubbo。這個RPC框架雖小,但是
Dubbo+zk/Redis 分散式RPC框架
Dubbo是什麼?Dubbo是阿里巴巴SOA服務化治理方案的核心框架,每天為2,000多個服務提供30多億次訪問量支援,並被廣泛應用於阿里巴巴集團的各成員站點。
Dubbo是一個分散式服務框架,致力於提供高效能和透明化的RPC遠端服務呼叫方案,以及SOA服務治理方案。
核心部
基於Netty的分散式 RPC 框架
轉載自:http://blog.csdn.net/z69183787/article/details/52700274
http://blog.csdn.net/z69183787/article/details/52680941
從零寫分散式RPC框架 系列 第一版 (1)架構設計
本系列文章的目的是搭建出一個基於Netty,Zookeeper和SpringBoot的簡易分散式RPC框架,並且釋出到Maven中央倉庫以 spring-boot-starter 的形式對外提供開箱即用的服務。1.0 版本使用 protobuf 來做序列化,最終的使用形式比較接近於 Du
從零寫分散式RPC框架 系列 1.0 (2)RPC-Common模組設計實現
RPC-Common模組提供RPC-Server和RPC-Client的通用物件,封裝統一規則,使RPC Server和RPC Client 可以基於同一協議通訊。主要包含底層通訊的Netty所需的編碼解碼器(RpcEncoder,RpcDecoder),實現自定義協議的傳輸物件(Rpc
從零寫分散式RPC框架 系列 1.0 (5)整合測試
本篇將對前面幾篇模組作整合處理,使用spring-boot-starter的形式進行釋出。然後新建 examples 工程模組對其測試使用。
系列文章:
從零寫分散式RPC框架 系列 1.0 (1)架構設計 從零寫分散式RPC框架 系列 1.0 (2)RPC-Common模組設計
從零寫分散式RPC框架 系列 1.0 (4)RPC-Client模組設計實現
RPC-Client模組負責建立 動態代理物件 供 服務消費者 使用,而動態代理物件的方法執行則是通過RPC呼叫RPC-Server的服務實現。即RPC-Client遮蔽了底層的通訊過程,使得服務消費者可以基於介面透明使用服務提供者的服務。
系列文章:
從零寫分散式RPC框架 系
從零寫分散式RPC框架 系列 1.0 (3)RPC-Server模組設計實現
RPC-Server模組負責(1)將@RpcService註解標記的服務和自身資訊註冊到ZK叢集,(2)對外提供RPC服務實現,處理來自RPC-Client的請求。該模組整體的核心類為 RpcServer ,而真正處理請求的核心類是 RpcServerHandler 。另外還有一個 ZK
Motan 1.1.3 釋出,微博開源的高效能分散式 RPC 框架
Motan 1.1.3 已釋出,這是一個小的修復版本,主要是解決了與 zookeeper string serializer 的相容問題。#707
Motan 是微博團隊開源的一套高效能、易於使用的分散式 RPC 框架。功能包括:
支援通過 spring 配置方式整合
從零寫分散式RPC框架 系列 2.0 (4)使用BeanPostProcessor實現自定義@RpcReference註解注入
之前服務提供方 RpcServer 我們是使用 ApplicationContextAware 來掃描 @RpcService 註解,新增一個註解即可實現服務暴露。現在,我們用 BeanPostProcessor 來實現服務注入,自動將服務實現類注入到被@RpcReference註解標記
從零寫分散式RPC框架 系列 2.0 (3)RPC-Server和RPC-Client模組改造
2.0版本RPC-Server改動不大,主要變化在於RPC-Client使用了服務地址快取,並引入監控機制,第一時間獲取zk叢集中服務地址資訊變化並重新整理本地快取。另外,RPC-Client還使用了RpcClientProperties開放對負載均衡策略和序列化策略的選擇。
系列文
從零寫分散式RPC框架 系列 2.0 (2)RPC-Common模組設計實現
RPC-Common模組相對於1.0版本複雜了很多,最主要的變化在於將 Rpc的Netty處理器從RPC-Server和RPC-Client收回。1.0 版本的設計思路是儘可能減少冗餘依賴,所以RPC-Common一般只放通用的功能。現在則是儘可能都放在RPC-Common模組,以方便工
從零寫分散式RPC框架 系列 2.0 (1)架構升級
針對1.0版本的效能問題,本版本做了從服務地址列表快取等方面做了優化處理,並加入負載均衡引擎、序列化引擎、服務端限流等新功能,並對通訊模型進行改造,使其支援新特性、避免粘包半包問題並對後續升級改造留下支援空間。具體可見 專案GitHub地址 。本文將介紹 2.0 版本的邏輯架構和模型設計
分散式RPC框架效能大比拼
來源:http://colobu.com/2016/09/05/benchmarks-of-popular-rpc-frameworks/
Dubbo 是阿里巴巴公司開源的一個Java高效能優秀的服務框架,使得應用可通過高效能的 RPC 實現服務的輸出和輸入功能,可以和 Spring框架無縫整合。
簡述分散式RPC框架
RPC定義:遠端過程呼叫,是實現分散式計算的基礎。實現方式:1.基於TCP協議的RPC;2.基於HTTP協議的RPC;處理過程:一個簡單的RPC過程包括一個服務消費者和服務提供者,服務消費者需要使用服務提供者的提供的服務,就需要傳給服務提供方相關的資訊,這些資訊包括服務名稱(
超輕量級DI容器框架Google Guice與Spring框架的區別教程詳解及其demo程式碼片段分享
依賴注入,DI(Dependency Injection),它的作用自然不必多說,提及DI容器,例如spring,picoContainer,EJB容器等等,近日,google誕生了更輕巧的DI容器……Guice!
廢話不多講了,先看看Guice是如何實現注入的吧。
定
從零寫分散式RPC框架 系列 1.0 (1)架構設計
本系列文章的目的是搭建出一個基於Netty,Zookeeper和SpringBoot的簡易分散式RPC框架,並且釋出到Maven中央倉庫以 spring-boot-starter 的形式對外提供開箱即用的服務。1.0 版本使用 protobuf 來做序列化,最終
如何實現一個分散式 RPC 框架
遠端過程呼叫(Remote Procedure Call,RPC)是一個計算機通訊協議。該協議允許運行於一臺計算機的程式呼叫另一臺計算機的子程式,而程式設計師無需額外地為這個互動作用程式設計。RPC的主要目標是讓構建分散式應用更加容易,在提供強大的遠端呼叫能力的同時不損失