1. 程式人生 > >解密Dubbo:自己動手編寫RPC框架

解密Dubbo:自己動手編寫RPC框架

用,現在我們就來動手自己編寫一個RPC框架,通過這篇文章的學習,你將學習到

  • 分散式系統的概念
  • RPC遠端方法呼叫的應用
  • Dubbo的原理深入理解

當然,如果要完全自己編寫一個RPC框架,我們需要掌握以下知識點

  • 網路程式設計(網路通訊) 本文將使用netty4網路通訊框架
  • 多執行緒相關知識
  • 反射相關知識
  • jdk的動態代理
  • Spring框架的相關知識

如果對於上述的知識點有一部分不是很理解,也不會影響你閱讀本文和對Dubbo的RPC呼叫原理的理解

好了,我們先來簡單的描述一下整個RPC呼叫的業務流程圖

rpc通訊模型.png

為了可以實現上面的RPC呼叫,我們建立的RPC框架的模組之間的關係圖如下:

RPC框架流程圖.png

對於上面的每個模組的具體作用,使用一個表格簡單的進行描述

模組名稱 主要功能
rpc-register 主要完成可註冊中心Zookeeper的互動<br />RPC服務端使用該模組往註冊中心註冊地址和埠<br />RPC客戶端通過該模組獲取實時已近註冊的服務地址和埠
rpc-common 定義RPC通訊的請求訊息和響應訊息的規則,以及訊息的序列化和反序列化的幫助類
rpc-server RPC服務端,啟動RPC服務,掃描app-server中的所有可以提供的服務列表並儲存<br />接受RPC客戶端的訊息並且通過反射呼叫具體的方法<br/>響應RPC客戶端,把方法執行結果返回到RPC客戶端
rpc-client RPC客戶端,通過網路通訊往RPC服務端傳送請求呼叫訊息<br/>接受服務端的響應訊息<br/>配置動態代理類,所有的方法呼叫都通過網路呼叫傳送到RPC服務端
app-common 具體的應用中的介面和JavaBean物件,類似於service模組和bean模組
app-server 通過Spring的配置啟動SpringContext,並且配置RpcServer和RpcRegistry Bean物件的建立<br />實現app-common中的介面,並且在介面上添加註解@RpcService(IProductService.class)可以讓RPCServer識別到該服務<br />啟動服務
app-client 通過Spring的配置建立RpcDiscover物件和RpcProxy物件,其中RpcDiscover用於從註冊中心獲取到服務的地址資訊,RpcProxy用於建立類的動態代理物件

