NIO單一長連線Dubbo通訊模型實現
前言
前一段時間看了下Dubbo,原想將Dubbo詳細總結下來,從使用簡介、SPI擴充套件機制、Spring的schema擴充套件、啟動過程、動態註冊與發現、分層設計、通訊設計、執行緒模型等方面來總結,但是越看越發現架子太大,涉及的點太廣,反而RPC的思想其實已經印象深刻了,再來總結這麼多的點似乎不太值得,因為不懂的東西才是最有價值的,所以有了本文,將個人認為dubbo中比較有特色的通訊模型總結於此,本文是一個demo,當然不乏一些腦補的東西在裡面,如您偶然閱讀此文發現問題,還請不吝指出問題所在。
作者:峽客 連結:https://www.jianshu.com/p/13bef2795c44 來源:簡書 簡書著作權歸作者所有,任何形式的轉載都請聯絡作者獲得授權並註明出處。
BIO通訊缺陷
為了相對更好的理解dubbo這個通訊模型的優勢,首先需要回顧一下BIO通訊的缺陷。
為此引用一下我之前借鑑dubbo作者樑飛的部落格(RPC框架幾行程式碼就夠了)而實現的一個用BIO實現的RPC示例:RPC原理簡析——三分鐘看完,這裡使用的是BIO的socket和ServerSocket實現通訊,用流來寫入寫出資料。BIO的完整連線示意圖如下:
缺陷一、IO阻塞
可以看出,在BIO中,除了建立連線比較耗時之外,在客戶端將資料傳輸到服務端之前,服務端的IO(輸入流)阻塞,然後在服務端將返回值傳輸回來之前,客戶端的IO(輸入流)阻塞。也就是說在一次RPC呼叫開始到完成之前,這個連線一直被此次呼叫所佔用,但是實際上這次呼叫中,真正需要網路連線的只有中間的資料傳輸過程,在客戶端寫出和服務端讀取並執行遠端方法這兩個時間點,其實網路連線是空閒的。這就是BIO連線中浪費了網路資源的地方。
缺陷二、大量連線
由於BIO的IO阻塞,導致每次RPC呼叫會佔用一個連線。而正因為如此,為了減少頻繁建立連線消耗的時間,引入了連線池的概念,連線池解決了頻繁建立連線的資源消耗,但是沒有解決根本性的阻塞問題。而且在服務消費者(客戶端)數量遠大於服務提供者(服務端)數量的時候,會導致服務提供者建立了大量的連線,而本身由於硬體資源的限制,單機最大連線數是有限的(這個限制以前是1w,也就是以前的C10K問題,據說近幾年已經提升至50萬個連線),所以在服務消費者過多,而服務提供者數量過少的情況下,服務提供者有因為過多的連線而被拖垮的風險(這需要極大的併發數,每秒上百萬次的呼叫)。當然,要解決這個問題,增加機器,從而增加服務提供者數量是可以解決的,但是沒有充分利用單機效能。
建立大量連線的另一個弊端,是作業系統頻繁的執行緒上下文切換,因為連線數過多,執行緒切換頻繁,會消耗大量的資源,而且這些切換可能不是必要的。比如當前建立了大量的連線,可能大部分處於阻塞狀態,根本沒有挨個挨個切換的必要。但是因為作業系統任務排程時並不會忽略阻塞狀態的執行緒,所以造成浪費。
NIO單一長連線實現分析
長連線巨集觀簡介
長連線其實不算NIO這裡的特點,因為BIO也可以實現長連線(每次寫完資料之後手動寫入結束符而不關閉流就可以了),而且連線池一般也是使用的長連線方式。
NIO真正解決的是阻塞問題,因為阻塞問題解決了,所以也就不需要大量連線了。由於篇幅問題,此處不討論NIO的細節(API),從相對巨集觀的角度介紹大體角色和呼叫方式,NIO三種角色示意如下圖:
NIO由三種角色組成,Selector、SocketChannel、Buffer。
SocketChannel相當於BIO中的Socket,分為SocketChannel和ServerSocketChannel兩種,是真正建立連線並傳輸資料的管道。這個管道不同於BIO的Socket的點就是,這個管道可以被多個執行緒共用,執行緒A使用這個管道寫出資料了之後,執行緒B還可以使用這個管道寫出資料,不再被某一次呼叫所獨佔。所以就可以不需要像BIO一樣建立那麼多的連線,一個客戶端的一個連線就夠了(當然,實際應用中因為機器都是多核,實際上建立核數個連線個人感覺是比較好的)。
Buffer是用來與SocketChannel互通資料的物件,本質上是一塊記憶體區域。SocketChannel是不支援直接讀寫資料的,所有的讀寫操作必須通過Buffer來實現。值得一提的是,我們經常說在JVM中有的時候會使用虛擬機器之外的記憶體,說的就是NIO中的Buffer,在分配記憶體的時候可以選擇使用虛擬機器外記憶體,減少資料的複製。
Selector是用來監控SocketChannel的事件的,其實是實現非阻塞的關鍵。NIO是基於事件的,將BIO中的流式傳輸改為了事件機制。BIO中,一個連線擁有一個輸入/輸出流,只要資料傳輸完成,流就可以讀取資料。在NIO中,Selector定義了四種事件,OP_READ、OP_WRITE、OP_CONNECT、OP_ACCEPT。當服務端或者客戶端收到寫入完成的一次資料時,會觸發OP_READ事件,此時可以從連線中讀取資料。同理,當可以往連線中寫入資料的時候,觸發OP_WRITE事件(但是一般情況下這個事件沒有必要,因為連線一般都是可寫的)。客戶端與服務端建立連線的時候,客戶端會收到OP_CONNECT事件,而服務端會觸發OP_ACCEPT事件。通過這一系列事件將資料的傳送與讀寫解耦,實現非同步呼叫。將一個SocketChannel + 一個事件繫結在一個Selector上,Selector本質上是輪詢每一個SocketChannel,如果沒有事件觸發,那麼執行緒阻塞,如果有事件觸發,返回對應的SocketChannel,以便進行後續的處理。
使用NIO設計RPC呼叫分析
前面提到,由於NIO的SocketChannel是非阻塞的,所以不再需要連線池,使用一個連線就夠了。
但是如果真的使用NIO來進行RPC呼叫的話,會有資料和呼叫方對應不上的問題,如下圖:
如上圖所示,如果多個執行緒共用一個連線,那麼每個執行緒呼叫之後返回的順序是不可控的,所以有可能先發出資料的反而後得到返回值,這就使得資料對應不上了。個人覺得因為這一點,NIO及其適合聊天室型別的設計,因為每個聊天方都是一個單獨的SocketChannel連線,而此時並沒有順序問題。
但是對RPC呼叫來說,每次呼叫的返回值必須與呼叫方對應上,為此,Dubbo的設計是給每個請求設計一個請求id,在傳送請求與傳送返回值時都帶上這個id。詳細思路如下圖:
業務執行緒在發出請求之前,需要儲存一個請求物件,同時掛起相應的業務執行緒(掛起不會被任務排程,所以不存線上程切換消耗),這個請求物件包含了此次請求的id,然後在獲取服務端返回的資料的時候,解析出這個id,通過這個id取出請求物件,並喚醒對應的執行緒。
NIO單一長連線實現demo
廢話了這麼多,終於可以上程式碼了,我這裡服務端使用了執行緒池去執行遠端方法,使得真正的服務端執行緒只需要讀取資料就可以了。可能作為demo來講,寫的有些複雜,但是理解的時候以RpcNioConsumer和RpcNioProvider作為入口就比較好梳理了。NIO的主要實現在RpcNioMultClient和RpcNioMultServer中。
工具類
序列化/反序列化工具類
1package com.github.logsys.rpc.nio;
2
3import lombok.extern.slf4j.Slf4j;
4
5import java.io.*;
6
7/**
8 * Created by liuchunlong on 2018/10/16.
9 * <p>
10 * 序列化工具
11 */
[email protected]
13public class SerializeUtil {
14
15 public static byte[] serialize(Object obj) {
16 try {
17 ByteArrayOutputStream bos = new ByteArrayOutputStream();
18 ObjectOutputStream oos = new ObjectOutputStream(bos);
19 oos.writeObject(obj);
20 oos.flush();
21 return bos.toByteArray();
22 } catch (IOException e) {
23 log.info("序列化物件出錯!");
24 e.printStackTrace();
25 return null;
26 }
27 }
28
29 public static Object unSerialize(byte[] bytes) {
30 try {
31 ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
32 ObjectInputStream ois = new ObjectInputStream(bis);
33 return ois.readObject();
34 } catch (IOException | ClassNotFoundException e) {
35 log.info("反序列化出錯!");
36 e.printStackTrace();
37 return null;
38 }
39 }
40}
服務端程式碼
請求時的資料結構:
1package com.github.logsys.rpc.nio;
2
3import lombok.Data;
4
5import java.io.Serializable;
6
7/**
8 * Created by liuchunlong on 2018/10/17.
9 */
[email protected]
11public class RequestMultObject implements Serializable {
12
13 private static final long serialVersionUID = 3132836600205356306L;
14
15 public RequestMultObject(Class<?> calzz, String methodName, Class<?>[] paramTypes, Object[] args) {
16 this.calzz = calzz;
17 this.methodName = methodName;
18 this.paramTypes = paramTypes;
19 this.args = args;
20 }
21
22 /**
23 * 請求ID
24 */
25 private Long requestId;
26
27 /**
28 * 服務提供者介面
29 */
30 private Class<?> calzz;
31
32 /**
33 * 請求的方法名稱
34 */
35 private String methodName;
36
37 /**
38 * 引數型別
39 */
40 private Class<?>[] paramTypes;
41
42 /**
43 * 引數
44 */
45 private Object[] args;
46}
RPC呼叫的介面:
1package com.github.logsys.rpc.nio;
2
3/**
4 * Created by liuchunlong on 2018/10/16.
5 */
6public interface HelloService {
7
8 String hello(String name);
9
10}
服務端的介面實現:
1package com.github.logsys.rpc.nio;
2
3/**
4 * Created by liuchunlong on 2018/10/16.
5 */
6public class HelloServiceImpl implements HelloService {
7
8 public String hello(String name) {
9 return "Hello " + name;
10 }
11
12}
服務端服務釋出時使用的bean容器:
1package com.github.logsys.rpc.nio;
2
3import java.util.concurrent.ConcurrentHashMap;
4
5/**
6 * Created by liuchunlong on 2018/10/17.
7 */
8public class BeanContainer {
9
10 private static ConcurrentHashMap<Class<?>, Object> container = new ConcurrentHashMap<>();
11
12 public static boolean addBean(Class<?> clazz, Object object) {
13 container.put(clazz, object);
14 return true;
15 }
16
17 public static Object getBean(Class<?> clazz) {
18 return container.get(clazz);
19 }
20}
服務端,用於與客戶端建立連線、讀取資料:
1package com.github.logsys.rpc.nio;
2
3import lombok.extern.slf4j.Slf4j;
4
5import java.io.IOException;
6import java.net.InetSocketAddress;
7import java.nio.ByteBuffer;
8import java.nio.channels.SelectionKey;
9import java.nio.channels.Selector;
10import java.nio.channels.ServerSocketChannel;
11import java.nio.channels.SocketChannel;
12import java.util.Iterator;
13
14/**
15 * Created by liuchunlong on 2018/10/16.
16 */
[email protected]
18public class RpcNioMultServer {
19
20 // 多路複用器
21 private Selector selector;
22
23 public static void start() throws IOException {
24 RpcNioMultServer server = new RpcNioMultServer();
25 server.initServer(8080);
26 server.listen();
27 }
28
29 /**
30 * 建立一個ServerSocket通道,並對該通道做一些初始化的工作
31 *
32 * @param port 繫結的埠號
33 * @throws IOException
34 */
35 public void initServer(int port) throws IOException {
36 // 建立一個ServerSocket通道
37 ServerSocketChannel serverChannel = ServerSocketChannel.open();
38 // 設定通道為非阻塞
39 serverChannel.configureBlocking(false);
40 // 將該通道對應的ServerSocket繫結到port埠
41 serverChannel.socket().bind(new InetSocketAddress(port));
42
43 // 建立一個多路複用器
44 this.selector = Selector.open();
45
46 // 將該通道註冊到多路複用器,併為該通道註冊SelectionKey.OP_ACCEPT事件,註冊該事件後,
47 // 當該事件到達時,selector.select()會返回,如果該事件沒到達selector.select()會一直阻塞。
48 serverChannel.register(selector, SelectionKey.OP_ACCEPT);
49 }
50
51 public void listen() {
52 log.info("服務端啟動成功!");
53 // 輪詢訪問selector
54 while (true) {
55 try {
56 // 當註冊的事件到達時,方法返回;否則,該方法會一直阻塞
57 selector.select();
58 Iterator ite = selector.selectedKeys().iterator();
59 while (ite.hasNext()) {
60 SelectionKey key = (SelectionKey) ite.next();
61 // 刪除已選的key,以防重複處理
62 ite.remove();
63 // 客戶端請求連線事件
64 if (key.isAcceptable()) {
65 ServerSocketChannel server = (ServerSocketChannel) key.channel();
66 // 獲得和客戶端連線的通道
67 SocketChannel channel = server.accept();
68 // 設定成非阻塞
69 channel.configureBlocking(false);
70
71 // 在和客戶端連線成功之後,為了可以接收到客戶端的資訊,需要給通道設定讀的許可權。
72 channel.register(this.selector, SelectionKey.OP_READ);
73
74 // 獲得了可讀的事件
75 } else if (key.isReadable()) {
76 SocketChannel channel = (SocketChannel) key.channel();
77 byte[] bytes = readMsgFromClient(channel);
78 if (bytes != null && bytes.length > 0) {
79 // 讀取之後將任務放入執行緒池非同步返回
80 RpcNioMultServerTask task = new RpcNioMultServerTask(bytes, channel);
81 ThreadPoolUtil.addTask(task);
82 }
83 }
84 }
85 } catch (IOException e) {
86 e.printStackTrace();
87 }
88
89 }
90 }
91
92 public byte[] readMsgFromClient(SocketChannel channel) {
93 ByteBuffer byteBuffer = ByteBuffer.allocate(4);
94 try {
95 // 首先讀取訊息頭(自己設計的協議頭,此處是訊息體的長度)
96 int headCount = channel.read(byteBuffer);
97 if (headCount < 0) {
98 return null;
99 }
100 byteBuffer.flip();
101 int length = byteBuffer.getInt();
102 // 讀取訊息體
103 byteBuffer = ByteBuffer.allocate(length);
104 int bodyCount = channel.read(byteBuffer);
105 if (bodyCount < 0) {
106 return null;
107 }
108 return byteBuffer.array();
109 } catch (IOException e) {
110 log.info("讀取資料異常");
111 e.printStackTrace();
112 return null;
113 }
114 }
115}
執行緒池工具類:
1package com.github.logsys.rpc.nio;
2
3import java.util.concurrent.LinkedBlockingDeque;
4import java.util.concurrent.ThreadPoolExecutor;
5import java.util.concurrent.TimeUnit;
6
7/**
8 * Created by liuchunlong on 2018/10/16.
9 */
10public class ThreadPoolUtil {
11
12 private static volatile ThreadPoolExecutor executor;
13
14 public static void init() {
15 if (executor == null) {
16 synchronized (ThreadPoolUtil.class) {
17 executor = new ThreadPoolExecutor(10, 20, 200, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>());
18 }
19 }
20 }
21
22 public static void addTask(RpcNioMultServerTask task) {
23 if (executor == null) {
24 init();
25 }
26 executor.execute(task);
27 }
28}
用於獲取資料執行遠端方法的執行緒池任務類:
1package com.github.logsys.rpc.nio;
2
3import java.io.IOException;
4import java.lang.reflect.InvocationTargetException;
5import java.lang.reflect.Method;
6import java.nio.ByteBuffer;
7import java.nio.channels.SocketChannel;
8
9/**
10 * Created by liuchunlong on 2018/10/16.
11 */
12public class RpcNioMultServerTask implements Runnable {
13
14 private byte[] bytes;
15
16 private SocketChannel channel;
17
18 public RpcNioMultServerTask(byte[] bytes, SocketChannel channel) {
19 this.bytes = bytes;
20 this.channel = channel;
21 }
22
23 @Override
24 public void run() {
25 if (bytes != null && bytes.length > 0 && channel != null) {
26 // 反序列化
27 RequestMultObject requestMultObject = (RequestMultObject) SerializeUtil.unSerialize(bytes);
28 // 呼叫服務並序列化結果然後返回
29 requestHandle(requestMultObject, channel);
30 }
31 }
32
33 public void requestHandle(RequestMultObject requsetObject, SocketChannel channel) {
34
35 Long requestId = requsetObject.getRequestId(); // 請求ID
36 Object obj = BeanContainer.getBean(requsetObject.getCalzz()); // 根據請求型別,獲取服務端的實現
37 String methodName = requsetObject.getMethodName(); // 請求的方法名稱
38 Class<?>[] parameterTypes = requsetObject.getParamTypes(); // 請求的引數型別
39 Object[] arguments = requsetObject.getArgs(); // 請求的引數
40 try {
41 Method method = obj.getClass().getMethod(methodName, parameterTypes);
42 Object result = method.invoke(obj, arguments);
43 byte[] bytes = SerializeUtil.serialize(result);
44 ByteBuffer buffer = ByteBuffer.allocate(bytes.length + 12);
45 // 為了便於客戶端獲得請求ID,直接將id寫在頭部(這樣客戶端直接解析即可獲得,不需要將所有訊息反序列化才能得到)
46 // 然後寫入訊息題的長度,最後寫入返回內容
47 buffer.putLong(requestId);
48 buffer.putInt(bytes.length);
49 buffer.put(bytes);
50 buffer.flip();
51 channel.write(buffer);
52 } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | IOException e) {
53 e.printStackTrace();
54 }
55 }
56
57 public SocketChannel getChannel() {
58 return channel;
59 }
60
61 public void setChannel(SocketChannel channel) {
62 this.channel = channel;
63 }
64}
服務端服務釋出:
1package com.github.logsys.rpc.nio;
2
3import java.io.IOException;
4
5/**
6 * Created by liuchunlong on 2018/10/17.
7 */
8public class RpcNioProvider {
9
10 public static void main(String[] args) throws IOException {
11
12 // 將服務實現放進bean容器
13 HelloService helloService = new HelloServiceImpl();
14 BeanContainer.addBean(HelloService.class, helloService);
15 // 啟動NIO服務端
16 startMultRpcNioServer();
17 }
18
19 public static void startMultRpcNioServer() {
20 Runnable r = () -> {
21 try {
22 RpcNioMultServer.start();
23 } catch (IOException e) {
24 e.printStackTrace();
25 }
26 };
27 Thread t = new Thread(r);
28 t.start();
29 }
30}
客戶端程式碼
客戶端:
1package com.github.logsys.rpc.nio;
2
3import lombok.extern.slf4j.Slf4j;
4
5import java.io.IOException;
6import java.net.InetSocketAddress;
7import java.nio.ByteBuffer;
8import java.nio.channels.SelectionKey;
9import java.nio.channels.Selector;
10import java.nio.channels.SocketChannel;
11import java.util.Iterator;
12
13/**
14 * Created by liuchunlong on 2018/10/17.
15 */
[email protected]
17public class RpcNioMultClient {
18
19 private static volatile RpcNioMultClient rpcNioClient;
20
21 private Selector selector; // 多路複用器
22 private SocketChannel channel; // Socket通道
23
24 private String host = "localhost"; // 伺服器IP
25 private int port = 8080; // 伺服器埠
26
27 private RpcNioMultClient() {
28 // 初始化client
29 initClient();
30 Runnable runnable = new Runnable() {
31 @Override
32 public void run() {
33 listen();
34 }
35 };
36 Thread t = new Thread(runnable);
37 t.start();
38 }
39
40 public static RpcNioMultClient getInstance() {
41 if (rpcNioClient == null) {
42 synchronized (RpcNioMultClient.class) {
43 if (rpcNioClient == null) {
44 rpcNioClient = new RpcNioMultClient();
45 }
46 }
47 }
48 return rpcNioClient;
49 }
50
51 public void initClient() {
52 try {
53 // 建立Socket通道
54 channel = SocketChannel.open();
55 // 設定為非同步非阻塞模式
56 channel.configureBlocking(false);
57
58 // 建立多路複用器,用於監聽通道事件
59 selector = Selector.open();
60
61 // 建立連線
62 channel.connect(new InetSocketAddress(host, port));
63 // 判斷此通道上是否正在進行連線操作。
64 // 當且僅當已在此通道上發起連線操作,但是尚未通過呼叫 finishConnect 方法完成連線時才返回 true
65 if (channel.isConnectionPending()) {
66 while (!channel.finishConnect()) {
67 //wait, or do something else...
68 }
69 }
70
71 // 如果直接連線成功,則註冊到多路複用器,傳送請求訊息,並讀取應答
72// if (channel.connect(new InetSocketAddress(host, port))) {
73// channel.register(selector, SelectionKey.OP_READ);
74// } else { // 直接連線失敗,此通道處於非阻塞模式,並且連線操作正在進行中
75// channel.register(selector, SelectionKey.OP_CONNECT);
76// }
77
78 log.info("客戶端初始化完成,建立連線完成");
79 } catch (IOException e) {
80 e.printStackTrace();
81 }
82 }
83
84 public void listen() {
85 try {
86 while (true) {
87 selector.select();
88 Iterator ite = selector.selectedKeys().iterator();
89 while (ite.hasNext()) {
90 SelectionKey key = (SelectionKey) ite.next();
91 // 刪除已選的key,以防重複處理
92 ite.remove();
93
94 if (key.isValid()) {
95
96// SocketChannel socketChannel = (SocketChannel) key.channel();
97// if (key.isConnectable()) { // 連線操作完成
98// // 連線操作完成,即伺服器返回了ACK應答資訊。
99// // 這時,我們需要對連線結果進行判斷,呼叫socketChannel.finishConnect(),
100// // 如果返回值true,說明連線成功;如果返回值false,說明正在進行連線;或者丟擲IOException,說明連線失敗
101// if (socketChannel.finishConnect()) {
102// socketChannel.register(selector, SelectionKey.OP_READ);
103// }
104// }
105
106 if (key.isReadable()) {
107 // 讀取資訊
108 readMsgFromServer();
109 }
110 }
111 }
112 }
113 } catch (IOException e) {
114 log.info("客戶端建立連線失敗");
115 }
116 }
117
118 public boolean sendMsg2Server(byte[] bytes) {
119 try {
120 ByteBuffer buffer = ByteBuffer.allocate(4 + bytes.length);
121 // 放入訊息長度,然後放入訊息體
122 buffer.putInt(bytes.length);
123 buffer.put(bytes);
124 buffer.flip();
125 // 寫出訊息
126 channel.write(buffer);
127 } catch (IOException e) {
128 log.info("客戶端寫出訊息失敗!");
129 e.printStackTrace();
130 }
131 return true;
132 }
133
134 public void readMsgFromServer() {
135 ByteBuffer byteBuffer;
136 try {
137 // 首先讀取請求ID
138 byteBuffer = ByteBuffer.allocate(8);
139 int readIdCount = channel.read(byteBuffer);
140 if (readIdCount < 0) {
141 return;
142 }
143 byteBuffer.flip();
144 Long requsetId = byteBuffer.getLong();
145
146 // 讀取返回值長度
147 byteBuffer = ByteBuffer.allocate(4);
148 int readHeadCount = channel.read(byteBuffer);
149 if (readHeadCount < 0) {
150 return;
151 }
152 byteBuffer.flip();
153 int length = byteBuffer.getInt();
154
155 // 讀取訊息體
156 byteBuffer = ByteBuffer.allocate(length);
157 int readBodyCount = channel.read(byteBuffer);
158 if (readBodyCount < 0) {
159 return;
160 }
161 byte[] bytes = byteBuffer.array();
162
163 // 將返回值放入指定容器
164 RpcContainer.addResponse(requsetId, bytes);
165 } catch (IOException e) {
166 log.info("讀取資料異常");
167 e.printStackTrace();
168 }
169 }
170}
RPC容器:
用來儲存傳送RPC請求時的請求物件,以及儲存返回值。
1package com.github.logsys.rpc.nio;
2
3import java.util.concurrent.ConcurrentHashMap;
4import java.util.concurrent.atomic.AtomicLong;
5
6/**
7 * Created by liuchunlong on 2018/10/17.
8 */
9public class RpcContainer {
10
11 // 返回值容器
12 private static ConcurrentHashMap<Long, byte[]> responseContainer = new ConcurrentHashMap<>();
13 // 請求物件容器
14 private static ConcurrentHashMap<Long, RpcResponseFuture> requestFuture = new ConcurrentHashMap<>();
15 // 請求ID
16 private static AtomicLong requsetId = new AtomicLong(0);
17
18 /**
19 * 獲取下一請求ID
20 *
21 * @return
22 */
23 public static Long getRequestId() {
24 return requsetId.getAndIncrement();
25 }
26
27 public static void addResponse(Long requestId, byte[] responseBytes) {
28 responseContainer.put(requestId, responseBytes);
29 RpcResponseFuture responseFuture = requestFuture.get(requestId);
30 responseFuture.rpcIsDone();
31 }
32
33 /**
34 * 獲取響應結果
35 *
36 * @param requestId 請求ID
37 * @return
38 */
39 public static byte[] getResponse(Long requestId) {
40 return responseContainer.get(requestId);
41 }
42
43 public static void addRequstFuture(RpcResponseFuture rpcResponseFuture) {
44 requestFuture.put(rpcResponseFuture.getRequstId(), rpcResponseFuture);
45 }
46
47 public static RpcResponseFuture getRpcRequstFutue(Long requestId) {
48 return requestFuture.get(requestId);
49 }
50
51 /**
52 * 移除指定請求
53 *
54 * @param requestId 請求ID
55 */
56 public static void removeResponseAndFuture(Long requestId) {
57 responseContainer.remove(requestId);
58 requestFuture.remove(requestId);
59 }
60}
請求資料獲取類,用來掛起和喚醒掛起的執行緒:
1package com.github.logsys.rpc.nio;
2
3import lombok.extern.slf4j.Slf4j;
4
5import java.util.concurrent.locks.Condition;
6import java.util.concurrent.locks.Lock;
7import java.util.concurrent.locks.ReentrantLock;
8
9/**
10 * Created by liuchunlong on 2018/10/17.
11 */
[email protected]
13public class RpcResponseFuture {
14
15 private final Lock lock = new ReentrantLock();
16 private final Condition condition = lock.newCondition();
17 private Long requsetId;
18
19 /**
20 *
21 * @param requsetId 請求ID
22 */
23 public RpcResponseFuture(Long requsetId) {
24 this.requsetId = requsetId;
25 }
26
27 public byte[] get() {
28 // 獲取響應結果
29 byte[] bytes = RpcContainer.getResponse(requsetId);
30 if (bytes == null || bytes.length < 0) {
31 lock.lock();
32 try {
33 log.info("請求id:" + requsetId + ",請求結果尚未返回,執行緒掛起");
34 condition.await();
35 } catch (InterruptedException e) {
36 e.printStackTrace();
37 } finally {
38 lock.unlock();
39 }
40 }
41 log.info("請求id:" + requsetId + ",請求結果返回,執行緒掛起結束");
42 return RpcContainer.getResponse(requsetId);
43 }
44
45 public void rpcIsDone() {
46 lock.lock();
47 try {
48 condition.signal();
49 } finally {
50 lock.unlock();
51 }
52 }
53
54 public Long getRequstId() {
55 return requsetId;
56 }
57
58 public void setRequstId(Long requsetId) {
59 this.requsetId = requsetId;
60 }
61}
RPC代理工廠類:
1package com.github.logsys.rpc.nio;
2
3import java.lang.reflect.Proxy;
4
5/**
6 * Created by liuchunlong on 2018/10/17.
7 */
8public class RpcProxyFactory {
9 /**
10 * 多執行緒環境代理物件
11 *
12 * @param interfaceClass
13 * @return T
14 */
15 public static <T> T getMultService(Class<T> interfaceClass) {
16 return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[]{interfaceClass},
17 new RpcNIoMultHandler());
18 }
19}
實際代理類:
1package com.github.logsys.rpc.nio;
2
3import lombok.extern.slf4j.Slf4j;
4
5import java.lang.reflect.InvocationHandler;
6import java.lang.reflect.Method;
7
8/**
9 * Created by liuchunlong on 2018/10/17.
10 */
[email protected]
12public class RpcNIoMultHandler implements InvocationHandler {
13 @Override
14 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
15
16 // 生成請求ID
17 Long responseId = RpcContainer.getRequestId();
18
19 // 封裝請求物件
20 RequestMultObject requestMultObject = new RequestMultObject(method.getDeclaringClass(), method.getName(),
21 method.getParameterTypes(), args);
22 requestMultObject.setRequestId(responseId);
23
24 // 封裝設定rpcResponseFuture,主要用於獲取返回值
25 RpcResponseFuture rpcResponseFuture = new RpcResponseFuture(responseId);
26 RpcContainer.addRequstFuture(rpcResponseFuture);
27
28 // 序列化
29 byte[] requsetBytes = SerializeUtil.serialize(requestMultObject);
30 // 傳送請求資訊
31 RpcNioMultClient rpcNioMultClient = RpcNioMultClient.getInstance();
32 rpcNioMultClient.sendMsg2Server(requsetBytes);
33
34 // 從ResponseContainer獲取返回值
35 byte[] responseBytes = rpcResponseFuture.get();
36 if (requsetBytes != null) {
37 RpcContainer.removeResponseAndFuture(responseId);
38 }
39
40 // 反序列化獲得結果
41 Object result = SerializeUtil.unSerialize(responseBytes);
42 log.info("請求id:" + responseId + " 結果:" + result);
43 return result;
44 }
45}
客戶端RPC呼叫:
1package com.github.logsys.rpc.nio;
2
3import lombok.extern.slf4j.Slf4j;
4
5/**
6 * Created by liuchunlong on 2018/10/17.
7 */
[email protected]
9public class RpcNioConsumer {
10
11 public static void main(String[] args) {
12 multipartRpcNio();
13 }
14
15 /**
16 * 多執行緒IO呼叫示例
17 *
18 * @param
19 * @return void
20 */
21 public static void multipartRpcNio() {
22 HelloService proxy = RpcProxyFactory.getMultService(HelloService.class);
23 for (int i = 0; i < 100; i++) {
24 int j = i;
25 Runnable runnable = new Runnable() {
26 @Override
27 public void run() {
28 String result = proxy.hello("world!");
29 log.info(j + result);
30 }
31 };
32 Thread t = new Thread(runnable);
33 t.start();
34 }
35 }
36}
JDK解讀
SelectionKey#isConnectable
1public final boolean isConnectable()
2
測試此鍵的通道是否已完成其套接字連線操作。
呼叫此方法的形式為 k.isConnectable() ,該呼叫與以下呼叫的作用完全相同:
1k.readyOps() & OP_CONNECT != 0
如果此鍵的通道不支援套接字連線操作,則此方法始終返回 false。
返回:
當且僅當 readyOps() & OP_CONNECT 為非零值時才返回 true
丟擲:
CancelledKeyException - 如果已取消此鍵
SocketChannel#finishConnect()
1public abstract boolean finishConnect()
2 throws IOException
完成套接字通道的連線過程。
通過將套接字通道置於非阻塞模式,然後呼叫其 connect 方法來發起非阻塞連線操作。一旦建立了連線,或者嘗試已失敗,該套接字通道就變為可連線的,並且可呼叫此方法完成連線序列。如果連線操作失敗,則呼叫此方法將導致丟擲適當的 IOException。
如果已連線了此通道,則不阻塞此方法並且立即返回 true。如果此通道處於非阻塞模式,那麼當連線過程尚未完成時,此方法將返回 false。如果此通道處於阻塞模式,則在連線完成或失敗之前將阻塞此方法,並且總是返回 true 或丟擲描述該失敗的、檢查型異常。
可在任意時間呼叫此方法。如果正在呼叫此方法時在此通道上呼叫讀取或寫入操作,則在此呼叫完成前將首先阻塞該操作。如果試圖發起連線但失敗了,也就是說如果呼叫此方法導致丟擲檢查型異常,則關閉此通道。
返回:
當且僅當已連線此通道的套接字時才返回 true
丟擲:
-
NoConnectionPendingException - 如果未連線此通道並且尚未發起連線操作
-
ClosedChannelException - 如果此通道已關閉
-
AsynchronousCloseException - 如果正在進行連線操作時另一個執行緒關閉了此通道
-
ClosedByInterruptException - 如果正在進行連線操作時另一個執行緒中斷了當前執行緒,因此關閉了該通道並將當前執行緒設定為中斷狀態
-
IOException - 如果發生其他 I/O 錯誤