1. 程式人生 > >基於Netty的分散式 RPC 框架

基於Netty的分散式 RPC 框架

轉載自:http://blog.csdn.net/z69183787/article/details/52700274
              http://blog.csdn.net/z69183787/article/details/52680941

       採用Zookeeper、Netty和spring實現了一個輕量級的分散式RPC框架,這個RPC框架可以算是一個簡易版的dubbo。框架雖小,但是麻雀雖小,五臟俱全。
       使用瞭如下技術選型:
           Spring:依賴注入框架。
           Netty:它使 NIO 程式設計更加容易,遮蔽了 Java 底層的 NIO 細節。
           Protostuff:它基於 Protobuf 序列化框架,面向 POJO,無需編寫 .proto 檔案。
           ZooKeeper:提供服務註冊與發現功能,開發分散式系統的必備選擇。
       下面只是些關鍵部分的程式碼示例,具體請參考原作者的git:
https://github.com/luxiaoxun/NettyRpc
或則 轉載的原文。

1.使用註解標註要釋出的服務:
@Target({ElementType.TYPE})  
@Retention(RetentionPolicy.RUNTIME)  
@Component  
public @interface RpcService {  
    Class<?> value();  
}
       使用RpcService註解定義在服務介面的實現類上,需要對該實現類指定遠端介面的class,因為實現類可能會實現多個介面,一定要告訴框架哪個才是遠端介面。

2. 定義服務介面和實現:
public interface HelloService {  
  ……
}  
@RpcService(HelloService.class)  
public class HelloServiceImpl implements HelloService {  
  ……
}  

3. RpcRequest與RpcResponse如下:
public class RpcRequest {  
  
    private String requestId;  
    private String className;  
    private String methodName;  
    private Class<?>[] parameterTypes;  
    private Object[] parameters;  
  
    // getter/setter...  
}  
public class RpcResponse {  
  
    private String requestId;  
    private Throwable error;  
    private Object result;  
  
    // getter/setter...  
}

4. 實現 RPC 伺服器:
public class RpcServer implements ApplicationContextAware, InitializingBean {  
    
    private String serverAddress;  
    private ServiceRegistry serviceRegistry;  
  
    private Map<String, Object> handlerMap = new HashMap<>(); // 存放介面名與服務物件之間的對映關係  
  
    public RpcServer(String serverAddress) {  
        this.serverAddress = serverAddress;  
    }  
  
    public RpcServer(String serverAddress, ServiceRegistry serviceRegistry) {  
        this.serverAddress = serverAddress;  
        this.serviceRegistry = serviceRegistry;  
    }  
  
    @Override  
    public void setApplicationContext(ApplicationContext ctx) throws BeansException {  
        Map<String, Object> serviceBeanMap = ctx.getBeansWithAnnotation(RpcService.class); // 獲取所有帶有 RpcService 註解的 Spring Bean  
        if (MapUtils.isNotEmpty(serviceBeanMap)) {  
            for (Object serviceBean : serviceBeanMap.values()) {  
                String interfaceName = serviceBean.getClass().getAnnotation(RpcService.class).value().getName();  
                handlerMap.put(interfaceName, serviceBean);  
            }  
        }  
    }  
  
    @Override  
    public void afterPropertiesSet() throws Exception {  
        EventLoopGroup bossGroup = new NioEventLoopGroup();  
        EventLoopGroup workerGroup = new NioEventLoopGroup();  
        try {  
            ServerBootstrap bootstrap = new ServerBootstrap();  
            bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)  
                .childHandler(new ChannelInitializer<SocketChannel>() {  
                    @Override  
                    public void initChannel(SocketChannel channel) throws Exception {  
                        channel.pipeline()  
                            .addLast(new RpcDecoder(RpcRequest.class)) // 將 RPC 請求進行解碼(為了處理請求)  
                            .addLast(new RpcEncoder(RpcResponse.class)) // 將 RPC 響應進行編碼(為了返回響應)  
                            .addLast(new RpcHandler(handlerMap)); // 處理 RPC 請求  
                    }  
                })  
                .option(ChannelOption.SO_BACKLOG, 128)  
                .childOption(ChannelOption.SO_KEEPALIVE, true);  
  
            String[] array = serverAddress.split(":");  
            String host = array[0];  
            int port = Integer.parseInt(array[1]);  
  
            ChannelFuture future = bootstrap.bind(host, port).sync();  
  
            if (serviceRegistry != null) {  
                serviceRegistry.register(serverAddress); // 註冊服務地址  
            }  
  
            future.channel().closeFuture().sync();  
        } finally {  
            workerGroup.shutdownGracefully();  
            bossGroup.shutdownGracefully();  
        }  
    }  
}  
       這裡的serverAddress是”ip:port”,ServiceRegistry是使用 ZooKeeper實現服務註冊類,將serverAddress在資料節點上。

