1. 程式人生 > >基於Netty和SpringBoot實現一個輕量級RPC框架-Client篇

基於Netty和SpringBoot實現一個輕量級RPC框架-Client篇

前提

前置文章:

  • 《基於Netty和SpringBoot實現一個輕量級RPC框架-協議篇》
  • 《基於Netty和SpringBoot實現一個輕量級RPC框架-Server篇》

前一篇文章相對簡略地介紹了RPC服務端的編寫,而這篇博文最要介紹服務端(Client)的實現。RPC呼叫一般是面向契約程式設計的,而Client的核心功能就是:把契約介面方法的呼叫抽象為使用NettyRPC服務端通過私有協議傳送一個請求。這裡最底層的實現依賴於動態代理,因此動態代理是動態實現介面的最簡單方式(如果位元組碼研究得比較深入,可以通過位元組碼程式設計實現介面)。需要的依賴如下:

  • JDK1.8+
  • Netty:4.1.44.Final
  • SpringBoot:2.2.2.RELEASE

動態代理的簡單使用

一般可以通過JDK動態代理或者Cglib的位元組碼增強來實現此功能,為了簡單起見,不引入額外的依賴,這裡選用JDK動態代理。這裡重新搬出前面提到的契約介面HelloService

public interface HelloService {

    String sayHello(String name);
}

接下來需要通過動態代理為此介面新增一個實現:

public class TestDynamicProxy {

    public static void main(String[] args) throws Exception {
        Class<HelloService> interfaceKlass = HelloService.class;
        InvocationHandler handler = new HelloServiceImpl(interfaceKlass);
        HelloService helloService = (HelloService)
                Proxy.newProxyInstance(interfaceKlass.getClassLoader(), new Class[]{interfaceKlass}, handler);
        System.out.println(helloService.sayHello("throwable"));
    }

    @RequiredArgsConstructor
    private static class HelloServiceImpl implements InvocationHandler {

        private final Class<?> interfaceKlass;

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            // 這裡應該根據方法的返回值型別去決定返回結果
            return String.format("[%s#%s]方法被呼叫,引數列表:%s", interfaceKlass.getName(), method.getName(),
                    JSON.toJSONString(args));
        }
    }
}
// 控制檯輸出結果
[club.throwable.contract.HelloService#sayHello]方法被呼叫,引數列表:["throwable"]

這裡可以確認兩點:

  1. InvocationHandler實現後會對被代理介面生成一個動態實現類。
  2. 動態實現類(介面)方法被呼叫的時候,實際上是呼叫InvocationHandler對應例項的invoke()方法,傳入的引數就是當前方法呼叫的元資料。

Client端程式碼實現

Client端需要通過動態代理為契約介面生成一個動態實現類,然後提取契約介面呼叫方法時候所能提供的元資料,通過這些元資料和Netty客戶端的支援(例如NettyChannel)基於私有RPC協議組裝請求資訊並且傳送請求。這裡先定義一個請求引數提取器介面RequestArgumentExtractor

@Data
public class RequestArgumentExtractInput {

    private Class<?> interfaceKlass;

    private Method method;
}

@Data
public class RequestArgumentExtractOutput {

    private String interfaceName;

    private String methodName;

    private List<String> methodArgumentSignatures;
}

// 介面
public interface RequestArgumentExtractor {

    RequestArgumentExtractOutput extract(RequestArgumentExtractInput input);
}

簡單實現一下,解析結果新增到快取中,實現類DefaultRequestArgumentExtractor程式碼如下:

public class DefaultRequestArgumentExtractor implements RequestArgumentExtractor {

    private final ConcurrentMap<CacheKey, RequestArgumentExtractOutput> cache = Maps.newConcurrentMap();

    @Override

    public RequestArgumentExtractOutput extract(RequestArgumentExtractInput input) {
        Class<?> interfaceKlass = input.getInterfaceKlass();
        Method method = input.getMethod();
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        return cache.computeIfAbsent(new CacheKey(interfaceKlass.getName(), methodName,
                Lists.newArrayList(parameterTypes)), x -> {
            RequestArgumentExtractOutput output = new RequestArgumentExtractOutput();
            output.setInterfaceName(interfaceKlass.getName());
            List<String> methodArgumentSignatures = Lists.newArrayList();
            for (Class<?> klass : parameterTypes) {
                methodArgumentSignatures.add(klass.getName());
            }
            output.setMethodArgumentSignatures(methodArgumentSignatures);
            output.setMethodName(methodName);
            return output;
        });
    }

    @RequiredArgsConstructor
    private static class CacheKey {

        private final String interfaceName;
        private final String methodName;
        private final List<Class<?>> parameterTypes;

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            CacheKey cacheKey = (CacheKey) o;
            return Objects.equals(interfaceName, cacheKey.interfaceName) &&
                    Objects.equals(methodName, cacheKey.methodName) &&
                    Objects.equals(parameterTypes, cacheKey.parameterTypes);
        }

        @Override
        public int hashCode() {
            return Objects.hash(interfaceName, methodName, parameterTypes);
        }
    }
}

