Github專案NettyRpc 閱讀(Netty+多執行緒+AQS+CAS+volatile)
Github專案:https://github.com/luxiaoxun/NettyRpc
Fork: https://github.com/sw008/NettyRpc
此專案很適合學習多執行緒和Netty
RPC呼叫流程
大體思路:客戶端通過ConcurrentHashMap<String, RPCFuture>儲存請求ID和RPCFuture物件,然後把請求物件傳送給服務端,返回RPCFuture物件給呼叫者。服務端處理成功後返回響應物件(包含請求ID)。客戶端輸入流接收響應物件,通過請求ID在ConcurrentHashMap中找到傳送時建立的RPCFuture更新其相應資訊,並更新其AQS的狀態,release喚醒呼叫RPCFuture.get()而掛起的執行緒。
1 客戶端採用JDK動態代理建立ObjectProxy類代理物件,並與服務介面繫結。
2 客戶端呼叫服務介面方法,觸發動態代理物件的ObjectProxy.invoke()
3 客戶端傳送請求, ObjectProxy.invoke(Object proxy, Method method, Object[] args) 是JDK動態代理InvocationHandler介面的方法
3.1 通過method、args,生成RpcRequest類物件(其包含成員變數 requestId、className、 methodName、parameterTypes、parameters)
3.2 ConnectManage.getInstance().chooseHandler() :RpcClientHandler 一個簡單的負載均衡方法,找到應該呼叫的伺服器。因為Netty客服端主機與服務端主機是通過一條Channel連結,每一條Channel代表一個服務端主機。每個RpcClientHandler中包含一個Channel連結服務端,一個ConcurrentHashMap<String, RPCFuture>記錄請求ID和其對應的請求
3.3 RpcClientHandler.sendRequest(RpcRequest request) 將請求物件傳送給服務端主機,等待對方接收成功後,返回RPCFuture物件實現非同步呼叫
RpcClientHandler類
ConcurrentHashMap<String, RPCFuture> pendingRPC;//儲存 請求ID+對應RPCFuture
public RPCFuture sendRequest(RpcRequest request) {
final CountDownLatch latch = new CountDownLatch(1);
//建立自定義非同步請求類RPCFuture物件
RPCFuture rpcFuture = new RPCFuture(request);
//pendingRPC為ConcurrentHashMap<String, RPCFuture> 記錄請求ID和對應非同步請求
//對方伺服器通過channel返回Response物件時,本機輸入流方法 通過pendingRPC+請求ID更新對應RPCFuture狀態
pendingRPC.put(request.getRequestId(), rpcFuture);
//傳送請求RpcRequest,並新增對方接收成功的非同步監聽物件,回撥物件ChannelFutureListener
channel.writeAndFlush(request).addListener(
new ChannelFutureListener() { //例項化 一個匿名區域性內部類物件
//一個非同步監聽物件 ,監聽回撥由Netty框架實現
//服務端接收到後 回撥此匿名內部類物件 的方法 (注意不是對方處理完回撥)
@Override
public void operationComplete(ChannelFuture future) {
//此處使用區域性內部類的閉包特性,此區域性內部類物件可呼叫此方法的區域性變數latch
//對方接受成功,通過CountDownLatch喚醒當前執行緒
latch.countDown();
}
});
try {
//當前執行緒掛起 等待接收監聽回撥喚醒
latch.await();
} catch (InterruptedException e) {
logger.error(e.getMessage());
}
//先返回RPCFuture,此時只代表請求送達,但是對方伺服器可能還沒有處理完成
return rpcFuture;
}
4 服務端接收處理資訊
服務端RpcHandler類繼承Netty的SimpleChannelInboundHandler並實現channelRead0()方法,接收客戶端資訊,並通過反射執行。
RpcHandler類
public void channelRead0(final ChannelHandlerContext ctx,final RpcRequest request) throws Exception {
//接到資訊後,直接提交到RpcServer中的執行緒池執行
RpcServer.submit(new Runnable() {
//同樣用到了區域性內部類的閉包特性,可以呼叫當前方法區域性變數
@Override
public void run() {
RpcResponse response = new RpcResponse();
//例項化RpcResponse 並裝配資訊
response.setRequestId(request.getRequestId());
try {
Object result = handle(request);
response.setResult(result);
} catch (Throwable t) {
response.setError(t.toString());
}
//傳送response到客戶端
ctx.writeAndFlush(response).addListener(new ChannelFutureListener() {
//新增非同步監聽物件,傳送成功後回撥此物件方法
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
logger.debug("Send response for request " + request.getRequestId());
}
});
}
});
}
5 客戶端接收響應資訊
RpcClientHandler類繼承Netty的SimpleChannelInboundHandler並實現channelRead0方法,接收服務端響應資訊。
可以發現客戶端傳送請求和接收響應的方法都是RpcClientHandler類實現,因為傳送和接收需要依靠同一個pendingRPC進行結果匹配,傳送時將RPCFuture放入其中,接收響應後通過請求ID更新對應RPCFuture。
RpcClientHandler類
//客戶端 收到響應資訊
ConcurrentHashMap<String, RPCFuture> pendingRPC;
@Override
public void channelRead0(ChannelHandlerContext ctx, RpcResponse response) throws Exception {
//用過請求ID 在pendingRPC中找到傳送時儲存的RPCFuture
String requestId = response.getRequestId();
//pendingRPC儲存了傳送時的RPCFuture
RPCFuture rpcFuture = pendingRPC.get(requestId);
if (rpcFuture != null) {
pendingRPC.remove(requestId);
//更新對應rpcFuture,並且喚醒已經執行rpcFuture.get()的所有執行緒
rpcFuture.done(response);
}
}
6 RPCFuture類實現了Future介面,並通過AQS實現執行緒的掛起與喚醒。
sync物件實現了AbstractQueuedSynchronizer的tryRelease,tryAcquire方法。
當執行rpcFuture.done(response)時,將AQS中volatile int state通過CAS設定為1,喚醒已經執行rpcFuture.get()的所有執行緒。
RPCFuture類
//5中,接收到服務端響應後執行的方法rpcFuture.done(response);
public void done(RpcResponse reponse) {
this.response = reponse;
//sync為AQS物件,通過CAS更新AQS中的狀態值volatile int state;
sync.release(1);
invokeCallbacks();
// Threshold
long responseTime = System.currentTimeMillis() - startTime;
if (responseTime > this.responseTimeThreshold) {
logger.warn("Service response time is too slow. Request id = " + reponse.getRequestId() + ". Response Time = " + responseTime + "ms");
}
}
當前程執行rpcFuture.get()時,判斷AQS中的volatile int state=1 ?,若還沒有響應資訊則當前執行緒進入掛起狀態。
RPCFuture類
@Override
public Object get() throws InterruptedException, ExecutionException {
//AQS中的狀態值volatile int state,判斷對方伺服器時候已經響應;
sync.acquire(-1);
if (this.response != null) {
return this.response.getResult();
} else {
return null;
}
}
6 Sync類,是RPCFuture的靜態內部類。通過CAS控制volatile int state=1,決定呼叫執行緒是否需要掛起。volatile保證了可見性, CAS保證了原子性,整個過程是執行緒安全。使比較+賦值成為一個原子性操作,不會被其他執行緒打擾。可以把CAS理解成多執行緒的序列執行,再加上volatile的可見性有序性保障,所以是執行緒安全的。
AQS物件.acquire:請求資源,tryAcquire==false時掛起執行緒
AQS物件.release:釋放資源,tryRelease==true時喚醒一個掛起執行緒
http://www.cnblogs.com/waterystone/p/4920797.html
static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1L;
//future status
private final int done = 1;
private final int pending = 0;
@Override
//獲取資源
protected boolean tryAcquire(int arg) {
//判斷當前 volatile int state=1
//返回false時,當前執行緒掛起
return getState() == done;
}
@Override
//釋放資源
protected boolean tryRelease(int arg) {
if (getState() == pending) {
//CAS設定 volatile int state=1
//CAS保證操作原子性,執行緒安全
if (compareAndSetState(pending, done)) {
//因為只有傳送執行緒會執行其請求對應的RPCFuture的get方法,所以只會有一個執行緒掛起等待
//返回true時,AQS框架會喚醒第一個等待執行緒
return true;
} else {
return false;
}
} else {
return true;
}
}
public boolean isDone() {
getState();
return getState() == done;
}
}