徒手擼一個簡單的RPC框架(2)——專案改造
徒手擼一個簡單的RPC框架(2)——專案改造
在上一篇的徒手擼一個簡單的RPC框架中再最後的伺服器和客戶端連線的時候只是簡單的寫了Socket連線,覺得有些不妥。正好最近學習了Netty,在平時工作中沒機會運用,於是自己就給自己出需求將之前的專案改造一下。
Netty是什麼?
在學習Netty之前呢我們首先得了解IO和NIO
IO模型
IO程式設計模型簡單來說就是上一篇我寫的服務端與客戶端的連線,客戶端與服務端建立連線通訊後,必須等待服務端返回資訊才能進行下一步的動作,這期間執行緒一直是等待狀態。IO模型在客戶端較少的情況下是沒問題的,但是一旦有大量客戶端與服務端進行連線,那麼就會出問題。我們簡單的分析一下原因。
- 首先我之前寫的程式碼其實是有問題的,為什麼呢?因為每次連線通訊一次我就將其連線關閉了。Socket連線時TCP,雙方每次建立連線時都會耗費時間和資源,不能每通訊一次就關閉連線。就好比你和別人打電話你說一句對方說一句,然後掛電話,然後再打過去。肯定是不能這麼做的
- 如果是想要保持通訊,那麼程式中就得將其監聽的程式碼放入
while
迴圈中用專門的執行緒來維護。但是執行緒是作業系統中非常寶貴的資源,每個作業系統能建立的執行緒也是有限的。 - CPU頻繁的線上程之間切換是非常損耗效能的。
- 我們可以看到之前編寫的程式碼中客戶端與服務端交流的媒介是位元組流,效率不高。
IO模型有這麼多的問題,於是在JDK1.4中提出了NIO的概念,就是為了解決以上的問題
NIO模型
我們一一對應上面的問題來看NIO用什麼技術來解決的
第一個問題
第一個問題是程式碼的問題,我們就不討論了
第二個問題
執行緒有限的問題:NIO中提出了Selector
概念,IO中是每個連線都會由一個執行緒阻塞來維護,NIO中用Selector
來管理這些連線,如果有訊息的傳入或傳出,那麼就建立相應的執行緒了處理。這樣伺服器只需要阻塞一個Selector
執行緒,就可以管理多個連線了。
具體的Selector
文章可以看我之前的NIO中選擇器Selector,裡面有介紹詳細的Selector
用法。
這裡舉個例子應該就明白的,好比你去釣魚,IO就是一人一個魚竿,等著魚上來,中間哪也不能去,而NIO就是一個人能守著好幾個魚竿。
這就是NIO模型解決作業系統中執行緒有限的問題。
第三個問題
CPU線上程之間頻繁切換,由於NIO中只管理了一個Selector
執行緒,那麼這個問題也就相應的解決了
第四個問題
NIO中提出了Channel
和Buffer
的概念,就好比在嚮往的生活第一季中摘玉米中,是用竹筐一次一次背快呢?還是接一輛車子來回運送快?當然是車子來回運送快了,而這裡的Buffer
就好比車子。具體的Channel
和Buffer
的解釋可以看我之前的文章Java中IO和NIO和JAVA中NIO再深入。
Netty
那麼為什麼就和Netty扯上關係了呢?其實我覺得NIO之於Netty的關係就好比Servlet之於Tomcat的關係,Netty只是對於NIO進行了進一步的封裝,讓使用者更加簡便的程式設計。
改造
這次改造分為服務端和客戶端的改造
服務端
接下來我們就利用Netty將我們的伺服器端與客戶端連線通訊部分進行改造一下,首先我們先加上對於Netty的依賴
compile 'io.netty:netty-all:4.1.6.Final'
然後編寫服務端的程式碼,服務端的程式碼非常簡單
ServerBootstrap serverBootstrap = new ServerBootstrap();
NioEventLoopGroup boos = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
serverBootstrap
.group(boos, worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
//獲得實現類處理過後的返回值
String invokeMethodMes = CommonDeal.getInvokeMethodMes(msg);
ByteBuf encoded = ctx.alloc().buffer(4 * invokeMethodMes.length());
encoded.writeBytes(invokeMethodMes.getBytes());
ctx.writeAndFlush(encoded);
}
});
}
}).bind(20006);
這和我們平常寫的Socket連線有些區別,可以看到我們建了兩個NioEventLoopGroup
一個boss一個worker,為什麼會有兩個呢?
從這個圖裡面我們可以看到,boss是專門用來對外連線的,worker則是像NIO中Selector
用來處理各種讀寫的請求。
客戶端
其實難點就是在客戶端,因為Netty是非同步事件驅動的框架,什麼是非同步呢?
客戶端與服務端的任何I/O操作都將立即返回,等待服務端處理完成以後會呼叫指定的回撥函式進行處理。在這個過程中客戶端一直沒有阻塞。所以我們在客戶端與服務端請求處理時,如果獲得非同步處理的結果呢?Netty提供有一種獲取非同步回撥結果的,但是那是新增監聽器。而我們的RPC呼叫在最後返回結果的時候必須得阻塞等待結果的返回,所以我們需要自己寫一個簡單的獲取非同步回撥結果的程式。想法如下。
- 想要獲得服務端返回的訊息時,阻塞等待。
- Netty客戶端讀取到客戶端訊息時,喚醒等待的執行緒
那麼我們就圍繞這兩步來進行編碼。
客戶端想要獲取服務端訊息時如何等待呢?這裡我們就可以用wait()
public Response getMessage(){
synchronized (object){
while (!success){
try {
object.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return response;
}
}
那麼讀到訊息以後如何喚醒呢?
public void setMessage(Response response){
synchronized (object){
this.response = response;
this.success = true;
object.notify();
}
}
這樣就解決了我們上面提出的兩個問題了。接下來編寫客戶端的程式碼
private final Map<Long,MessageFuture> futureMap = new ConcurrentHashMap<>();
private CountDownLatch countDownLatch = new CountDownLatch(1);
public void connect(String requestJson,Long threadId){
Bootstrap bootstrap = new Bootstrap();
NioEventLoopGroup group = new NioEventLoopGroup();
bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
ch.pipeline().
addLast(new StringDecoder()).
addLast(new SimpleChannelInboundHandler<String>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
Gson gson = new Gson();
Response response = gson.fromJson(msg, Response.class);
MessageFuture messageFuture = futureMap.get(threadId);
messageFuture.setMessage(response);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
futureMap.put(threadId,new MessageFuture());
countDownLatch.countDown();
ByteBuf encoded = ctx.alloc().buffer(4 * requestJson.length());
encoded.writeBytes(requestJson.getBytes());
ctx.writeAndFlush(encoded);
}
});
}
}).connect("127.0.0.1", 20006);
}
public Response getResponse(Long threadId){
MessageFuture messageFuture = null;
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
messageFuture = futureMap.get(threadId);
return messageFuture.getMessage();
}
這裡面我們用到了CountDownLatch
類,即等待發送完訊息以後通知我能獲取資料了。這裡面的程式碼和服務端的差不多,其中有區別的地方就是在傳送資料的時候將執行緒ID和MessageFuture
放入Map中,在得到服務端傳送的資料時取出並放入得到的Response。
總結
到目前為止我們就完成了我們的專案改造,只是簡單的應用了一下Netty的客戶端和服務端的通訊,因為在學習的過程中如果沒有運用的話,那麼感覺記憶沒有那麼牢靠,所以就有了此次的專案改造的計劃。雖然完成了簡單的通訊,但是我知道還有些地方需要優化,例如用synchronized
在以後學習了AQS
以後希望也能夠學以致用將這裡給改一下。