1. 程式人生 > >一個輕量級分散式RPC框架--NettyRpc

一個輕量級分散式RPC框架--NettyRpc

1、背景

最近在搜尋Netty和Zookeeper方面的文章時,看到了這篇文章《輕量級分散式 RPC 框架》,作者用Zookeeper、Netty和Spring寫了一個輕量級的分散式RPC框架。花了一些時間看了下他的程式碼,寫的乾淨簡單,寫的RPC框架可以算是一個簡易版的dubbo。這個RPC框架雖小,但是麻雀雖小,五臟俱全,有興趣的可以學習一下。

本人在這個簡易版的RPC上添加了如下特性:

* 服務非同步呼叫的支援,回撥函式callback的支援

* 客戶端使用長連線(在多次呼叫共享連線)

* 服務端非同步多執行緒處理RPC請求

2、簡介

RPC,即 Remote Procedure Call(遠端過程呼叫),呼叫遠端計算機上的服務,就像呼叫本地服務一樣。RPC可以很好的解耦系統,如WebService就是一種基於Http協議的RPC。

這個RPC整體框架如下:

這個RPC框架使用的一些技術所解決的問題:

服務釋出與訂閱:服務端使用Zookeeper註冊服務地址,客戶端從Zookeeper獲取可用的服務地址。

通訊:使用Netty作為通訊框架。

Spring:使用Spring配置服務,載入Bean,掃描註解。

動態代理:客戶端使用代理模式透明化服務呼叫。

訊息編解碼:使用Protostuff序列化和反序列化訊息。

3、服務端釋出服務

使用註解標註要釋出的服務

服務註解

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface RpcService { Class<?> value(); }

一個服務介面:

public interface HelloService {

    String hello(String name);

    String hello(Person person);
}

一個服務實現:使用註解標註

@RpcService(HelloService.class)
public class HelloServiceImpl implements HelloService {

    @Override
    
public String hello(String name) { return "Hello! " + name; } @Override public String hello(Person person) { return "Hello! " + person.getFirstName() + " " + person.getLastName(); } }

服務在啟動的時候掃描得到所有的服務介面及其實現:

@Override
    public void setApplicationContext(ApplicationContext ctx) throws BeansException {
        Map<String, Object> serviceBeanMap = ctx.getBeansWithAnnotation(RpcService.class);
        if (MapUtils.isNotEmpty(serviceBeanMap)) {
            for (Object serviceBean : serviceBeanMap.values()) {
                String interfaceName = serviceBean.getClass().getAnnotation(RpcService.class).value().getName();
                handlerMap.put(interfaceName, serviceBean);
            }
        }
    }

在Zookeeper叢集上註冊服務地址:

public class ServiceRegistry {

    private static final Logger LOGGER = LoggerFactory.getLogger(ServiceRegistry.class);

    private CountDownLatch latch = new CountDownLatch(1);

    private String registryAddress;

    public ServiceRegistry(String registryAddress) {
        this.registryAddress = registryAddress;
    }

    public void register(String data) {
        if (data != null) {
            ZooKeeper zk = connectServer();
            if (zk != null) {
                AddRootNode(zk); // Add root node if not exist
                createNode(zk, data);
            }
        }
    }

