基於Netty和SpringBoot實現一個輕量級RPC框架-Client篇
前提
前置文章:
- 《基於Netty和SpringBoot實現一個輕量級RPC框架-協議篇》
- 《基於Netty和SpringBoot實現一個輕量級RPC框架-Server篇》
前一篇文章相對簡略地介紹了RPC
服務端的編寫,而這篇博文最要介紹服務端(Client
)的實現。RPC
呼叫一般是面向契約程式設計的,而Client
的核心功能就是:把契約介面方法的呼叫抽象為使用Netty
向RPC
服務端通過私有協議傳送一個請求。這裡最底層的實現依賴於動態代理,因此動態代理是動態實現介面的最簡單方式(如果位元組碼研究得比較深入,可以通過位元組碼程式設計實現介面)。需要的依賴如下:
JDK1.8+
Netty:4.1.44.Final
SpringBoot:2.2.2.RELEASE
動態代理的簡單使用
一般可以通過JDK
動態代理或者Cglib
的位元組碼增強來實現此功能,為了簡單起見,不引入額外的依賴,這裡選用JDK
動態代理。這裡重新搬出前面提到的契約介面HelloService
:
public interface HelloService {
String sayHello(String name);
}
接下來需要通過動態代理為此介面新增一個實現:
public class TestDynamicProxy { public static void main(String[] args) throws Exception { Class<HelloService> interfaceKlass = HelloService.class; InvocationHandler handler = new HelloServiceImpl(interfaceKlass); HelloService helloService = (HelloService) Proxy.newProxyInstance(interfaceKlass.getClassLoader(), new Class[]{interfaceKlass}, handler); System.out.println(helloService.sayHello("throwable")); } @RequiredArgsConstructor private static class HelloServiceImpl implements InvocationHandler { private final Class<?> interfaceKlass; @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { // 這裡應該根據方法的返回值型別去決定返回結果 return String.format("[%s#%s]方法被呼叫,引數列表:%s", interfaceKlass.getName(), method.getName(), JSON.toJSONString(args)); } } } // 控制檯輸出結果 [club.throwable.contract.HelloService#sayHello]方法被呼叫,引數列表:["throwable"]
這裡可以確認兩點:
InvocationHandler
實現後會對被代理介面生成一個動態實現類。- 動態實現類(介面)方法被呼叫的時候,實際上是呼叫
InvocationHandler
對應例項的invoke()
方法,傳入的引數就是當前方法呼叫的元資料。
Client端程式碼實現
Client
端需要通過動態代理為契約介面生成一個動態實現類,然後提取契約介面呼叫方法時候所能提供的元資料,通過這些元資料和Netty
客戶端的支援(例如Netty
的Channel
)基於私有RPC
協議組裝請求資訊並且傳送請求。這裡先定義一個請求引數提取器介面RequestArgumentExtractor
:
@Data public class RequestArgumentExtractInput { private Class<?> interfaceKlass; private Method method; } @Data public class RequestArgumentExtractOutput { private String interfaceName; private String methodName; private List<String> methodArgumentSignatures; } // 介面 public interface RequestArgumentExtractor { RequestArgumentExtractOutput extract(RequestArgumentExtractInput input); }
簡單實現一下,解析結果新增到快取中,實現類DefaultRequestArgumentExtractor
程式碼如下:
public class DefaultRequestArgumentExtractor implements RequestArgumentExtractor {
private final ConcurrentMap<CacheKey, RequestArgumentExtractOutput> cache = Maps.newConcurrentMap();
@Override
public RequestArgumentExtractOutput extract(RequestArgumentExtractInput input) {
Class<?> interfaceKlass = input.getInterfaceKlass();
Method method = input.getMethod();
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
return cache.computeIfAbsent(new CacheKey(interfaceKlass.getName(), methodName,
Lists.newArrayList(parameterTypes)), x -> {
RequestArgumentExtractOutput output = new RequestArgumentExtractOutput();
output.setInterfaceName(interfaceKlass.getName());
List<String> methodArgumentSignatures = Lists.newArrayList();
for (Class<?> klass : parameterTypes) {
methodArgumentSignatures.add(klass.getName());
}
output.setMethodArgumentSignatures(methodArgumentSignatures);
output.setMethodName(methodName);
return output;
});
}
@RequiredArgsConstructor
private static class CacheKey {
private final String interfaceName;
private final String methodName;
private final List<Class<?>> parameterTypes;
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
CacheKey cacheKey = (CacheKey) o;
return Objects.equals(interfaceName, cacheKey.interfaceName) &&
Objects.equals(methodName, cacheKey.methodName) &&
Objects.equals(parameterTypes, cacheKey.parameterTypes);
}
@Override
public int hashCode() {
return Objects.hash(interfaceName, methodName, parameterTypes);
}
}
}
在不考慮重連、斷連等情況下,新增一個類ClientChannelHolder
用於儲存Netty
客戶端的Channel
例項:
public class ClientChannelHolder {
public static final AtomicReference<Channel> CHANNEL_REFERENCE = new AtomicReference<>();
}
接著新增一個契約動態代理工廠(工具類)ContractProxyFactory
,用於為契約介面生成代理類例項:
public class ContractProxyFactory {
private static final RequestArgumentExtractor EXTRACTOR = new DefaultRequestArgumentExtractor();
private static final ConcurrentMap<Class<?>, Object> CACHE = Maps.newConcurrentMap();
@SuppressWarnings("unchecked")
public static <T> T ofProxy(Class<T> interfaceKlass) {
// 快取契約介面的代理類例項
return (T) CACHE.computeIfAbsent(interfaceKlass, x ->
Proxy.newProxyInstance(interfaceKlass.getClassLoader(), new Class[]{interfaceKlass}, (target, method, args) -> {
RequestArgumentExtractInput input = new RequestArgumentExtractInput();
input.setInterfaceKlass(interfaceKlass);
input.setMethod(method);
RequestArgumentExtractOutput output = EXTRACTOR.extract(input);
// 封裝請求引數
RequestMessagePacket packet = new RequestMessagePacket();
packet.setMagicNumber(ProtocolConstant.MAGIC_NUMBER);
packet.setVersion(ProtocolConstant.VERSION);
packet.setSerialNumber(SerialNumberUtils.X.generateSerialNumber());
packet.setMessageType(MessageType.REQUEST);
packet.setInterfaceName(output.getInterfaceName());
packet.setMethodName(output.getMethodName());
packet.setMethodArgumentSignatures(output.getMethodArgumentSignatures().toArray(new String[0]));
packet.setMethodArguments(args);
Channel channel = ClientChannelHolder.CHANNEL_REFERENCE.get();
// 發起請求
channel.writeAndFlush(packet);
// 這裡方法返回值需要進行同步處理,相對複雜,後面專門開一篇文章講解,暫時統一返回字串
// 如果契約介面的返回值型別不是字串,這裡方法返回後會丟擲異常
return String.format("[%s#%s]呼叫成功,傳送了[%s]到NettyServer[%s]", output.getInterfaceName(),
output.getMethodName(), JSON.toJSONString(packet), channel.remoteAddress());
}));
}
}
最後編寫客戶端ClientApplication
的程式碼:
@Slf4j
public class ClientApplication {
public static void main(String[] args) throws Exception {
int port = 9092;
EventLoopGroup workerGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
try {
bootstrap.group(workerGroup);
bootstrap.channel(NioSocketChannel.class);
bootstrap.option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);
bootstrap.option(ChannelOption.TCP_NODELAY, Boolean.TRUE);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
ch.pipeline().addLast(new LengthFieldPrepender(4));
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
ch.pipeline().addLast(new RequestMessagePacketEncoder(FastJsonSerializer.X));
ch.pipeline().addLast(new ResponseMessagePacketDecoder());
ch.pipeline().addLast(new SimpleChannelInboundHandler<ResponseMessagePacket>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, ResponseMessagePacket packet) throws Exception {
Object targetPayload = packet.getPayload();
if (targetPayload instanceof ByteBuf) {
ByteBuf byteBuf = (ByteBuf) targetPayload;
int readableByteLength = byteBuf.readableBytes();
byte[] bytes = new byte[readableByteLength];
byteBuf.readBytes(bytes);
targetPayload = FastJsonSerializer.X.decode(bytes, String.class);
byteBuf.release();
}
packet.setPayload(targetPayload);
log.info("接收到來自服務端的響應訊息,訊息內容:{}", JSON.toJSONString(packet));
}
});
}
});
ChannelFuture future = bootstrap.connect("localhost", port).sync();
// 儲存Channel例項,暫時不考慮斷連重連
ClientChannelHolder.CHANNEL_REFERENCE.set(future.channel());
// 構造契約介面代理類例項
HelloService helloService = ContractProxyFactory.ofProxy(HelloService.class);
String result = helloService.sayHello("throwable");
log.info(result);
future.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
}
}
}
先啟動《基於Netty和SpringBoot實現一個輕量級RPC框架-Server篇》一文中的ServerApplication
,再啟動ClientApplication
,控制檯輸出如下:
// 服務端日誌
2020-01-16 22:34:51 [main] INFO c.throwable.server.ServerApplication - 啟動NettyServer[9092]成功...
2020-01-16 22:36:35 [nioEventLoopGroup-3-1] INFO club.throwable.server.ServerHandler - 服務端接收到:RequestMessagePacket(interfaceName=club.throwable.contract.HelloService, methodName=sayHello, methodArgumentSignatures=[java.lang.String], methodArguments=[PooledUnsafeDirectByteBuf(ridx: 0, widx: 11, cap: 11/144)])
2020-01-16 22:36:35 [nioEventLoopGroup-3-1] INFO club.throwable.server.ServerHandler - 查詢目標實現方法成功,目標類:club.throwable.server.contract.DefaultHelloService,宿主類:club.throwable.server.contract.DefaultHelloService,宿主方法:sayHello
2020-01-16 22:36:35 [nioEventLoopGroup-3-1] INFO club.throwable.server.ServerHandler - 服務端輸出:{"attachments":{},"errorCode":200,"magicNumber":10086,"message":"Success","messageType":"RESPONSE","payload":"\"throwable say hello!\"","serialNumber":"63d386214d30410c9e5f04de03d8b2da","version":1}
// 客戶端日誌
2020-01-16 22:36:35 [main] INFO c.throwable.client.ClientApplication - [club.throwable.contract.HelloService#sayHello]呼叫成功,傳送了[{"attachments":{},"interfaceName":"club.throwable.contract.HelloService","magicNumber":10086,"messageType":"REQUEST","methodArgumentSignatures":["java.lang.String"],"methodArguments":["throwable"],"methodName":"sayHello","serialNumber":"63d386214d30410c9e5f04de03d8b2da","version":1}]到NettyServer[localhost/127.0.0.1:9092]
2020-01-16 22:36:35 [nioEventLoopGroup-2-1] INFO c.throwable.client.ClientApplication - 接收到來自服務端的響應訊息,訊息內容:{"attachments":{},"errorCode":200,"magicNumber":10086,"message":"Success","messageType":"RESPONSE","payload":"\"throwable say hello!\"","serialNumber":"63d386214d30410c9e5f04de03d8b2da","version":1}
小結
Client
端主要負責契約介面呼叫轉換為傳送RPC
協議請求這一步,核心技術就是動態代理,在不進行模組封裝優化的前提下實現是相對簡單的。這裡其實Client
端還有一個比較大的技術難題沒有解決,上面例子中客戶端日誌輸出如果眼尖的夥伴會發現,Client
端傳送RPC
請求的執行緒(main
執行緒)和Client
端接收Server
端RPC
響應處理的執行緒(nioEventLoopGroup-2-1
執行緒)並不相同,這一點是Netty
處理網路請求之所以能夠如此高效的根源(簡單來說就是請求和響應是非同步的,兩個流程本來是互不感知的)。但是更多情況下,我們希望外部請求是同步的,希望傳送RPC
請求的執行緒得到響應結果再返回(這裡請求和響應有可能依然是非同步流程)。下一篇文章會詳細分析一下如果對請求-響應做同步化處理。
Demo
專案地址:
- ch0-custom-rpc-protocol
(c-2-d e-a-20200116