5. RpcDecoder提供 RPC 解碼,只需擴充套件 Netty 的ByteToMessageDecoder抽象類的decode方法即可,RpcEncoder提供 RPC 編碼,只需擴充套件 Netty 的MessageToByteEncoder抽象類的encode方法即可,程式碼如下:
public class RpcDecoder extends ByteToMessageDecoder {  
  
    private Class<?> genericClass;  
  
    public RpcDecoder(Class<?> genericClass) {  
        this.genericClass = genericClass;  
    }  
  
    @Override  
    public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {  
        if (in.readableBytes() < 4) {  
            return;  
        }  
        in.markReaderIndex();  
        int dataLength = in.readInt();  
        if (dataLength < 0) {  
            ctx.close();  
        }  
        if (in.readableBytes() < dataLength) {  
            in.resetReaderIndex();  
            return;  
        }  
        byte[] data = new byte[dataLength];  
        in.readBytes(data);  
  
        Object obj = SerializationUtil.deserialize(data, genericClass);  
        out.add(obj);  
    }  
}
public class RpcEncoder extends MessageToByteEncoder {  
  
    private Class<?> genericClass;  
  
    public RpcEncoder(Class<?> genericClass) {  
        this.genericClass = genericClass;  
    }  
  
    @Override  
    public void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) throws Exception {  
        if (genericClass.isInstance(in)) {  
            byte[] data = SerializationUtil.serialize(in);  
            out.writeInt(data.length);  
            out.writeBytes(data);  
        }  
    }  
}
       SerializationUtil工具類是使用Protostuff來實現序列化,程式碼略過。

6. 在RpcHandler中將處理 RPC 請求,擴充套件了Netty的SimpleChannelInboundHandler抽象類:
public class RpcHandler extends SimpleChannelInboundHandler<RpcRequest> {  
    
    private final Map<String, Object> handlerMap;  
  
    public RpcHandler(Map<String, Object> handlerMap) {  
        this.handlerMap = handlerMap;  
    }  
  
    @Override  
    public void channelRead0(final ChannelHandlerContext ctx, RpcRequest request) throws Exception {  
        RpcResponse response = new RpcResponse();  
        response.setRequestId(request.getRequestId());  
        try {  
            Object result = handle(request);  
            response.setResult(result);  
        } catch (Throwable t) {  
            response.setError(t);  
        }  
        ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);  
    }  
  
    private Object handle(RpcRequest request) throws Throwable {  
        String className = request.getClassName();  
        Object serviceBean = handlerMap.get(className);  
  
        Class<?> serviceClass = serviceBean.getClass();  
        String methodName = request.getMethodName();  
        Class<?>[] parameterTypes = request.getParameterTypes();  
        Object[] parameters = request.getParameters();  
  
        /*Method method = serviceClass.getMethod(methodName, parameterTypes); 
        method.setAccessible(true); 
        return method.invoke(serviceBean, parameters);*/  
  
        FastClass serviceFastClass = FastClass.create(serviceClass);  
        FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);  
        return serviceFastMethod.invoke(serviceBean, parameters);  
    }  
  
    @Override  
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {  
        ctx.close();  
    }  
}  
       為了避免使用 Java 反射帶來的效能問題,使用了CGLib 提供的反射API,即上面用到的FastClass與FastMethod。

7. 客戶端呼叫服務,使用代理模式呼叫服務:
public class RpcProxy {  
  
    private String serverAddress;  
    private ServiceDiscovery serviceDiscovery;  
  
    public RpcProxy(String serverAddress) {  
        this.serverAddress = serverAddress;  
    }  
  
    public RpcProxy(ServiceDiscovery serviceDiscovery) {  
        this.serviceDiscovery = serviceDiscovery;  
    }  
  
