1. 程式人生 > >輕量級分散式 RPC 框架 netty+protostuff+zk +Spring

輕量級分散式 RPC 框架 netty+protostuff+zk +Spring

RPC,即 Remote Procedure Call(遠端過程呼叫),說得通俗一點就是:呼叫遠端計算機上的服務,就像呼叫本地服務一樣。

RPC 可基於 HTTP 或 TCP 協議,Web Service 就是基於 HTTP 協議的 RPC,它具有良好的跨平臺性,但其效能卻不如基於 TCP 協議的 RPC。會兩方面會直接影響 RPC 的效能,一是傳輸方式,二是序列化。

眾所周知,TCP 是傳輸層協議,HTTP 是應用層協議,而傳輸層較應用層更加底層,在資料傳輸方面,越底層越快,因此,在一般情況下,TCP 一定比 HTTP 快。就序列化而言,Java 提供了預設的序列化方式,但在高併發的情況下,這種方式將會帶來一些效能上的瓶頸,於是市面上出現了一系列優秀的序列化框架,比如:Protobuf、Kryo、Hessian、Jackson 等,它們可以取代 Java 預設的序列化,從而提供更高效的效能。

為了支援高併發,傳統的阻塞式 IO 顯然不太合適,因此我們需要非同步的 IO,即 NIO。Java 提供了 NIO 的解決方案,Java 7 也提供了更優秀的 NIO.2 支援,用 Java 實現 NIO 並不是遙不可及的事情,只是需要我們熟悉 NIO 的技術細節。

我們需要將服務部署在分散式環境下的不同節點上,通過服務註冊的方式,讓客戶端來自動發現當前可用的服務,並呼叫這些服務。這需要一種服務登錄檔(Service Registry)的元件,讓它來註冊分散式環境下所有的服務地址(包括:主機名與埠號)。

應用、服務、服務登錄檔之間的關係見下圖:

系統架構

每臺 Server 上可釋出多個 Service,這些 Service 共用一個 host 與 port,在分散式環境下會提供 Server 共同對外提供 Service。此外,為防止 Service Registry 出現單點故障,因此需要將其搭建為叢集環境。

本文將為您揭曉開發輕量級分散式 RPC 框架的具體過程,該框架基於 TCP 協議,提供了 NIO 特性,提供高效的序列化方式,同時也具備服務註冊與發現的能力。

根據以上技術需求,我們可使用如下技術選型:

  1. Spring:它是最強大的依賴注入框架,也是業界的權威標準。
  2. Netty:它使 NIO 程式設計更加容易,遮蔽了 Java 底層的 NIO 細節。
  3. Protostuff:它基於 Protobuf 序列化框架,面向 POJO,無需編寫 .proto 檔案。
  4. ZooKeeper:提供服務註冊與發現功能,開發分散式系統的必備選擇,同時它也具備天生的叢集能力。

相關 Maven 依賴請見附錄。

第一步:編寫服務介面

<!-- lang: java -->
public interface HelloService {

    String hello(String name);
}

將該介面放在獨立的客戶端 jar 包中,以供應用使用。

第二步:編寫服務介面的實現類

<!-- lang: java -->
@RpcService(HelloService.class) // 指定遠端介面
publicclassHelloServiceImplimplementsHelloService {
@Override
public String hello(String name) {
        return"Hello! " + name;
    }
}

使用RpcService註解定義在服務介面的實現類上,需要對該實現類指定遠端介面,因為實現類可能會實現多個介面,一定要告訴框架哪個才是遠端介面。

RpcService程式碼如下:

<!-- lang: java -->
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Component // 表明可被 Spring 掃描
public @interface RpcService {

    Class<?> value();
}

該註解具備 Spring 的Component註解的特性,可被 Spring 掃描。

該實現類放在服務端 jar 包中,該 jar 包還提供了一些服務端的配置檔案與啟動服務的載入程式。

第三步:配置服務端

服務端 Spring 配置檔名為spring.xml,內容如下:

<!-- lang: xml -->
<beans...>
<context:component-scanbase-package="com.xxx.rpc.sample.server"/>
<context:property-placeholderlocation="classpath:config.properties"/>
<!-- 配置服務註冊元件 -->
<beanid="serviceRegistry"class="com.xxx.rpc.registry.ServiceRegistry">
<constructor-argname="registryAddress"value="${registry.address}"/>
</bean>
<!-- 配置 RPC 伺服器 -->
<beanid="rpcServer"class="com.xxx.rpc.server.RpcServer">
<constructor-argname="serverAddress"value="${server.address}"/>
<constructor-argname="serviceRegistry"ref="serviceRegistry"/>
</bean>
</beans>

具體的配置引數在config.properties檔案中,內容如下:

<!-- lang: java -->
# ZooKeeper 伺服器
registry.address=127.0.0.1:2181

# RPC 伺服器
server.address=127.0.0.1:8000

以上配置表明:連線本地的 ZooKeeper 伺服器,並在 8000 埠上釋出 RPC 服務。

第四步:啟動伺服器併發布服務

為了載入 Spring 配置檔案來發布服務,只需編寫一個載入程式即可:

<!-- lang: java -->
public class RpcBootstrap {

    publicstaticvoidmain(String[] args) {
        newClassPathXmlApplicationContext("spring.xml");
    }
}

執行RpcBootstrap類的main方法即可啟動服務端,但還有兩個重要的元件尚未實現,它們分別是:ServiceRegistryRpcServer,下文會給出具體實現細節。

第五步:實現服務註冊

使用 ZooKeeper 客戶端可輕鬆實現服務註冊功能,ServiceRegistry程式碼如下:

<!-- lang: java -->
public class ServiceRegistry {

    privatestaticfinal Logger LOGGER = LoggerFactory.getLogger(ServiceRegistry.class);

    private CountDownLatch latch = new CountDownLatch(1);

    private String registryAddress;

    publicServiceRegistry(String registryAddress){
        this.registryAddress = registryAddress;
    }

    publicvoidregister(String data){
        if (data != null) {
            ZooKeeper zk = connectServer();
            if (zk != null) {
                createNode(zk, data);
            }
        }
    }

    privateZooKeeper connectServer(){
        ZooKeeper zk = null;
        try {
            zk = new ZooKeeper(registryAddress, Constant.ZK_SESSION_TIMEOUT, new Watcher() {
                @Override
publicvoidprocess(WatchedEvent event){
                    if (event.getState() == Event.KeeperState.SyncConnected) {
                        latch.countDown();
                    }
                }
            });
            latch.await();
        } catch (IOException | InterruptedException e) {
            LOGGER.error("", e);
        }
        return zk;
    }

    privatevoidcreateNode(ZooKeeper zk, String data){
        try {
            byte[] bytes = data.getBytes();
            String path = zk.create(Constant.ZK_DATA_PATH, bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            LOGGER.debug("create zookeeper node ({} => {})", path, data);
        } catch (KeeperException | InterruptedException e) {
            LOGGER.error("", e);
        }
    }
}

其中,通過Constant配置了所有的常量:

<!-- lang: java -->
public interface Constant {

    intZK_SESSION_TIMEOUT = 5000;