    private ZooKeeper connectServer() {
        ZooKeeper zk = null;
        try {
            zk = new ZooKeeper(registryAddress, Constant.ZK_SESSION_TIMEOUT, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if (event.getState() == Event.KeeperState.SyncConnected) {
                        latch.countDown();
                    }
                }
            });
            latch.await();
        } catch (IOException e) {
            LOGGER.error("", e);
        }
        catch (InterruptedException ex){
            LOGGER.error("", ex);
        }
        return zk;
    }

    private void AddRootNode(ZooKeeper zk){
        try {
            Stat s = zk.exists(Constant.ZK_REGISTRY_PATH, false);
            if (s == null) {
                zk.create(Constant.ZK_REGISTRY_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (KeeperException e) {
            LOGGER.error(e.toString());
        } catch (InterruptedException e) {
            LOGGER.error(e.toString());
        }
    }

    private void createNode(ZooKeeper zk, String data) {
        try {
            byte[] bytes = data.getBytes();
            String path = zk.create(Constant.ZK_DATA_PATH, bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            LOGGER.debug("create zookeeper node ({} => {})", path, data);
        } catch (KeeperException e) {
            LOGGER.error("", e);
        }
        catch (InterruptedException ex){
            LOGGER.error("", ex);
        }
    }
}
ServiceRegistry

這裡在原文的基礎上加了AddRootNode()判斷服務父節點是否存在,如果不存在則新增一個PERSISTENT的服務父節點,這樣雖然啟動服務時多了點判斷,但是不需要手動命令新增服務父節點了。

關於Zookeeper的使用原理,可以看這裡《ZooKeeper基本原理》。

4、客戶端呼叫服務

使用代理模式呼叫服務:

public class RpcProxy {

    private String serverAddress;
    private ServiceDiscovery serviceDiscovery;

    public RpcProxy(String serverAddress) {
        this.serverAddress = serverAddress;
    }

    public RpcProxy(ServiceDiscovery serviceDiscovery) {
        this.serviceDiscovery = serviceDiscovery;
    }

    @SuppressWarnings("unchecked")
    public <T> T create(Class<?> interfaceClass) {
        return (T) Proxy.newProxyInstance(
                interfaceClass.getClassLoader(),
                new Class<?>[]{interfaceClass},
                new InvocationHandler() {
                    @Override
                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                        RpcRequest request = new RpcRequest();
                        request.setRequestId(UUID.randomUUID().toString());
                        request.setClassName(method.getDeclaringClass().getName());
                        request.setMethodName(method.getName());
                        request.setParameterTypes(method.getParameterTypes());
                        request.setParameters(args);

                        if (serviceDiscovery != null) {
                            serverAddress = serviceDiscovery.discover();
                        }
                        if(serverAddress != null){
                            String[] array = serverAddress.split(":");
                            String host = array[0];
                            int port = Integer.parseInt(array[1]);

                            RpcClient client = new RpcClient(host, port);
                            RpcResponse response = client.send(request);

                            if (response.isError()) {
                                throw new RuntimeException("Response error.",new Throwable(response.getError()));
                            } else {
                                return response.getResult();
                            }
                        }
                        else{
                            throw new RuntimeException("No server address found!");
                        }
                    }
                }
        );
    }
}

這裡每次使用代理遠端呼叫服務,從Zookeeper上獲取可用的服務地址,通過RpcClient send一個Request,等待該Request的Response返回。這裡原文有個比較嚴重的bug,在原文給出的簡單的Test中是很難測出來的,原文使用了obj的wait和notifyAll來等待Response返回,會出現“假死等待”的情況:一個Request傳送出去後,在obj.wait()呼叫之前可能Response就返回了,這時候在channelRead0裡已經拿到了Response並且obj.notifyAll()已經在obj.wait()之前呼叫了,這時候send後再obj.wait()就出現了假死等待,客戶端就一直等待在這裡。使用CountDownLatch可以解決這個問題。

注意:這裡每次呼叫的send時候才去和服務端建立連線,使用的是短連線,這種短連線在高併發時會有連線數問題,也會影響效能。

從Zookeeper上獲取服務地址:

public class ServiceDiscovery {

    private static final Logger LOGGER = LoggerFactory.getLogger(ServiceDiscovery.class);

    private CountDownLatch latch = new CountDownLatch(1);

    private volatile List<String> dataList = new ArrayList<>();

    private String registryAddress;

    public ServiceDiscovery(String registryAddress) {
        this.registryAddress = registryAddress;
        ZooKeeper zk = connectServer();
        if (zk != null) {
            watchNode(zk);
        }
    }

    public String discover() {
        String data = null;
        int size = dataList.size();
        if (size > 0) {
            if (size == 1) {
                data = dataList.get(0);
                LOGGER.debug("using only data: {}", data);
            } else {
                data = dataList.get(ThreadLocalRandom.current().nextInt(size));
                LOGGER.debug("using random data: {}", data);
            }
        }
        return data;
    }

    private ZooKeeper connectServer() {
        ZooKeeper zk = null;
        try {
            zk = new ZooKeeper(registryAddress, Constant.ZK_SESSION_TIMEOUT, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if (event.getState() == Event.KeeperState.SyncConnected) {
                        latch.countDown();
                    }
                }
            });
            latch.await();
        } catch (IOException | InterruptedException e) {
            LOGGER.error("", e);
        }
        return zk;
    }

    private void watchNode(final ZooKeeper zk) {
        try {
            List<String> nodeList = zk.getChildren(Constant.ZK_REGISTRY_PATH, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if (event.getType() == Event.EventType.NodeChildrenChanged) {
                        watchNode(zk);
                    }
                }
            });
            List<String> dataList = new ArrayList<>();
            for (String node : nodeList) {
                byte[] bytes = zk.getData(Constant.ZK_REGISTRY_PATH + "/" + node, false, null);
                dataList.add(new String(bytes));
            }
            LOGGER.debug("node data: {}", dataList);
            this.dataList = dataList;
        } catch (KeeperException | InterruptedException e) {
            LOGGER.error("", e);
        }
    }
}
ServiceDiscovery

