基於Netty和SpringBoot實現一個輕量級RPC框架-Client端請求響應同步化處理
前提
前置文章:
- 《基於Netty和SpringBoot實現一個輕量級RPC框架-協議篇》
- 《基於Netty和SpringBoot實現一個輕量級RPC框架-Server篇》
- 《基於Netty和SpringBoot實現一個輕量級RPC框架-Client篇》
前一篇文章簡單介紹了通過動態代理完成了Client
端契約介面呼叫轉換為傳送RPC
協議請求的功能。這篇文章主要解決一個遺留的技術難題:請求-響應同步化處理。
需要的依賴如下:
JDK1.8+
Netty:4.1.44.Final
SpringBoot:2.2.2.RELEASE
簡單分析Netty請求-響應的處理流程
圖中已經忽略了編碼解碼器和其他入站出站處理器,不同顏色的執行緒代表完全不相同的執行緒,不同執行緒之間的處理邏輯是完全非同步,也就是Netty IO
n-l-g-1
)接收到Server
端的訊息並且解析完成的時候,使用者呼叫執行緒(u-t-1
)無法感知到解析完畢的訊息包,那麼這裡要做的事情就是讓使用者呼叫執行緒(u-t-1
)獲取到Netty IO
執行緒(n-l-g-1
)接收並且解析完成的訊息包。
這裡可以用一個簡單的例子來說明模擬Client
端呼叫執行緒等待Netty IO
執行緒的處理結果再同步返回的過程。
@Slf4j public class NettyThreadSyncTest { @ToString private static class ResponseFuture { private final long beginTimestamp = System.currentTimeMillis(); @Getter private final long timeoutMilliseconds; @Getter private final String requestId; @Setter @Getter private volatile boolean sendRequestSucceed = false; @Setter @Getter private volatile Throwable cause; @Getter private volatile Object response; private final CountDownLatch latch = new CountDownLatch(1); public ResponseFuture(String requestId, long timeoutMilliseconds) { this.requestId = requestId; this.timeoutMilliseconds = timeoutMilliseconds; } public boolean timeout() { return System.currentTimeMillis() - beginTimestamp > timeoutMilliseconds; } public Object waitResponse(final long timeoutMilliseconds) throws InterruptedException { latch.await(timeoutMilliseconds, TimeUnit.MILLISECONDS); return response; } public void putResponse(Object response) throws InterruptedException { this.response = response; latch.countDown(); } } static ExecutorService REQUEST_THREAD; static ExecutorService NETTY_IO_THREAD; static Callable<Object> REQUEST_TASK; static Runnable RESPONSE_TASK; static String processBusiness(String name) { return String.format("%s say hello!", name); } private static final Map<String /* request id */, ResponseFuture> RESPONSE_FUTURE_TABLE = Maps.newConcurrentMap(); @BeforeClass public static void beforeClass() throws Exception { String requestId = UUID.randomUUID().toString(); String requestContent = "throwable"; REQUEST_TASK = () -> { try { // 3秒沒有得到響應認為超時 ResponseFuture responseFuture = new ResponseFuture(requestId, 3000); RESPONSE_FUTURE_TABLE.put(requestId, responseFuture); // 這裡忽略傳送請求的操作,只打印日誌和模擬耗時1秒 Thread.sleep(1000); log.info("傳送請求成功,請求ID:{},請求內容:{}", requestId, requestContent); // 更新標記屬性 responseFuture.setSendRequestSucceed(true); // 剩餘2秒等待時間 - 這裡只是粗略計算 return responseFuture.waitResponse(3000 - 1000); } catch (Exception e) { log.info("傳送請求失敗,請求ID:{},請求內容:{}", requestId, requestContent); throw new RuntimeException(e); } }; RESPONSE_TASK = () -> { String responseContent = processBusiness(requestContent); try { ResponseFuture responseFuture = RESPONSE_FUTURE_TABLE.get(requestId); if (null != responseFuture) { log.warn("處理響應成功,請求ID:{},響應內容:{}", requestId, responseContent); responseFuture.putResponse(responseContent); } else { log.warn("請求ID[{}]對應的ResponseFuture不存在,忽略處理", requestId); } } catch (Exception e) { log.info("處理響應失敗,請求ID:{},響應內容:{}", requestId, responseContent); throw new RuntimeException(e); } }; REQUEST_THREAD = Executors.newSingleThreadExecutor(runnable -> { Thread thread = new Thread(runnable, "REQUEST_THREAD"); thread.setDaemon(true); return thread; }); NETTY_IO_THREAD = Executors.newSingleThreadExecutor(runnable -> { Thread thread = new Thread(runnable, "NETTY_IO_THREAD"); thread.setDaemon(true); return thread; }); } @Test public void testProcessSync() throws Exception { log.info("非同步提交請求處理任務......"); Future<Object> future = REQUEST_THREAD.submit(REQUEST_TASK); // 模擬請求耗時 Thread.sleep(1500); log.info("非同步提交響應處理任務......"); NETTY_IO_THREAD.execute(RESPONSE_TASK); // 這裡可以設定超時 log.info("同步獲取請求結果:{}", future.get()); Thread.sleep(Long.MAX_VALUE); } }
執行testProcessSync()
方法,控制檯輸出如下:
2020-01-18 13:17:07 [main] INFO c.t.client.NettyThreadSyncTest - 非同步提交請求處理任務...... 2020-01-18 13:17:08 [REQUEST_THREAD] INFO c.t.client.NettyThreadSyncTest - 傳送請求成功,請求ID:71f47e27-c17c-458d-b271-4e74fad33a7b,請求內容:throwable 2020-01-18 13:17:09 [main] INFO c.t.client.NettyThreadSyncTest - 非同步提交響應處理任務...... 2020-01-18 13:17:09 [NETTY_IO_THREAD] WARN c.t.client.NettyThreadSyncTest - 處理響應成功,請求ID:71f47e27-c17c-458d-b271-4e74fad33a7b,響應內容:throwable say hello! 2020-01-18 13:17:09 [main] INFO c.t.client.NettyThreadSyncTest - 同步獲取請求結果:throwable say hello!
上面這個例子裡面的執行緒同步處理主要參考主流的Netty
框架客戶端部分的實現邏輯:RocketMQ
(具體是NettyRemotingClient
類)以及Redisson
(具體是RedisExecutor
類),它們就是用這種方式使得非同步執行緒處理轉化為同步處理。
Client端請求響應同步化處理
按照前面的例子,首先新增一個ResponseFuture
用於承載已傳送但未響應的請求:
@ToString
public class ResponseFuture {
private final long beginTimestamp = System.currentTimeMillis();
@Getter
private final long timeoutMilliseconds;
@Getter
private final String requestId;
@Setter
@Getter
private volatile boolean sendRequestSucceed = false;
@Setter
@Getter
private volatile Throwable cause;
@Getter
private volatile ResponseMessagePacket response;
private final CountDownLatch latch = new CountDownLatch(1);
public ResponseFuture(String requestId, long timeoutMilliseconds) {
this.requestId = requestId;
this.timeoutMilliseconds = timeoutMilliseconds;
}
public boolean timeout() {
return System.currentTimeMillis() - beginTimestamp > timeoutMilliseconds;
}
public ResponseMessagePacket waitResponse(final long timeoutMilliseconds) throws InterruptedException {
latch.await(timeoutMilliseconds, TimeUnit.MILLISECONDS);
return response;
}
public void putResponse(ResponseMessagePacket response) throws InterruptedException {
this.response = response;
latch.countDown();
}
}
接著需要新增一個HashMap
去快取這些返送成功但是未得到響應處理的ResponseFuture
:
Map<String /* request id */, ResponseFuture> RESPONSE_FUTURE_TABLE = Maps.newConcurrentMap();
這裡的KEY
選用requestId
,而requestId
之前已經定義為UUID
,確保每個請求不會重複。為了簡單起見,目前所有的邏輯都編寫在契約代理工廠ContractProxyFactory
,新增下面的功能:
- 新增一個同步傳送方法
sendRequestSync()
處理訊息包的傳送和同步響應,RequestMessagePacket
轉換為呼叫代理目標方法返回值型別的邏輯暫時也編寫在此方法中。 - 新增一個核心執行緒數量為邏輯核心數量 * 2的執行緒池用於處理請求。
- 新增一個單執行緒的排程執行緒池用於定時清理那些過期的
ResponseFuture
,清理方法為scanResponseFutureTable()
。
修改後的ContractProxyFactory
如下:
@Slf4j
public class ContractProxyFactory {
private static final RequestArgumentExtractor EXTRACTOR = new DefaultRequestArgumentExtractor();
private static final ConcurrentMap<Class<?>, Object> CACHE = Maps.newConcurrentMap();
static final ConcurrentMap<String /* request id */, ResponseFuture> RESPONSE_FUTURE_TABLE = Maps.newConcurrentMap();
// 定義請求的最大超時時間為3秒
private static final long REQUEST_TIMEOUT_MS = 3000;
private static final ExecutorService EXECUTOR;
private static final ScheduledExecutorService CLIENT_HOUSE_KEEPER;
private static final Serializer SERIALIZER = FastJsonSerializer.X;
@SuppressWarnings("unchecked")
public static <T> T ofProxy(Class<T> interfaceKlass) {
// 快取契約介面的代理類例項
return (T) CACHE.computeIfAbsent(interfaceKlass, x ->
Proxy.newProxyInstance(interfaceKlass.getClassLoader(), new Class[]{interfaceKlass}, (target, method, args) -> {
RequestArgumentExtractInput input = new RequestArgumentExtractInput();
input.setInterfaceKlass(interfaceKlass);
input.setMethod(method);
RequestArgumentExtractOutput output = EXTRACTOR.extract(input);
// 封裝請求引數
RequestMessagePacket packet = new RequestMessagePacket();
packet.setMagicNumber(ProtocolConstant.MAGIC_NUMBER);
packet.setVersion(ProtocolConstant.VERSION);
packet.setSerialNumber(SerialNumberUtils.X.generateSerialNumber());
packet.setMessageType(MessageType.REQUEST);
packet.setInterfaceName(output.getInterfaceName());
packet.setMethodName(output.getMethodName());
packet.setMethodArgumentSignatures(output.getMethodArgumentSignatures().toArray(new String[0]));
packet.setMethodArguments(args);
Channel channel = ClientChannelHolder.CHANNEL_REFERENCE.get();
return sendRequestSync(channel, packet, method.getReturnType());
}));
}
/**
* 同步傳送請求
*
* @param channel channel
* @param packet packet
* @return Object
*/
static Object sendRequestSync(Channel channel, RequestMessagePacket packet, Class<?> returnType) {
long beginTimestamp = System.currentTimeMillis();
ResponseFuture responseFuture = new ResponseFuture(packet.getSerialNumber(), REQUEST_TIMEOUT_MS);
RESPONSE_FUTURE_TABLE.put(packet.getSerialNumber(), responseFuture);
try {
// 獲取到承載響應Packet的Future
Future<ResponseMessagePacket> packetFuture = EXECUTOR.submit(() -> {
channel.writeAndFlush(packet).addListener((ChannelFutureListener)
future -> responseFuture.setSendRequestSucceed(true));
return responseFuture.waitResponse(REQUEST_TIMEOUT_MS - (System.currentTimeMillis() - beginTimestamp));
});
ResponseMessagePacket responsePacket = packetFuture.get(
REQUEST_TIMEOUT_MS - (System.currentTimeMillis() - beginTimestamp), TimeUnit.MILLISECONDS);
if (null == responsePacket) {
// 超時導致響應包獲取失敗
throw new SendRequestException(String.format("ResponseMessagePacket獲取超時,請求ID:%s", packet.getSerialNumber()));
} else {
ByteBuf payload = (ByteBuf) responsePacket.getPayload();
byte[] bytes = ByteBufferUtils.X.readBytes(payload);
return SERIALIZER.decode(bytes, returnType);
}
} catch (Exception e) {
log.error("同步傳送請求異常,請求包:{}", JSON.toJSONString(packet), e);
if (e instanceof RuntimeException) {
throw (RuntimeException) e;
} else {
throw new SendRequestException(e);
}
}
}
static void scanResponseFutureTable() {
log.info("開始執行ResponseFutureTable清理任務......");
Iterator<Map.Entry<String, ResponseFuture>> iterator = RESPONSE_FUTURE_TABLE.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, ResponseFuture> entry = iterator.next();
ResponseFuture responseFuture = entry.getValue();
if (responseFuture.timeout()) {
iterator.remove();
log.warn("移除過期的請求ResponseFuture,請求ID:{}", entry.getKey());
}
}
log.info("執行ResponseFutureTable清理任務結束......");
}
static {
int n = Runtime.getRuntime().availableProcessors();
EXECUTOR = new ThreadPoolExecutor(n * 2, n * 2, 0, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(50), runnable -> {
Thread thread = new Thread(runnable);
thread.setDaemon(true);
thread.setName("CLIENT_REQUEST_EXECUTOR");
return thread;
});
CLIENT_HOUSE_KEEPER = new ScheduledThreadPoolExecutor(1, runnable -> {
Thread thread = new Thread(runnable);
thread.setDaemon(true);
thread.setName("CLIENT_HOUSE_KEEPER");
return thread;
});
CLIENT_HOUSE_KEEPER.scheduleWithFixedDelay(ContractProxyFactory::scanResponseFutureTable, 5, 5, TimeUnit.SECONDS);
}
}
接著新增一個客戶端入站處理器,用於通過reuqestId
匹配目標ResponseFuture
例項,同時設定ResponseFuture
例項中的response
屬性為響應包,同時釋放閉鎖:
@Slf4j
public class ClientHandler extends SimpleChannelInboundHandler<ResponseMessagePacket> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, ResponseMessagePacket packet) throws Exception {
log.info("接收到響應包,內容:{}", JSON.toJSONString(packet));
ResponseFuture responseFuture = ContractProxyFactory.RESPONSE_FUTURE_TABLE.get(packet.getSerialNumber());
if (null != responseFuture) {
responseFuture.putResponse(packet);
} else {
log.warn("接收響應包查詢ResponseFuture不存在,請求ID:{}", packet.getSerialNumber());
}
}
}
最後,客戶端啟動類ClientApplication
中新增ClientHandler
到Netty
的處理器流水線中即可:
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
ch.pipeline().addLast(new LengthFieldPrepender(4));
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
ch.pipeline().addLast(new RequestMessagePacketEncoder(FastJsonSerializer.X));
ch.pipeline().addLast(new ResponseMessagePacketDecoder());
ch.pipeline().addLast(new ClientHandler());
}
});
先執行之前- 《基於Netty和SpringBoot實現一個輕量級RPC框架-Server篇》中編寫好的ServerApplication
,再啟動ClientApplication
,日誌輸出如下:
// 服務端
2020-01-18 14:32:59 [nioEventLoopGroup-3-2] INFO club.throwable.server.ServerHandler - 服務端接收到:RequestMessagePacket(interfaceName=club.throwable.contract.HelloService, methodName=sayHello, methodArgumentSignatures=[java.lang.String], methodArguments=[PooledUnsafeDirectByteBuf(ridx: 0, widx: 11, cap: 11/144)])
2020-01-18 14:32:59 [nioEventLoopGroup-3-2] INFO club.throwable.server.ServerHandler - 查詢目標實現方法成功,目標類:club.throwable.server.contract.DefaultHelloService,宿主類:club.throwable.server.contract.DefaultHelloService,宿主方法:sayHello
2020-01-18 14:32:59 [nioEventLoopGroup-3-2] INFO club.throwable.server.ServerHandler - 服務端輸出:{"attachments":{},"errorCode":200,"magicNumber":10086,"message":"Success","messageType":"RESPONSE","payload":"\"throwable say hello!\"","serialNumber":"21d131d26fc74f91b4691e0207826b90","version":1}
// 客戶端
2020-01-18 14:32:59 [nioEventLoopGroup-2-1] INFO club.throwable.client.ClientHandler - 接收到響應包,內容:{"attachments":{},"errorCode":200,"magicNumber":10086,"message":"Success","messageType":"RESPONSE","payload":{"contiguous":true,"direct":true,"readOnly":false,"readable":true,"writable":false},"serialNumber":"21d131d26fc74f91b4691e0207826b90","version":1}
2020-01-18 14:32:59 [main] INFO c.throwable.client.ClientApplication - HelloService[throwable]呼叫結果:"throwable say hello!"
2020-01-18 14:33:04 [CLIENT_HOUSE_KEEPER] INFO c.t.client.ContractProxyFactory - 開始執行ResponseFutureTable清理任務......
2020-01-18 14:33:04 [CLIENT_HOUSE_KEEPER] WARN c.t.client.ContractProxyFactory - 移除過期的請求ResponseFuture,請求ID:21d131d26fc74f91b4691e0207826b90
可見非同步執行緒模型已經被改造為同步化,現在可以通過契約介面通過RPC
同步呼叫服務端。
小結
Client
端的請求-響應同步化處理基本改造完畢,到此為止,一個RPC
框架大致已經完成,接下來會對Client
端和Server
端進行一些改造,讓契約相關元件託管到IOC
容器,實現契約介面自動注入等等功能。
Demo
專案地址:
- ch0-custom-rpc-protocol
(本文完e-a-20200118 c-2-d