    StringZK_REGISTRY_PATH = "/registry";
    StringZK_DATA_PATH = ZK_REGISTRY_PATH + "/data";
}

注意:首先需要使用 ZooKeeper 客戶端命令列建立/registry永久節點,用於存放所有的服務臨時節點。

第六步:實現 RPC 伺服器

使用 Netty 可實現一個支援 NIO 的 RPC 伺服器,需要使用ServiceRegistry註冊服務地址,RpcServer程式碼如下:

<!-- lang: java -->
publicclassRpcServerimplementsApplicationContextAware, InitializingBean {
privatestaticfinal Logger LOGGER = LoggerFactory.getLogger(RpcServer.class);

    private String serverAddress;
    private ServiceRegistry serviceRegistry;

    private Map<String, Object> handlerMap = new HashMap<>(); // 存放介面名與服務物件之間的對映關係
public RpcServer(String serverAddress) {
        this.serverAddress = serverAddress;
    }

    public RpcServer(String serverAddress, ServiceRegistry serviceRegistry) {
        this.serverAddress = serverAddress;
        this.serviceRegistry = serviceRegistry;
    }

    @Override
publicvoid setApplicationContext(ApplicationContext ctx) throws BeansException {
        Map<String, Object> serviceBeanMap = ctx.getBeansWithAnnotation(RpcService.class); // 獲取所有帶有 RpcService 註解的 Spring Bean
if (MapUtils.isNotEmpty(serviceBeanMap)) {
            for (Object serviceBean : serviceBeanMap.values()) {
                String interfaceName = serviceBean.getClass().getAnnotation(RpcService.class).value().getName();
                handlerMap.put(interfaceName, serviceBean);
            }
        }
    }

    @Override
publicvoid afterPropertiesSet() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
publicvoid initChannel(SocketChannel channel) throws Exception {
                        channel.pipeline()
                            .addLast(new RpcDecoder(RpcRequest.class)) // 將 RPC 請求進行解碼(為了處理請求)
                            .addLast(new RpcEncoder(RpcResponse.class)) // 將 RPC 響應進行編碼(為了返回響應)
                            .addLast(new RpcHandler(handlerMap)); // 處理 RPC 請求
                    }
                })
                .option(ChannelOption.SO_BACKLOG, 128)
                .childOption(ChannelOption.SO_KEEPALIVE, true);

            String[] array = serverAddress.split(":");
            String host = array[0];
            int port = Integer.parseInt(array[1]);

            ChannelFuture future = bootstrap.bind(host, port).sync();
            LOGGER.debug("server started on port {}", port);

            if (serviceRegistry != null) {
                serviceRegistry.register(serverAddress); // 註冊服務地址
            }

            future.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

以上程式碼中,有兩個重要的 POJO 需要描述一下,它們分別是RpcRequestRpcResponse

使用RpcRequest封裝 RPC 請求,程式碼如下:

<!-- lang: java -->
public class RpcRequest {

    private String requestId;
    private String className;
    private String methodName;
    private Class<?>[] parameterTypes;
    private Object[] parameters;

    // getter/setter...
}

使用RpcResponse封裝 RPC 響應,程式碼如下:

<!-- lang: java -->
public class RpcResponse {

    privateStringrequestId;
    privateThrowableerror;
    privateObjectresult;

    // getter/setter...
}

使用RpcDecoder提供 RPC 解碼,只需擴充套件 Netty 的ByteToMessageDecoder抽象類的decode方法即可,程式碼如下:

<!-- lang: java -->
publicclassRpcDecoderextendsByteToMessageDecoder {

    private Class<?> genericClass;

    publicRpcDecoder(Class<?> genericClass){
        this.genericClass = genericClass;
    }

    @Override
    publicvoiddecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        if (in.readableBytes() < 4) {
            return;
        }
        in.markReaderIndex();
        int dataLength = in.readInt();
        if (dataLength < 0) {
            ctx.close();
        }
        if (in.readableBytes() < dataLength) {
            in.resetReaderIndex();
            return;
        }
        byte[] data = newbyte[dataLength];
        in.readBytes(data);

        Object obj = SerializationUtil.deserialize(data, genericClass);
        out.add(obj);
    }
}

使用RpcEncoder提供 RPC 編碼,只需擴充套件 Netty 的MessageToByteEncoder抽象類的encode方法即可,程式碼如下:

<!-- lang: java -->
publicclass RpcEncoder extends MessageToByteEncoder {

    privateClass<?> genericClass;

    public RpcEncoder(Class<?> genericClass) {
        this.genericClass = genericClass;
    }

    @Override
    public void encode(ChannelHandlerContext ctx, Object in
            
           

相關推薦

輕量級分散式 RPC 框架 netty+protostuff+zk +Spring

RPC,即 Remote Procedure Call(遠端過程呼叫),說得通俗一點就是:呼叫遠端計算機上的服務,就像呼叫本地服務一樣。 RPC 可基於 HTTP 或 TCP 協議,Web Service 就是基於 HTTP 協議的 RPC,它具有良好的跨平臺性,但其效能卻不如基於 TCP 協議的 RP

輕量級分散式RPC框架實現(續)

1、背景 最近在搜尋Netty和Zookeeper方面的文章時,看到了這篇文章《輕量級分散式 RPC 框架》,作者用Zookeeper、Netty和Spring寫了一個輕量級的分散式RPC框架。花了一些時間看了下他的程式碼,寫的乾淨簡單,寫的RPC框架可以算是一個簡易版的

一個輕量級分散式RPC框架--NettyRpc

1、背景 最近在搜尋Netty和Zookeeper方面的文章時,看到了這篇文章《輕量級分散式 RPC 框架》,作者用Zookeeper、Netty和Spring寫了一個輕量級的分散式RPC框架。花了一些時間看了下他的程式碼,寫的乾淨簡單,寫的RPC框架可以算是一個簡易版的dubbo。這個RPC框架雖小,但是

Dubbo+zk/Redis 分散式RPC框架

Dubbo是什麼?Dubbo是阿里巴巴SOA服務化治理方案的核心框架,每天為2,000多個服務提供30多億次訪問量支援,並被廣泛應用於阿里巴巴集團的各成員站點。 Dubbo是一個分散式服務框架,致力於提供高效能和透明化的RPC遠端服務呼叫方案,以及SOA服務治理方案。 核心部

基於Netty分散式 RPC 框架

轉載自:http://blog.csdn.net/z69183787/article/details/52700274               http://blog.csdn.net/z69183787/article/details/52680941        

從零寫分散式RPC框架 系列 第一版 (1)架構設計

本系列文章的目的是搭建出一個基於Netty,Zookeeper和SpringBoot的簡易分散式RPC框架,並且釋出到Maven中央倉庫以 spring-boot-starter 的形式對外提供開箱即用的服務。1.0 版本使用 protobuf 來做序列化,最終的使用形式比較接近於 Du

從零寫分散式RPC框架 系列 1.0 (2)RPC-Common模組設計實現

RPC-Common模組提供RPC-Server和RPC-Client的通用物件,封裝統一規則,使RPC Server和RPC Client 可以基於同一協議通訊。主要包含底層通訊的Netty所需的編碼解碼器(RpcEncoder,RpcDecoder),實現自定義協議的傳輸物件(Rpc

從零寫分散式RPC框架 系列 1.0 (5)整合測試

本篇將對前面幾篇模組作整合處理,使用spring-boot-starter的形式進行釋出。然後新建 examples 工程模組對其測試使用。 系列文章: 從零寫分散式RPC框架 系列 1.0 (1)架構設計 從零寫分散式RPC框架 系列 1.0 (2)RPC-Common模組設計

從零寫分散式RPC框架 系列 1.0 (4)RPC-Client模組設計實現

RPC-Client模組負責建立 動態代理物件 供 服務消費者 使用,而動態代理物件的方法執行則是通過RPC呼叫RPC-Server的服務實現。即RPC-Client遮蔽了底層的通訊過程,使得服務消費者可以基於介面透明使用服務提供者的服務。 系列文章: 從零寫分散式RPC框架 系

從零寫分散式RPC框架 系列 1.0 (3)RPC-Server模組設計實現

RPC-Server模組負責(1)將@RpcService註解標記的服務和自身資訊註冊到ZK叢集,(2)對外提供RPC服務實現,處理來自RPC-Client的請求。該模組整體的核心類為 RpcServer ,而真正處理請求的核心類是 RpcServerHandler 。另外還有一個 ZK

Motan 1.1.3 釋出,微博開源的高效能分散式 RPC 框架

   Motan 1.1.3 已釋出,這是一個小的修復版本,主要是解決了與 zookeeper string serializer 的相容問題。#707 Motan 是微博團隊開源的一套高效能、易於使用的分散式 RPC 框架。功能包括: 支援通過 spring 配置方式整合

從零寫分散式RPC框架 系列 2.0 (4)使用BeanPostProcessor實現自定義@RpcReference註解注入

之前服務提供方 RpcServer 我們是使用 ApplicationContextAware 來掃描 @RpcService 註解,新增一個註解即可實現服務暴露。現在,我們用 BeanPostProcessor 來實現服務注入,自動將服務實現類注入到被@RpcReference註解標記

從零寫分散式RPC框架 系列 2.0 (3)RPC-Server和RPC-Client模組改造

2.0版本RPC-Server改動不大,主要變化在於RPC-Client使用了服務地址快取,並引入監控機制,第一時間獲取zk叢集中服務地址資訊變化並重新整理本地快取。另外,RPC-Client還使用了RpcClientProperties開放對負載均衡策略和序列化策略的選擇。 系列文

從零寫分散式RPC框架 系列 2.0 (2)RPC-Common模組設計實現

RPC-Common模組相對於1.0版本複雜了很多,最主要的變化在於將 Rpc的Netty處理器從RPC-Server和RPC-Client收回。1.0 版本的設計思路是儘可能減少冗餘依賴,所以RPC-Common一般只放通用的功能。現在則是儘可能都放在RPC-Common模組,以方便工

從零寫分散式RPC框架 系列 2.0 (1)架構升級

針對1.0版本的效能問題,本版本做了從服務地址列表快取等方面做了優化處理,並加入負載均衡引擎、序列化引擎、服務端限流等新功能,並對通訊模型進行改造,使其支援新特性、避免粘包半包問題並對後續升級改造留下支援空間。具體可見 專案GitHub地址 。本文將介紹 2.0 版本的邏輯架構和模型設計

分散式RPC框架效能大比拼

來源:http://colobu.com/2016/09/05/benchmarks-of-popular-rpc-frameworks/ Dubbo 是阿里巴巴公司開源的一個Java高效能優秀的服務框架,使得應用可通過高效能的 RPC 實現服務的輸出和輸入功能,可以和 Spring框架無縫整合。

簡述分散式RPC框架

RPC定義:遠端過程呼叫,是實現分散式計算的基礎。實現方式:1.基於TCP協議的RPC;2.基於HTTP協議的RPC;處理過程:一個簡單的RPC過程包括一個服務消費者和服務提供者,服務消費者需要使用服務提供者的提供的服務,就需要傳給服務提供方相關的資訊,這些資訊包括服務名稱(

輕量級DI容器框架Google Guice與Spring框架的區別教程詳解及其demo程式碼片段分享

依賴注入,DI(Dependency Injection),它的作用自然不必多說,提及DI容器,例如spring,picoContainer,EJB容器等等,近日,google誕生了更輕巧的DI容器……Guice! 廢話不多講了,先看看Guice是如何實現注入的吧。 定

從零寫分散式RPC框架 系列 1.0 (1)架構設計

本系列文章的目的是搭建出一個基於Netty,Zookeeper和SpringBoot的簡易分散式RPC框架,並且釋出到Maven中央倉庫以 spring-boot-starter 的形式對外提供開箱即用的服務。1.0 版本使用 protobuf 來做序列化,最終

如何實現一個分散式 RPC 框架

遠端過程呼叫(Remote Procedure Call,RPC)是一個計算機通訊協議。該協議允許運行於一臺計算機的程式呼叫另一臺計算機的子程式,而程式設計師無需額外地為這個互動作用程式設計。RPC的主要目標是讓構建分散式應用更加容易,在提供強大的遠端呼叫能力的同時不損失