每次服務地址節點發生變化,都需要再次watchNode,獲取新的服務地址列表。

5、訊息編碼

請求訊息:

public class RpcRequest {

    private String requestId;
    private String className;
    private String methodName;
    private Class<?>[] parameterTypes;
    private Object[] parameters;

    public String getRequestId() {
        return requestId;
    }

    public void setRequestId(String requestId) {
        this.requestId = requestId;
    }

    public String getClassName() {
        return className;
    }

    public void setClassName(String className) {
        this.className = className;
    }

    public String getMethodName() {
        return methodName;
    }

    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    public Class<?>[] getParameterTypes() {
        return parameterTypes;
    }

    public void setParameterTypes(Class<?>[] parameterTypes) {
        this.parameterTypes = parameterTypes;
    }

    public Object[] getParameters() {
        return parameters;
    }

    public void setParameters(Object[] parameters) {
        this.parameters = parameters;
    }
}
RpcRequest

響應訊息:

public class RpcResponse {

    private String requestId;
    private String error;
    private Object result;

    public boolean isError() {
        return error != null;
    }

    public String getRequestId() {
        return requestId;
    }

    public void setRequestId(String requestId) {
        this.requestId = requestId;
    }

    public String getError() {
        return error;
    }

    public void setError(String error) {
        this.error = error;
    }

    public Object getResult() {
        return result;
    }

    public void setResult(Object result) {
        this.result = result;
    }
}
RpcResponse

訊息序列化和反序列化工具:(基於 Protostuff 實現)

public class SerializationUtil {

    private static Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<>();

    private static Objenesis objenesis = new ObjenesisStd(true);

    private SerializationUtil() {
    }

    @SuppressWarnings("unchecked")
    private static <T> Schema<T> getSchema(Class<T> cls) {
        Schema<T> schema = (Schema<T>) cachedSchema.get(cls);
        if (schema == null) {
            schema = RuntimeSchema.createFrom(cls);
            if (schema != null) {
                cachedSchema.put(cls, schema);
            }
        }
        return schema;
    }

    /**
     * 序列化(物件 -> 位元組陣列)
     */
    @SuppressWarnings("unchecked")
    public static <T> byte[] serialize(T obj) {
        Class<T> cls = (Class<T>) obj.getClass();
        LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
        try {
            Schema<T> schema = getSchema(cls);
            return ProtostuffIOUtil.toByteArray(obj, schema, buffer);
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        } finally {
            buffer.clear();
        }
    }

    /**
     * 反序列化(位元組陣列 -> 物件)
     */
    public static <T> T deserialize(byte[] data, Class<T> cls) {
        try {
            T message = (T) objenesis.newInstance(cls);
            Schema<T> schema = getSchema(cls);
            ProtostuffIOUtil.mergeFrom(data, message, schema);
            return message;
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }
}
SerializationUtil

由於處理的是TCP訊息,本人加了TCP的粘包處理Handler

channel.pipeline().addLast(new LengthFieldBasedFrameDecoder(65536,0,4,0,0))

訊息編解碼時開始4個位元組表示訊息的長度,也就是訊息編碼的時候,先寫訊息的長度,再寫訊息。

6、效能改進

1)服務端請求非同步處理

Netty本身就是一個高效能的網路框架,從網路IO方面來說並沒有太大的問題。

從這個RPC框架本身來說,在原文的基礎上把Server端處理請求的過程改成了多執行緒非同步:

 public void channelRead0(final ChannelHandlerContext ctx,final RpcRequest request) throws Exception {
        RpcServer.submit(new Runnable() {
            @Override
            public void run() {
                LOGGER.debug("Receive request " + request.getRequestId());
                RpcResponse response = new RpcResponse();
                response.setRequestId(request.getRequestId());
                try {
                    Object result = handle(request);
                    response.setResult(result);
                } catch (Throwable t) {
                    response.setError(t.toString());
                    LOGGER.error("RPC Server handle request error",t);
                }
                ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture channelFuture) throws Exception {
                        LOGGER.debug("Send response for request " + request.getRequestId());
                    }
                });
            }
        });
    }

Netty 4中的Handler處理在IO執行緒中,如果Handler處理中有耗時的操作(如資料庫相關),會讓IO執行緒等待,影響效能。

2)服務端長連線的管理

 客戶端保持和服務進行長連線,不需要每次呼叫服務的時候進行連線,長連線的管理(通過Zookeeper獲取有效的地址)。