在不考慮重連、斷連等情況下,新增一個類ClientChannelHolder用於儲存Netty客戶端的Channel例項:

public class ClientChannelHolder {

    public static final AtomicReference<Channel> CHANNEL_REFERENCE = new AtomicReference<>();
}

接著新增一個契約動態代理工廠(工具類)ContractProxyFactory,用於為契約介面生成代理類例項:

public class ContractProxyFactory {

    private static final RequestArgumentExtractor EXTRACTOR = new DefaultRequestArgumentExtractor();
    private static final ConcurrentMap<Class<?>, Object> CACHE = Maps.newConcurrentMap();

    @SuppressWarnings("unchecked")
    public static <T> T ofProxy(Class<T> interfaceKlass) {
        // 快取契約介面的代理類例項
        return (T) CACHE.computeIfAbsent(interfaceKlass, x ->
                Proxy.newProxyInstance(interfaceKlass.getClassLoader(), new Class[]{interfaceKlass}, (target, method, args) -> {
                    RequestArgumentExtractInput input = new RequestArgumentExtractInput();
                    input.setInterfaceKlass(interfaceKlass);
                    input.setMethod(method);
                    RequestArgumentExtractOutput output = EXTRACTOR.extract(input);
                    // 封裝請求引數
                    RequestMessagePacket packet = new RequestMessagePacket();
                    packet.setMagicNumber(ProtocolConstant.MAGIC_NUMBER);
                    packet.setVersion(ProtocolConstant.VERSION);
                    packet.setSerialNumber(SerialNumberUtils.X.generateSerialNumber());
                    packet.setMessageType(MessageType.REQUEST);
                    packet.setInterfaceName(output.getInterfaceName());
                    packet.setMethodName(output.getMethodName());
                    packet.setMethodArgumentSignatures(output.getMethodArgumentSignatures().toArray(new String[0]));
                    packet.setMethodArguments(args);
                    Channel channel = ClientChannelHolder.CHANNEL_REFERENCE.get();
                    // 發起請求
                    channel.writeAndFlush(packet);
                    // 這裡方法返回值需要進行同步處理,相對複雜,後面專門開一篇文章講解,暫時統一返回字串
                    // 如果契約介面的返回值型別不是字串,這裡方法返回後會丟擲異常
                    return String.format("[%s#%s]呼叫成功,傳送了[%s]到NettyServer[%s]", output.getInterfaceName(),
                            output.getMethodName(), JSON.toJSONString(packet), channel.remoteAddress());
                }));
    }
}

最後編寫客戶端ClientApplication的程式碼:

@Slf4j
public class ClientApplication {