    @SuppressWarnings("unchecked")  
    public <T> T create(Class<?> interfaceClass) {  
        return (T) Proxy.newProxyInstance(  
            interfaceClass.getClassLoader(),  
            new Class<?>[]{interfaceClass},  
            new InvocationHandler() {  
                @Override  
                public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {  
                    RpcRequest request = new RpcRequest(); // 建立並初始化 RPC 請求  
                    request.setRequestId(UUID.randomUUID().toString());  
                    request.setClassName(method.getDeclaringClass().getName());  
                    request.setMethodName(method.getName());  
                    request.setParameterTypes(method.getParameterTypes());  
                    request.setParameters(args);  
  
                    if (serviceDiscovery != null) {  
                        serverAddress = serviceDiscovery.discover(); // 發現服務  
                    }  
  
                    String[] array = serverAddress.split(":");  
                    String host = array[0];  
                    int port = Integer.parseInt(array[1]);  
  
                    RpcClient client = new RpcClient(host, port); // 初始化 RPC 客戶端  
                    RpcResponse response = client.send(request); // 通過 RPC 客戶端傳送 RPC 請求並獲取 RPC 響應  
  
                    if (response.isError()) {  
                        throw response.getError();  
                    } else {  
                        return response.getResult();  
                    }  
                }  
            }  
        );  
    }  
}  
       ServiceDiscovery是Zookeeper實現的服務發現,取出已經註冊過的”ip:port”,然後隨機返回一個”ip:port”。

8. RpcClient類實現 RPC 客戶端:
public class RpcClient extends SimpleChannelInboundHandler<RpcResponse> {  
  
    private String host;  
    private int port;  
  
    private RpcResponse response;  
  
    private final Object obj = new Object();  
  
    public RpcClient(String host, int port) {  
        this.host = host;  
        this.port = port;  
    }  
  
    @Override  
    public void channelRead0(ChannelHandlerContext ctx, RpcResponse response) throws Exception {  
        this.response = response;  
  
        synchronized (obj) {  
            obj.notifyAll(); // 收到響應,喚醒執行緒  
        }  
    }  
  
    @Override  
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {  
        ctx.close();  
    }  
  
    public RpcResponse send(RpcRequest request) throws Exception {  
        EventLoopGroup group = new NioEventLoopGroup();  
        try {  
            Bootstrap bootstrap = new Bootstrap();  
            bootstrap.group(group).channel(NioSocketChannel.class)  
                .handler(new ChannelInitializer<SocketChannel>() {  
                    @Override  
                    public void initChannel(SocketChannel channel) throws Exception {  
                        channel.pipeline()  
                            .addLast(new RpcEncoder(RpcRequest.class)) // 將 RPC 請求進行編碼(為了傳送請求)  
                            .addLast(new RpcDecoder(RpcResponse.class)) // 將 RPC 響應進行解碼(為了處理響應)  
                            .addLast(RpcClient.this); // 使用 RpcClient 傳送 RPC 請求  
                    }  
                })  
                .option(ChannelOption.SO_KEEPALIVE, true);  
  
            ChannelFuture future = bootstrap.connect(host, port).sync();  
            future.channel().writeAndFlush(request).sync();  
  
            synchronized (obj) {  
                obj.wait(); // 未收到響應,使執行緒等待  
            }  
  
            if (response != null) {  
                future.channel().closeFuture().sync();  
            }  
            return response;  
        } finally {  
            group.shutdownGracefully();  
        }  
    }  
}
       這裡每次呼叫的send時候才去和服務端建立連線,使用的是短連線,這種短連線在高併發時會有連線數問題,也會影響效能。,使用了obj的wait和notifyAll來等待Response返回,會出現“假死等待”的情況:一個Request傳送出去後,在obj.wait()呼叫之前可能Response就返回了,這時候在channelRead0裡已經拿到了Response並且obj.notifyAll()已經在obj.wait()之前呼叫了,這時候send後再obj.wait()就出現了假死等待,客戶端就一直等待在這裡。應該使用CountDownLatch來解決這個問題。

9.測試:
RpcProxy rpcProxy = new RpcProxy(serviceDiscovery);
HelloService helloService = rpcProxy.create(HelloService.class);  
String result = helloService.hello("World");  

以上的demo還只是個雛形,還存在一些效能問題需要改進,僅供學習參考之用。