通過監聽Zookeeper服務節點值的變化,動態更新客戶端和服務端保持的長連線。這個事情現在放在客戶端在做,客戶端保持了和所有可用服務的長連線,給客戶端和服務端都造成了壓力,需要解耦這個實現。

3)客戶端請求非同步處理

客戶端請求非同步處理的支援,不需要同步等待:傳送一個非同步請求,返回Feature,通過Feature的callback機制獲取結果。

IAsyncObjectProxy client = rpcClient.createAsync(HelloService.class);
RPCFuture helloFuture = client.call("hello", Integer.toString(i));
String result = (String) helloFuture.get(3000, TimeUnit.MILLISECONDS);

個人覺得該RPC的待改進項:

* 編碼序列化的多協議支援。

專案持續更新中。

參考:

輕量級分散式 RPC 框架:http://my.oschina.net/huangyong/blog/361751

你應該知道的RPC原理:http://www.cnblogs.com/LBSer/p/4853234.html

相關推薦

一個輕量級分散式RPC框架--NettyRpc

1、背景 最近在搜尋Netty和Zookeeper方面的文章時,看到了這篇文章《輕量級分散式 RPC 框架》,作者用Zookeeper、Netty和Spring寫了一個輕量級的分散式RPC框架。花了一些時間看了下他的程式碼,寫的乾淨簡單,寫的RPC框架可以算是一個簡易版的dubbo。這個RPC框架雖小,但是

輕量級分散式RPC框架實現(續)

1、背景 最近在搜尋Netty和Zookeeper方面的文章時,看到了這篇文章《輕量級分散式 RPC 框架》,作者用Zookeeper、Netty和Spring寫了一個輕量級的分散式RPC框架。花了一些時間看了下他的程式碼,寫的乾淨簡單,寫的RPC框架可以算是一個簡易版的

輕量級分散式 RPC 框架 netty+protostuff+zk +Spring

RPC,即 Remote Procedure Call(遠端過程呼叫),說得通俗一點就是:呼叫遠端計算機上的服務,就像呼叫本地服務一樣。 RPC 可基於 HTTP 或 TCP 協議,Web Service 就是基於 HTTP 協議的 RPC,它具有良好的跨平臺性,但其效能卻不如基於 TCP 協議的 RP

如何實現一個分散式 RPC 框架

遠端過程呼叫(Remote Procedure Call,RPC)是一個計算機通訊協議。該協議允許運行於一臺計算機的程式呼叫另一臺計算機的子程式,而程式設計師無需額外地為這個互動作用程式設計。RPC的主要目標是讓構建分散式應用更加容易,在提供強大的遠端呼叫能力的同時不損失

三百行代碼完成一個簡單的rpc框架

rpc dubbo demo 花了半天的時間寫了個簡單的rpc框架,是因為我最初看dubbo源碼的時候發現dubbo雖然看起來很龐大,但是隱隱約約總感覺,其實其絕大多數功能,都是基於可擴張性和服務治理的需要而編寫的。我看過dubbo和grpc的源碼,這兩個都是非常優秀的rpc框架,但是為了讓初學r

如何實現一個TCC分散式事務框架的一點思考

一個TCC事務框架需要解決的當然是分散式事務的管理。關於TCC事務機制的介紹,可以參考TCC事務機制簡介。 TCC事務模型雖然說起來簡單,然而要基於TCC實現一個通用的分散式事務框架,卻比它看上去要複雜的多,不只是簡單的呼叫一下Confirm/Cancel業務就可以了的。 本文將以Spring容器為例,試圖

從零寫分散式RPC框架 系列 第一版 (1)架構設計

本系列文章的目的是搭建出一個基於Netty,Zookeeper和SpringBoot的簡易分散式RPC框架,並且釋出到Maven中央倉庫以 spring-boot-starter 的形式對外提供開箱即用的服務。1.0 版本使用 protobuf 來做序列化,最終的使用形式比較接近於 Du

從零寫分散式RPC框架 系列 1.0 (2)RPC-Common模組設計實現