接下來我們來看一下具體的實現程式碼

  1. rpc-register

    這個模組使用者和註冊中心進行互動,主要包括三個類

    • Constant常量定義,設定連線ZKServer的相關引數
    • RpcRegistry:往註冊中心ZKServer設定地址資訊,RPC-Server需要使用
    • RpcDiscover: 從註冊中心ZKServer獲取服務端的網路地址資訊 RPC-client需要使用

    具體的實現程式碼

    package cn.wolfcode.rpc.register;
    public interface Constant {
        //定義客戶端連線session會話超時時間,單位為毫秒,該值的設定和zkServer設定的心跳時間有關係
        int SESSION_TIMEOUT=4000;
        // 定義用於儲存rpc通訊服務端的地址資訊的目錄
        String REGISTRY_PATH="/rpc";
        // 定義資料存放的具體目錄
        String DATA_PATH=REGISTRY_PATH+"/data";
    }
    
    package cn.wolfcode.rpc.register;
    
    import lombok.AllArgsConstructor;
    import lombok.Getter;
    import lombok.NoArgsConstructor;
    import lombok.Setter;
    import org.apache.zookeeper.*;
    import org.apache.zookeeper.data.Stat;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    @Setter@Getter
    @AllArgsConstructor()
    @NoArgsConstructor
    public class RpcRegistry {
    
        public static final Logger LOGGER=LoggerFactory.getLogger(RpcRegistry.class);
        //zkServer的地址資訊
        private String registryAddress;
        //zk客戶端程式
        private  ZooKeeper zooKeeper;
    
        public void createNode(String data) throws Exception{
            //建立一個客戶端程式, 對於註冊可以不用監聽事件
            zooKeeper= new ZooKeeper(registryAddress, Constant.SESSION_TIMEOUT, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                }
            });
            if(zooKeeper!=null){
                try{
                    //判斷註冊的目錄是否存在
                    Stat stat = zooKeeper.exists(Constant.REGISTRY_PATH, false);
                    if(stat==null){
                        //如果不存在, 建立一個持久的節點目錄
                        zooKeeper.create(Constant.REGISTRY_PATH,null,ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
                    }
                    //建立一個臨時的序列節點,並且儲存資料資訊
                    zooKeeper.create(Constant.DATA_PATH,data.getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
                }catch (Exception e){
                    LOGGER.error("",e);
                    e.printStackTrace();
                }
            }else{
                LOGGER.debug("zooKeeper connect is null");
            }
        }
        //測試程式
        public static void main(String[] args) throws Exception {
            RpcRegistry rpcRegistry = new RpcRegistry();
            rpcRegistry.setRegistryAddress("192.168.158.151:2181");
            rpcRegistry.createNode("testdata");
            //讓程式等待輸入,程式一直處於執行狀態
            System.in.read();
        }
    }
    
    package cn.wolfcode.rpc.register;
    import lombok.Getter;
    import lombok.Setter;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;
    @Setter
    @Getter
    //地址發現,用於實時的獲取最新的RPC服務資訊
    public class RpcDiscover {
        public static final Logger LOGGER=LoggerFactory.getLogger(RpcRegistry.class);
        //服務端地址 zkServer的地址
        private String registryAddress;
        //獲取到的所有提供服務的伺服器列表
        private volatile List<String> dataList=new ArrayList<>();
    
        private ZooKeeper  zooKeeper=null;
    
        //初始化zkClient客戶端
        public RpcDiscover(String registryAddress) throws Exception {
            this.registryAddress = registryAddress;
            zooKeeper=new ZooKeeper(registryAddress, Constant.SESSION_TIMEOUT, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    if(watchedEvent.getType()==Event.EventType.NodeChildrenChanged){
                        //監聽zkServer的伺服器列表變化
                        watchNode();
                    }
                }
            });
            //獲取節點相關資料
            watchNode();
        }
        // 從dataList列表隨機獲取一個可用的服務端的地址資訊給rpc-client
        public String discover(){
            int size=dataList.size();
            if(size>0){
                int index= new Random().nextInt(size);
                return dataList.get(index);
            }
            throw new RuntimeException("沒有找到對應的伺服器");
        }
    
        //監聽服務端的列表資訊
        private void watchNode(){
            try{
                //獲取子節點資訊
                List<String> nodeList = zooKeeper.getChildren(Constant.REGISTRY_PATH, true);
                List<String> dataList=new ArrayList<>();
                for (String node : nodeList) {
                    byte[] bytes = zooKeeper.getData(Constant.REGISTRY_PATH + "/" + node, false, null);
                    dataList.add(new String(bytes));
                }
                this.dataList=dataList;
            }catch (Exception e){
                LOGGER.error("",e);
                e.printStackTrace();
            }
        }
    
        //測試程式
        public static void main(String[] args) throws Exception {
            //列印獲取到的連線地址資訊
            System.out.println(new RpcDiscover("192.168.158.151:2181").discover());
            System.in.read();
        }
    }
    
  2. rpc-common

    定義RPC通訊的請求訊息和響應訊息的規則,以及訊息的序列化和反序列化的幫助類,主要包括

    • RpcRequest 請求訊息封裝物件
    • RpcResponse 響應訊息封裝物件
    • SerializationUtil 訊息的序列化,煩序列化幫助類
    • RpcEncoder 把訊息物件轉換為位元組陣列進行通訊
    • RpcDecoder 把獲取到的位元組陣列轉換為對應的訊息物件

    具體程式碼如下

    package cn.wolfcode.rpc.common;
    
    import lombok.*;
    
    @Setter
    @Getter
    @NoArgsConstructor
    @AllArgsConstructor
    @ToString
    //RPC通訊的資料請求規則
    public class RpcRequest {
        // 請求訊息的訊息Id
        private String requestId;
        // 請求的具體的類名(介面名稱)
        private String className;
        // 請求的具體的方法名稱
        private String methodName;
        // 請求的方法引數型別列表
        private Class<?>[] parameterTypes;
        // 請求的方法引數列表
        private Object[] parameters;
    }
    
    package cn.wolfcode.rpc.common;
    
    import lombok.*;
    
    @Setter
    @Getter
    @NoArgsConstructor
    @AllArgsConstructor
    @ToString
    //RPC通訊訊息的響應資料規則
    public class RpcResponse {
        //響應的訊息id
        private String responseId;
        //請求的訊息id
        private String requestId;
        // 響應的訊息是否成功
        private boolean success;
        // 響應的資料結果
        private Object result;
        // 如果有異常資訊,在該物件中記錄異常資訊
        private Throwable throwable;
    }
    
    package cn.wolfcode.rpc.common;
    
    import com.dyuproject.protostuff.LinkedBuffer;
    import com.dyuproject.protostuff.ProtostuffIOUtil;
    import com.dyuproject.protostuff.Schema;
    import com.dyuproject.protostuff.runtime.RuntimeSchema;
    import org.objenesis.Objenesis;
    import org.objenesis.ObjenesisStd;
    
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    
    /**
     * 序列化工具類(基於 Protostuff 實現) 用於把物件序列化位元組陣列, 把位元組陣列反序列化物件
     */
    public class SerializationUtil {
    
        private static Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<Class<?>, Schema<?>>();
    
        private static Objenesis objenesis = new ObjenesisStd(true);
    
        private SerializationUtil() {
        }
        /**
         * 獲取類的schema
         * @param cls
         * @return
         */
        @SuppressWarnings("unchecked")
        private static <T> Schema<T> getSchema(Class<T> cls) {
            Schema<T> schema = (Schema<T>) cachedSchema.get(cls);
            if (schema == null) {
                schema = RuntimeSchema.createFrom(cls);
                    if (schema != null) {
                    cachedSchema.put(cls, schema);
                }
            }
            return schema;
        }
    
        /**
         * 序列化(物件 -> 位元組陣列)
         */
        @SuppressWarnings("unchecked")
        public static <T> byte[] serialize(T obj) {
            Class<T> cls = (Class<T>) obj.getClass();
            LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
            try {
                Schema<T> schema = getSchema(cls);
                return ProtostuffIOUtil.toByteArray(obj, schema, buffer);//序列化
            } catch (Exception e) {
                throw new IllegalStateException(e.getMessage(), e);
            } finally {
                buffer.clear();
            }
        }
    
        /**
         * 反序列化(位元組陣列 -> 物件)
         */
        public static <T> T deserialize(byte[] data, Class<T> cls) {
            try {
                /*
                 * 如果一個類沒有引數為空的構造方法時候,那麼你直接呼叫newInstance方法試圖得到一個例項物件的時候是會丟擲異常的
                 * 通過ObjenesisStd可以完美的避開這個問題
                 * */
                T message = (T) objenesis.newInstance(cls);//例項化
                Schema<T> schema = getSchema(cls);//獲取類的schema
                ProtostuffIOUtil.mergeFrom(data, message, schema);
                return message;
            } catch (Exception e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
        }
    }
    
    package cn.wolfcode.rpc.common;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.MessageToByteEncoder;
    
    //對傳遞的訊息進行編碼, 因為是請求/響應物件的傳遞,先編碼為位元組陣列在傳送到伺服器解碼
    public class RpcEncoder extends MessageToByteEncoder {
        // 傳遞的資料的物件型別
        private Class genericClass;
    
        public RpcEncoder(Class genericClass) {
            this.genericClass = genericClass;
        }
    
        @Override
        protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
            if(genericClass.isInstance(msg)){
                //序列化請求訊息為位元組陣列
                byte[] bytes = SerializationUtil.serialize(msg);
                // 把資料寫入到下一個通道(channel)或者是發往服務端
                out.writeBytes(bytes);
            }
        }
    }
    
    package cn.wolfcode.rpc.common;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.ByteToMessageDecoder;
    
    import java.util.List;
    //對傳遞的訊息進行解碼, 接受到的資料是位元組陣列,需要把陣列轉換為對應的請求/響應訊息物件
    public class RpcDecoder extends ByteToMessageDecoder {
    
        private Class<?> genericClass;
    
        public RpcDecoder(Class<?> genericClass) {
            this.genericClass = genericClass;
        }
    
        @Override
        //解碼方法,把位元組陣列轉換為訊息物件
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            //訊息的長度
            int size=in.readableBytes();
            if(size<4){//保證所有的訊息都完全接受完成
                return;
            }
            byte[] bytes =new byte[size];
            //把傳遞的位元組陣列讀取到bytes中
            in.readBytes(bytes);
            // 反序列化為物件(RPCRequest/RPCResponse物件)
            Object object = SerializationUtil.deserialize(bytes, genericClass);
            //輸出物件
            out.add(object);
            //重新整理快取
            ctx.flush();
        }
    }
    
  3. rpc-server

    ​ RPC服務端,啟動RPC服務,掃描app-server中的所有可以提供的服務列表並儲存,接受RPC客戶端的訊息並且通過反射呼叫具體的方法,響應RPC客戶端,把方法執行結果返回到RPC客戶端

    主要包括:

    • RpcService 定義一個註解,用於標記服務程式的提供者,通過Spring掃描出所有的服務並且儲存
    • RpcServerHandler 處理RPC客戶端請求,呼叫服務提供者的具體方法,響應執行結果
    • RpcServer 掃描所有的服務(標記了@RPCService的類),啟動RPC服務
    package cn.wolfcode.rpc.server;
    
    import java.lang.annotation.ElementType;
    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.annotation.Target;
    
    /**
     * 這個註解用於貼在每個提供服務的實現類,
     * 在Spring容器啟動的時候,自動掃描到貼了該註解的所有的服務
     */
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.TYPE})
    public @interface RpcService {
        public Class<?> value();
    }
    
    package cn.wolfcode.rpc.server;
    
    import cn.wolfcode.rpc.common.RpcDecoder;
    import cn.wolfcode.rpc.common.RpcEncoder;
    import cn.wolfcode.rpc.common.RpcRequest;
    import cn.wolfcode.rpc.common.RpcResponse;
    import cn.wolfcode.rpc.register.RpcRegistry;
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import lombok.AllArgsConstructor;
    import lombok.Getter;
    import lombok.NoArgsConstructor;
    import lombok.Setter;
    import org.apache.commons.collections4.MapUtils;
    import org.springframework.beans.BeansException;
    import org.springframework.beans.factory.InitializingBean;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.ApplicationContextAware;
    
    import java.util.HashMap;
    import java.util.Map;
    
    @Setter
    @Getter
    @NoArgsConstructor
    @AllArgsConstructor
    //RPC服務端啟動,實現Spring的感知介面
    public class RpcServer implements ApplicationContextAware,InitializingBean {
        //用於儲存所有提供服務的方法, 其中key為類的全路徑名, value是所有的實現類
        private final Map<String,Object> serviceBeanMap=new HashMap<>();
        //rpcRegistry 用於註冊相關的地址資訊
        private RpcRegistry rpcRegistry;
        //提供服務的地址資訊 格式為 192.168.158.151:9000 類似
        private String serverAddress;
        //在Spring容器啟動完成後會執行該方法
        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            //獲取到所有貼了RpcService註解的Bean物件
            Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(RpcService.class);
            if(MapUtils.isNotEmpty(serviceBeanMap)){
                for (Object object : serviceBeanMap.values()) {
                    //獲取到類的路徑名稱
                    String serviceName = object.getClass().getAnnotation(RpcService.class).value().getName();
                    //把獲取到的資訊儲存到serviceBeanMap中
                    this.serviceBeanMap.put(serviceName,object);
                }
            }
            System.out.println("伺服器: "+serverAddress +" 提供的服務列表: "+ serviceBeanMap );
        }
        // 初始化完成後執行
        @Override
        public void afterPropertiesSet() throws Exception {
            //建立服務端的通訊物件
            ServerBootstrap server = new ServerBootstrap();
            // 建立非同步通訊的事件組 用於建立TCP連線的
            NioEventLoopGroup bossGroup = new NioEventLoopGroup();
            // 建立非同步通訊的事件組 用於處理Channel(通道)的I/O事件
            NioEventLoopGroup workerGroup = new NioEventLoopGroup();
            try{
                //開始設定server的相關引數
                server.group(bossGroup,workerGroup)
                        //啟動非同步ServerSocket
                        .channel(NioServerSocketChannel.class)
                        //初始化通道資訊
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast(new RpcDecoder(RpcRequest.class))//1 解碼請求引數
                                        .addLast(new RpcEncoder(RpcResponse.class))//2 編碼響應資訊
                                        .addLast(new RpcServerHandler(serviceBeanMap));//3 請求處理
                            }
                        }).option(ChannelOption.SO_BACKLOG, 128)
                        .childOption(ChannelOption.SO_KEEPALIVE, true);;
                String host=serverAddress.split(":")[0] ;//獲取到主機地址
                int port=Integer.valueOf(serverAddress.split(":")[1]);//埠
                ChannelFuture future = server.bind(host, port).sync();//開啟非同步通訊服務
                System.out.println("伺服器啟動成功:"+future.channel().localAddress());
                rpcRegistry.createNode(serverAddress);
                System.out.println("向zkServer註冊服務地址資訊");
                future.channel().closeFuture().sync();//等待通訊完成
            }catch (Exception e){
                e.printStackTrace();
            }finally {
                //優雅的關閉socket
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }
    
   package cn.wolfcode.rpc.server;


   import cn.wolfcode.rpc.common.RpcRequest;
   import cn.wolfcode.rpc.common.RpcResponse;
   import io.netty.channel.ChannelFutureListener;
   import io.netty.channel.ChannelHandlerContext;
   import io.netty.channel.ChannelInboundHandlerAdapter;
   import lombok.Getter;
   import lombok.NoArgsConstructor;
   import lombok.Setter;

   import java.lang.reflect.Method;
   import java.util.Map;
   import java.util.UUID;

   @Setter
   @Getter
   @NoArgsConstructor
   public class RpcServerHandler extends ChannelInboundHandlerAdapter{

       private Map<String,Object> serviceBeanMap;

       public RpcServerHandler(Map<String, Object> serviceBeanMap) {
           this.serviceBeanMap = serviceBeanMap;
       }

       @Override
       public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
           System.out.println("RpcServerHandler.channelRead");
           System.out.println(msg);
           RpcRequest rpcRequest= (RpcRequest) msg;
           RpcResponse rpcResponse=handler(rpcRequest);
           //告訴客戶端,關閉socket連線
           ctx.writeAndFlush(rpcResponse).addListener(ChannelFutureListener.CLOSE);
       }

       private RpcResponse handler(RpcRequest rpcRequest) {
           //建立一個響應訊息物件
           RpcResponse rpcResponse =new RpcResponse();
           //設定響應訊息ID
           rpcResponse.setResponseId(UUID.randomUUID().toString());
           //請求訊息ID
           rpcResponse.setRequestId(rpcRequest.getRequestId());
           try{
               //獲取到類名(介面名稱)
               String className = rpcRequest.getClassName();
               //獲取到方法名
               String methodName = rpcRequest.getMethodName();
               //獲取到引數型別列表
               Class<?>[] parameterTypes = rpcRequest.getParameterTypes();
               //獲取到引數列表
               Object[] parameters = rpcRequest.getParameters();
               //獲取到具位元組碼物件
               Class<?> clz = Class.forName(className);
               //獲取到實現類
               Object serviceBean = serviceBeanMap.get(className);
               if(serviceBean==null){
                   throw  new RuntimeException(className+"沒有找到對應的serviceBean:"+className+":beanMap:"+serviceBeanMap);
               }
               //反射呼叫方法
               Method method = clz.getMethod(methodName, parameterTypes);
               if(method==null)
                   throw new RuntimeException("沒有找到對應的方法");
               Object result = method.invoke(serviceBean, parameters);
               rpcResponse.setSuccess(true);
               //設定方法呼叫的結果
               rpcResponse.setResult(result);
           }catch (Exception e){
               rpcResponse.setSuccess(false);
               rpcResponse.setThrowable(e);
               e.printStackTrace();
           }
           return rpcResponse;
       }
   }
  1. rpc-client

    ​ RPC客戶端,通過網路通訊往RPC服務端傳送請求呼叫訊息,接受服務端的響應訊息,配置動態代理類,所有的方法呼叫都通過網路呼叫傳送到RPC服務端

    其中包括的主要程式碼:

    • RpcProxy 對於每一個類都建立一個動態代理物件,並且在invoke方法建立rpc客戶端並且傳送網路通訊請求
    • RpcClient RPC通訊客戶端,啟動RPC通訊服務,建立TCP連線,傳送請求,接受響應

    具體實現程式碼:

   package cn.wolfcode.rpc.client;


   import cn.wolfcode.rpc.common.RpcDecoder;
   import cn.wolfcode.rpc.common.RpcEncoder;
   import cn.wolfcode.rpc.common.RpcRequest;
   import cn.wolfcode.rpc.common.RpcResponse;
   import cn.wolfcode.rpc.register.RpcDiscover;
   import io.netty.bootstrap.Bootstrap;
   import io.netty.channel.*;
   import io.netty.channel.nio.NioEventLoopGroup;
   import io.netty.channel.socket.SocketChannel;
   import io.netty.channel.socket.nio.NioSocketChannel;
   //RPC通訊客戶端,往服務端傳送請求,並且接受服務端的響應
   public class RpcClient extends SimpleChannelInboundHandler<RpcResponse> {
       //訊息響應物件
       private RpcResponse rpcResponse;
       //訊息請求物件
       private RpcRequest rpcRequest;
       // 同步鎖 資源物件
       private Object object=new Object();
       // 用於獲取服務地址列表資訊
       private RpcDiscover rpcDiscover;
       //建構函式
       public RpcClient(RpcRequest rpcRequest,RpcDiscover rpcDiscover) {
           this.rpcDiscover = rpcDiscover;
           this.rpcRequest=rpcRequest;
       }
       @Override
       protected void channelRead0(ChannelHandlerContext ctx, RpcResponse msg) throws Exception {
           this.rpcResponse=msg;//響應訊息
           synchronized (object){
               ctx.flush();//重新整理快取
               object.notifyAll();//喚醒等待
           }
       }
       //傳送訊息
       public RpcResponse send()  throws Exception {
           //建立一個socket通訊物件
           Bootstrap client = new Bootstrap();
           //建立一個通訊組,負責Channel(通道)的I/O事件的處理
           NioEventLoopGroup loopGroup = new NioEventLoopGroup();
           try{
               client.group(loopGroup)//設定引數
                       .channel(NioSocketChannel.class)//使用非同步socket通訊
                       .handler(new ChannelInitializer<SocketChannel>() {
                           @Override
                           protected void initChannel(SocketChannel ch) throws Exception {
                               ch.pipeline().addLast(new RpcEncoder(RpcRequest.class))//編碼請求物件
                                       .addLast(new RpcDecoder(RpcResponse.class))//解碼響應物件
                                       .addLast(RpcClient.this);//傳送請求物件
                           }
                       }).option(ChannelOption.SO_KEEPALIVE, true);;
               String serverAddress = rpcDiscover.discover();//獲取一個伺服器地址
               String host=serverAddress.split(":")[0];
               int port=Integer.valueOf(serverAddress.split(":")[1]);
               ChannelFuture future = client.connect(host,port).sync();
               System.out.println("客戶端準備傳送資料:"+rpcRequest);
               future.channel().writeAndFlush(rpcRequest).sync();
               synchronized (object){
                   object.wait();//執行緒等待,等待客戶端響應
               }
               if (rpcResponse != null) {
                   future.channel().closeFuture().sync();//等待服務端關閉socket
               }
               return rpcResponse;
           }finally {
               loopGroup.shutdownGracefully();//優雅關閉socket
           }
       }

       /**
        * 異常處理
        */
       @Override
       public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
               throws Exception {
           ctx.close();
       }
   }
   package cn.wolfcode.rpc.client;


   import cn.wolfcode.rpc.common.RpcRequest;
   import cn.wolfcode.rpc.common.RpcResponse;
   import cn.wolfcode.rpc.register.RpcDiscover;
   import lombok.Getter;
   import lombok.Setter;

   import java.lang.reflect.InvocationHandler;
   import java.lang.reflect.Method;
   import java.lang.reflect.Proxy;
   import java.util.UUID;

   @Setter
   @Getter
   //動態代理類,用於獲取到每個類的代理物件
   //對於被代理物件的所有的方法呼叫都會執行invoke方法
   public class RpcProxy {
       //用於獲取到RPC-Server的地址資訊
       private RpcDiscover rpcDiscover;

       @SuppressWarnings("all")
       public <T> T getInstance(Class<T> interfaceClass){
           T instance = (T)Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[]{interfaceClass}, new InvocationHandler() {
               @Override
               public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                   //建立請求物件
                   RpcRequest rpcRequest = new RpcRequest();
                   //獲取到被呼叫的類名 和RPC-Server中的serviceMap中的key進行匹配
                   String className=method.getDeclaringClass().getName();
                   //獲取到方法的引數列表
                   Class<?>[] parameterTypes = method.getParameterTypes();
                   //生成一個請求的id
                   rpcRequest.setRequestId(UUID.randomUUID().toString());
                   rpcRequest.setClassName(className);//類名
                   rpcRequest.setParameterTypes(parameterTypes);//引數型別列表
                   rpcRequest.setParameters(args);//引數列表
                   rpcRequest.setMethodName(method.getName());//呼叫的放方法名稱
                   RpcResponse rpcResponse = new RpcClient(rpcRequest, rpcDiscover).send();//建立一個RPCclient物件,並且傳送訊息到服務端
                   //返回呼叫結果
                   return rpcResponse.getResult();
               }
           });
           //返回一個代理物件
           return instance;
       }
   }
  1. app-common

    這是具體應用的通用模組,和具體的專案結構有關係,這裡主要包括介面定義和JavaBean物件的定義

    具體程式碼為:

    package cn.wolfcode.app.common;
    
    public interface IProductService {
        /**
         * 儲存產品
         * @param product
         */
        void save(Product product);
    
        /**
         * 根據產品id刪除產品
         * @param productId
         */
        void deleteById(Long productId);
    
        /**
         * 修改產品資訊
         * @param product
         */
        void update(Product product);
    
        /**
         * 根據產品id獲取到產品資訊
         * @param productId
         * @return
         */
        Product get(Long productId);
    }
    
    package cn.wolfcode.app.common;
    
    import lombok.*;
    
    import java.math.BigDecimal;
    
    /**
     * 產品資訊
     */
    @Setter
    @Getter
    @ToString
    @AllArgsConstructor
    @NoArgsConstructor
    public class Product {
        private Long id;//id
        private String sn;//產品編號
        private String name;//產品名稱
        private BigDecimal price;//產品價格
    }
    
  2. app-server

    這個模組主要是定義服務的具體實現和啟動Spring容器,在啟動Spring容器的時候需要建立RpcRegistry,RpcServer物件

    具體程式碼實現:

    package cn.wolfcode.app.server;
    
    import cn.wolfcode.app.common.IProductService;
    import cn.wolfcode.app.common.Product;
    import cn.wolfcode.rpc.server.RpcService;
    import org.springframework.stereotype.Component;
    
    import java.math.BigDecimal;
    
    @Component
    @RpcService(IProductService.class)
    public class ProductServiceImpl implements IProductService {
        @Override
        public void save(Product product) {
            System.out.println("產品儲存成功: "+product);
        }
    
        @Override
        public void deleteById(Long productId) {
            System.out.println("產品刪除成功: "+ productId);
        }
    
        @Override
        public void update(Product product) {
            System.out.println("產品修改成功: "+ product);
        }
    
        @Override
        public Product get(Long productId) {
            System.out.println("產品獲取成功");
            return new Product(1L,"001","膝上型電腦",BigDecimal.TEN);
        }
    }
    
    package cn.wolfcode.app.server;
    
    import org.springframework.context.support.ClassPathXmlApplicationContext;
    
    public class BootAppServer {
        public static void main(String[] args) {
            //啟動Spring容器
            new ClassPathXmlApplicationContext("classpath:application.xml");
        }
    }
    

    其中配置檔案:

    • application.xml Spring的配置檔案
    • log4j.properties 日誌配置檔案
    • rpc.properties 服務提供者的地址和埠 以及zkServer的連線地址和埠
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:context="http://www.springframework.org/schema/context"
           xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context.xsd">
    
        <context:component-scan base-package="cn.wolfcode.app.server"/>
    
        <context:property-placeholder location="classpath:rpc.properties"/>
    
        <bean id="serviceRegistry" class="cn.wolfcode.rpc.register.RpcRegistry">
            <property name="registryAddress" value="${registry.address}"/>
        </bean>
        <bean id="rpcServer" class="cn.wolfcode.rpc.server.RpcServer">
            <property name="serverAddress" value="${server.address}"/>
            <property name="rpcRegistry" ref="serviceRegistry"/>
        </bean>
    </beans>
    
    log4j.rootLogger=ERROR,console
    
    log4j.appender.console=org.apache.log4j.ConsoleAppender
    log4j.appender.console.target=System.out
    log4j.appender.console.layout=org.apache.log4j.PatternLayout
    log4j.appender.console.layout.ConversionPattern=%m%n
    
    log4j.logger.cn.wolfcode.rpc=DEBUG
    
    # zookeeper server
    registry.address=192.168.158.151:2181
    
    # rpc server
    server.address=192.168.158.1:9090
    
  1. app-client

    通過Spring的配置建立RpcDiscover物件和RpcProxy物件,其中RpcDiscover用於從註冊中心獲取到服務的地址資訊,RpcProxy用於建立類的動態代理物件

    測試類:使用Spring的Junit進行測試

   package cn.wolfcode.app.client;

   import cn.wolfcode.app.common.IProductService;
   import cn.wolfcode.app.common.Product;
   import cn.wolfcode.rpc.client.RpcProxy;
   import org.junit.Before;
   import org.junit.Test;
   import org.junit.runner.RunWith;
   import org.springframework.beans.factory.annotation.Autowired;
   import org.springframework.test.context.ContextConfiguration;
   import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

   import java.math.BigDecimal;

   //模擬客戶端啟動
   @RunWith(SpringJUnit4ClassRunner.class)
   @ContextConfiguration(locations="classpath:application.xml")
   public class APP {
       @Autowired
       private RpcProxy rpcProxy;

       private IProductService productService;

       @Before
       public void init() {
           productService = rpcProxy.getInstance(IProductService.class);
       }


       @Test
       public void testSave() throws Exception {
           productService.save(new Product(2L,"002","內衣",BigDecimal.TEN));
       }

       @Test
       public void testDelete() throws Exception {
           productService.deleteById(2L);
       }

       @Test
       public void testUpdate() throws Exception {
           productService.update(new Product(2L,"002","內衣",BigDecimal.ONE));
       }

       @Test
       public void testGet() throws Exception {
           Product product = productService.get(1L);
           System.out.println("獲取到的產品資訊為:"+product);
       }
   }

配置檔案資訊

application.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       http://www.springframework.org/schema/context/spring-context.xsd">

    <context:component-scan base-package="cn.wolfcode.app.client"/>

    <context:property-placeholder location="classpath:rpc.properties"/>

    <bean id="serviceRpcDiscover" class="cn.wolfcode.rpc.register.RpcDiscover">
        <constructor-arg name="registryAddress" value="${registry.address}"/>
    </bean>

    <bean id="rpcProxy" class="cn.wolfcode.rpc.client.RpcProxy">
        <property name="rpcDiscover" ref="serviceRpcDiscover"/>
    </bean>

</beans>

log4j.properties

log4j.rootLogger=ERROR,console

log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%m%n

log4j.logger.cn.wolfcode.rpc=DEBUG

rpc.properties

# zookeeper server
registry.address=192.168.158.151:2181

如果要正常執行,請部署一個zookeeper註冊中心,修改rpc.properites的地址即可

  • 先執行app-server中的BootAppServer
  • 在執行app-client中的APP測試用例