    public static void main(String[] args) throws Exception {
        int port = 9092;
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        try {
            bootstrap.group(workerGroup);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);
            bootstrap.option(ChannelOption.TCP_NODELAY, Boolean.TRUE);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {

                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
                    ch.pipeline().addLast(new LengthFieldPrepender(4));
                    ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                    ch.pipeline().addLast(new RequestMessagePacketEncoder(FastJsonSerializer.X));
                    ch.pipeline().addLast(new ResponseMessagePacketDecoder());
                    ch.pipeline().addLast(new SimpleChannelInboundHandler<ResponseMessagePacket>() {
                        @Override
                        protected void channelRead0(ChannelHandlerContext ctx, ResponseMessagePacket packet) throws Exception {
                            Object targetPayload = packet.getPayload();
                            if (targetPayload instanceof ByteBuf) {
                                ByteBuf byteBuf = (ByteBuf) targetPayload;
                                int readableByteLength = byteBuf.readableBytes();
                                byte[] bytes = new byte[readableByteLength];
                                byteBuf.readBytes(bytes);
                                targetPayload = FastJsonSerializer.X.decode(bytes, String.class);
                                byteBuf.release();
                            }
                            packet.setPayload(targetPayload);
                            log.info("接收到來自服務端的響應訊息,訊息內容:{}", JSON.toJSONString(packet));
                        }
                    });
                }
            });
            ChannelFuture future = bootstrap.connect("localhost", port).sync();
            // 儲存Channel例項,暫時不考慮斷連重連
            ClientChannelHolder.CHANNEL_REFERENCE.set(future.channel());
            // 構造契約介面代理類例項
            HelloService helloService = ContractProxyFactory.ofProxy(HelloService.class);
            String result = helloService.sayHello("throwable");
            log.info(result);
            future.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}

先啟動《基於Netty和SpringBoot實現一個輕量級RPC框架-Server篇》一文中的ServerApplication,再啟動ClientApplication,控制檯輸出如下:

// 服務端日誌
2020-01-16 22:34:51 [main] INFO  c.throwable.server.ServerApplication - 啟動NettyServer[9092]成功...
2020-01-16 22:36:35 [nioEventLoopGroup-3-1] INFO  club.throwable.server.ServerHandler - 服務端接收到:RequestMessagePacket(interfaceName=club.throwable.contract.HelloService, methodName=sayHello, methodArgumentSignatures=[java.lang.String], methodArguments=[PooledUnsafeDirectByteBuf(ridx: 0, widx: 11, cap: 11/144)])
2020-01-16 22:36:35 [nioEventLoopGroup-3-1] INFO  club.throwable.server.ServerHandler - 查詢目標實現方法成功,目標類:club.throwable.server.contract.DefaultHelloService,宿主類:club.throwable.server.contract.DefaultHelloService,宿主方法:sayHello
2020-01-16 22:36:35 [nioEventLoopGroup-3-1] INFO  club.throwable.server.ServerHandler - 服務端輸出:{"attachments":{},"errorCode":200,"magicNumber":10086,"message":"Success","messageType":"RESPONSE","payload":"\"throwable say hello!\"","serialNumber":"63d386214d30410c9e5f04de03d8b2da","version":1}

// 客戶端日誌
2020-01-16 22:36:35 [main] INFO  c.throwable.client.ClientApplication - [club.throwable.contract.HelloService#sayHello]呼叫成功,傳送了[{"attachments":{},"interfaceName":"club.throwable.contract.HelloService","magicNumber":10086,"messageType":"REQUEST","methodArgumentSignatures":["java.lang.String"],"methodArguments":["throwable"],"methodName":"sayHello","serialNumber":"63d386214d30410c9e5f04de03d8b2da","version":1}]到NettyServer[localhost/127.0.0.1:9092]
2020-01-16 22:36:35 [nioEventLoopGroup-2-1] INFO  c.throwable.client.ClientApplication - 接收到來自服務端的響應訊息,訊息內容:{"attachments":{},"errorCode":200,"magicNumber":10086,"message":"Success","messageType":"RESPONSE","payload":"\"throwable say hello!\"","serialNumber":"63d386214d30410c9e5f04de03d8b2da","version":1}

小結

Client端主要負責契約介面呼叫轉換為傳送RPC協議請求這一步,核心技術就是動態代理,在不進行模組封裝優化的前提下實現是相對簡單的。這裡其實Client端還有一個比較大的技術難題沒有解決,上面例子中客戶端日誌輸出如果眼尖的夥伴會發現,Client端傳送RPC請求的執行緒(main執行緒)和Client端接收ServerRPC響應處理的執行緒(nioEventLoopGroup-2-1執行緒)並不相同,這一點是Netty處理網路請求之所以能夠如此高效的根源(簡單來說就是請求和響應是非同步的,兩個流程本來是互不感知的)。但是更多情況下,我們希望外部請求是同步的,希望傳送RPC請求的執行緒得到響應結果再返回(這裡請求和響應有可能依然是非同步流程)。下一篇文章會詳細分析一下如果對請求-響應做同步化處理。

Demo專案地址:

  • ch0-custom-rpc-protocol

(c-2-d e-a-20200116