RPC-Common模組提供RPC-Server和RPC-Client的通用物件,封裝統一規則,使RPC Server和RPC Client 可以基於同一協議通訊。主要包含底層通訊的Netty所需的編碼解碼器(RpcEncoder,RpcDecoder),實現自定義協議的傳輸物件(Rpc

從零寫分散式RPC框架 系列 1.0 (5)整合測試

本篇將對前面幾篇模組作整合處理,使用spring-boot-starter的形式進行釋出。然後新建 examples 工程模組對其測試使用。 系列文章: 從零寫分散式RPC框架 系列 1.0 (1)架構設計 從零寫分散式RPC框架 系列 1.0 (2)RPC-Common模組設計

從零寫分散式RPC框架 系列 1.0 (4)RPC-Client模組設計實現

RPC-Client模組負責建立 動態代理物件 供 服務消費者 使用,而動態代理物件的方法執行則是通過RPC呼叫RPC-Server的服務實現。即RPC-Client遮蔽了底層的通訊過程,使得服務消費者可以基於介面透明使用服務提供者的服務。 系列文章: 從零寫分散式RPC框架 系

從零寫分散式RPC框架 系列 1.0 (3)RPC-Server模組設計實現

RPC-Server模組負責(1)將@RpcService註解標記的服務和自身資訊註冊到ZK叢集,(2)對外提供RPC服務實現,處理來自RPC-Client的請求。該模組整體的核心類為 RpcServer ,而真正處理請求的核心類是 RpcServerHandler 。另外還有一個 ZK

使用akka實現一個簡單的RPC框架(一)

一、概述 目前大多數的分散式架構底層通訊都是通過RPC實現的,RPC框架非常多,比如前我們學過的Hadoop專案的RPC通訊框架,但是Hadoop在設計之初就是為了執行長達數小時的批量而設計的,在某些極端的情況下,任務提交的延遲很高,所有Hadoop的RPC顯得有些笨重。

Motan 1.1.3 釋出,微博開源的高效能分散式 RPC 框架

   Motan 1.1.3 已釋出,這是一個小的修復版本,主要是解決了與 zookeeper string serializer 的相容問題。#707 Motan 是微博團隊開源的一套高效能、易於使用的分散式 RPC 框架。功能包括: 支援通過 spring 配置方式整合

從零寫分散式RPC框架 系列 2.0 (4)使用BeanPostProcessor實現自定義@RpcReference註解注入

之前服務提供方 RpcServer 我們是使用 ApplicationContextAware 來掃描 @RpcService 註解,新增一個註解即可實現服務暴露。現在,我們用 BeanPostProcessor 來實現服務注入,自動將服務實現類注入到被@RpcReference註解標記

從零寫分散式RPC框架 系列 2.0 (3)RPC-Server和RPC-Client模組改造

2.0版本RPC-Server改動不大,主要變化在於RPC-Client使用了服務地址快取,並引入監控機制,第一時間獲取zk叢集中服務地址資訊變化並重新整理本地快取。另外,RPC-Client還使用了RpcClientProperties開放對負載均衡策略和序列化策略的選擇。 系列文

從零寫分散式RPC框架 系列 2.0 (2)RPC-Common模組設計實現

RPC-Common模組相對於1.0版本複雜了很多,最主要的變化在於將 Rpc的Netty處理器從RPC-Server和RPC-Client收回。1.0 版本的設計思路是儘可能減少冗餘依賴,所以RPC-Common一般只放通用的功能。現在則是儘可能都放在RPC-Common模組,以方便工

從零寫分散式RPC框架 系列 2.0 (1)架構升級

針對1.0版本的效能問題,本版本做了從服務地址列表快取等方面做了優化處理,並加入負載均衡引擎、序列化引擎、服務端限流等新功能,並對通訊模型進行改造,使其支援新特性、避免粘包半包問題並對後續升級改造留下支援空間。具體可見 專案GitHub地址 。本文將介紹 2.0 版本的邏輯架構和模型設計

分散式RPC框架效能大比拼

來源:http://colobu.com/2016/09/05/benchmarks-of-popular-rpc-frameworks/ Dubbo 是阿里巴巴公司開源的一個Java高效能優秀的服務框架,使得應用可通過高效能的 RPC 實現服務的輸出和輸入功能,可以和 Spring框架無縫整合。

簡單分享一個輕量級自動化測試框架目錄結構設計

很多人在做自動化測試的過程中會遇到一個瓶頸,就是能夠寫指令碼,但是不知道怎麼去組織程式碼,怎麼搭建測試框架,今天博主就放點乾貨,分享一個輕量級的自動化測試框架的目錄結構,如下圖: 分層如下

自己實現一個簡單的RPC框架

RPC的全稱是Remote Procedure Call,它是一種程序間的通訊方式。允許像呼叫本地服務一樣呼叫遠端服務。 對於RPC的總結: 簡單:RPC概念的語義十分簡單和清晰,這樣建立分散式計算更容易。 高效:過程呼叫看起來十分簡單而且十分高效。