1. 程式人生 > >NIO單一長連線Dubbo通訊模型實現

NIO單一長連線Dubbo通訊模型實現

前言

前一段時間看了下Dubbo,原想將Dubbo詳細總結下來,從使用簡介、SPI擴充套件機制、Spring的schema擴充套件、啟動過程、動態註冊與發現、分層設計、通訊設計、執行緒模型等方面來總結,但是越看越發現架子太大,涉及的點太廣,反而RPC的思想其實已經印象深刻了,再來總結這麼多的點似乎不太值得,因為不懂的東西才是最有價值的,所以有了本文,將個人認為dubbo中比較有特色的通訊模型總結於此,本文是一個demo,當然不乏一些腦補的東西在裡面,如您偶然閱讀此文發現問題,還請不吝指出問題所在。

作者:峽客 連結:https://www.jianshu.com/p/13bef2795c44 來源:簡書 簡書著作權歸作者所有,任何形式的轉載都請聯絡作者獲得授權並註明出處。

BIO通訊缺陷

為了相對更好的理解dubbo這個通訊模型的優勢,首先需要回顧一下BIO通訊的缺陷。

為此引用一下我之前借鑑dubbo作者樑飛的部落格(RPC框架幾行程式碼就夠了)而實現的一個用BIO實現的RPC示例:RPC原理簡析——三分鐘看完,這裡使用的是BIO的socket和ServerSocket實現通訊,用流來寫入寫出資料。BIO的完整連線示意圖如下:

缺陷一、IO阻塞

可以看出,在BIO中,除了建立連線比較耗時之外,在客戶端將資料傳輸到服務端之前,服務端的IO(輸入流)阻塞,然後在服務端將返回值傳輸回來之前,客戶端的IO(輸入流)阻塞。也就是說在一次RPC呼叫開始到完成之前,這個連線一直被此次呼叫所佔用,但是實際上這次呼叫中,真正需要網路連線的只有中間的資料傳輸過程,在客戶端寫出和服務端讀取並執行遠端方法這兩個時間點,其實網路連線是空閒的。這就是BIO連線中浪費了網路資源的地方。

缺陷二、大量連線

由於BIO的IO阻塞,導致每次RPC呼叫會佔用一個連線。而正因為如此,為了減少頻繁建立連線消耗的時間,引入了連線池的概念,連線池解決了頻繁建立連線的資源消耗,但是沒有解決根本性的阻塞問題。而且在服務消費者(客戶端)數量遠大於服務提供者(服務端)數量的時候,會導致服務提供者建立了大量的連線,而本身由於硬體資源的限制,單機最大連線數是有限的(這個限制以前是1w,也就是以前的C10K問題,據說近幾年已經提升至50萬個連線),所以在服務消費者過多,而服務提供者數量過少的情況下,服務提供者有因為過多的連線而被拖垮的風險(這需要極大的併發數,每秒上百萬次的呼叫)。當然,要解決這個問題,增加機器,從而增加服務提供者數量是可以解決的,但是沒有充分利用單機效能。

建立大量連線的另一個弊端,是作業系統頻繁的執行緒上下文切換,因為連線數過多,執行緒切換頻繁,會消耗大量的資源,而且這些切換可能不是必要的。比如當前建立了大量的連線,可能大部分處於阻塞狀態,根本沒有挨個挨個切換的必要。但是因為作業系統任務排程時並不會忽略阻塞狀態的執行緒,所以造成浪費。

NIO單一長連線實現分析

長連線巨集觀簡介

長連線其實不算NIO這裡的特點,因為BIO也可以實現長連線(每次寫完資料之後手動寫入結束符而不關閉流就可以了),而且連線池一般也是使用的長連線方式。

NIO真正解決的是阻塞問題,因為阻塞問題解決了,所以也就不需要大量連線了。由於篇幅問題,此處不討論NIO的細節(API),從相對巨集觀的角度介紹大體角色和呼叫方式,NIO三種角色示意如下圖:

NIO由三種角色組成,Selector、SocketChannel、Buffer。

SocketChannel相當於BIO中的Socket,分為SocketChannel和ServerSocketChannel兩種,是真正建立連線並傳輸資料的管道。這個管道不同於BIO的Socket的點就是,這個管道可以被多個執行緒共用,執行緒A使用這個管道寫出資料了之後,執行緒B還可以使用這個管道寫出資料,不再被某一次呼叫所獨佔。所以就可以不需要像BIO一樣建立那麼多的連線,一個客戶端的一個連線就夠了(當然,實際應用中因為機器都是多核,實際上建立核數個連線個人感覺是比較好的)。

Buffer是用來與SocketChannel互通資料的物件,本質上是一塊記憶體區域。SocketChannel是不支援直接讀寫資料的,所有的讀寫操作必須通過Buffer來實現。值得一提的是,我們經常說在JVM中有的時候會使用虛擬機器之外的記憶體,說的就是NIO中的Buffer,在分配記憶體的時候可以選擇使用虛擬機器外記憶體,減少資料的複製。

Selector是用來監控SocketChannel的事件的,其實是實現非阻塞的關鍵。NIO是基於事件的,將BIO中的流式傳輸改為了事件機制。BIO中,一個連線擁有一個輸入/輸出流,只要資料傳輸完成,流就可以讀取資料。在NIO中,Selector定義了四種事件,OP_READ、OP_WRITE、OP_CONNECT、OP_ACCEPT。當服務端或者客戶端收到寫入完成的一次資料時,會觸發OP_READ事件,此時可以從連線中讀取資料。同理,當可以往連線中寫入資料的時候,觸發OP_WRITE事件(但是一般情況下這個事件沒有必要,因為連線一般都是可寫的)。客戶端與服務端建立連線的時候,客戶端會收到OP_CONNECT事件,而服務端會觸發OP_ACCEPT事件。通過這一系列事件將資料的傳送與讀寫解耦,實現非同步呼叫。將一個SocketChannel + 一個事件繫結在一個Selector上,Selector本質上是輪詢每一個SocketChannel,如果沒有事件觸發,那麼執行緒阻塞,如果有事件觸發,返回對應的SocketChannel,以便進行後續的處理。

使用NIO設計RPC呼叫分析

前面提到,由於NIO的SocketChannel是非阻塞的,所以不再需要連線池,使用一個連線就夠了。

但是如果真的使用NIO來進行RPC呼叫的話,會有資料和呼叫方對應不上的問題,如下圖:

如上圖所示,如果多個執行緒共用一個連線,那麼每個執行緒呼叫之後返回的順序是不可控的,所以有可能先發出資料的反而後得到返回值,這就使得資料對應不上了。個人覺得因為這一點,NIO及其適合聊天室型別的設計,因為每個聊天方都是一個單獨的SocketChannel連線,而此時並沒有順序問題。

但是對RPC呼叫來說,每次呼叫的返回值必須與呼叫方對應上,為此,Dubbo的設計是給每個請求設計一個請求id,在傳送請求與傳送返回值時都帶上這個id。詳細思路如下圖:

業務執行緒在發出請求之前,需要儲存一個請求物件,同時掛起相應的業務執行緒(掛起不會被任務排程,所以不存線上程切換消耗),這個請求物件包含了此次請求的id,然後在獲取服務端返回的資料的時候,解析出這個id,通過這個id取出請求物件,並喚醒對應的執行緒。

NIO單一長連線實現demo

廢話了這麼多,終於可以上程式碼了,我這裡服務端使用了執行緒池去執行遠端方法,使得真正的服務端執行緒只需要讀取資料就可以了。可能作為demo來講,寫的有些複雜,但是理解的時候以RpcNioConsumer和RpcNioProvider作為入口就比較好梳理了。NIO的主要實現在RpcNioMultClient和RpcNioMultServer中。

工具類

序列化/反序列化工具類

 1package com.github.logsys.rpc.nio;
 2
 3import lombok.extern.slf4j.Slf4j;
 4
 5import java.io.*;
 6
 7/**
 8 * Created by liuchunlong on 2018/10/16.
 9 * <p>
10 * 序列化工具
11 */
[email protected]
13public class SerializeUtil {
14
15    public static byte[] serialize(Object obj) {
16        try {
17            ByteArrayOutputStream bos = new ByteArrayOutputStream();
18            ObjectOutputStream oos = new ObjectOutputStream(bos);
19            oos.writeObject(obj);
20            oos.flush();
21            return bos.toByteArray();
22        } catch (IOException e) {
23            log.info("序列化物件出錯!");
24            e.printStackTrace();
25            return null;
26        }
27    }
28
29    public static Object unSerialize(byte[] bytes) {
30        try {
31            ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
32            ObjectInputStream ois = new ObjectInputStream(bis);
33            return ois.readObject();
34        } catch (IOException | ClassNotFoundException e) {
35            log.info("反序列化出錯!");
36            e.printStackTrace();
37            return null;
38        }
39    }
40}

服務端程式碼

請求時的資料結構:

 1package com.github.logsys.rpc.nio;
 2
 3import lombok.Data;
 4
 5import java.io.Serializable;
 6
 7/**
 8 * Created by liuchunlong on 2018/10/17.
 9 */
[email protected]
11public class RequestMultObject implements Serializable {
12
13    private static final long serialVersionUID = 3132836600205356306L;
14
15    public RequestMultObject(Class<?> calzz, String methodName, Class<?>[] paramTypes, Object[] args) {
16        this.calzz = calzz;
17        this.methodName = methodName;
18        this.paramTypes = paramTypes;
19        this.args = args;
20    }
21
22    /**
23     * 請求ID
24     */
25    private Long requestId;
26
27    /**
28     * 服務提供者介面
29     */
30    private Class<?> calzz;
31
32    /**
33     * 請求的方法名稱
34     */
35    private String methodName;
36
37    /**
38     * 引數型別
39     */
40    private Class<?>[] paramTypes;
41
42    /**
43     * 引數
44     */
45    private Object[] args;
46}

RPC呼叫的介面:

 1package com.github.logsys.rpc.nio;
 2
 3/**
 4 * Created by liuchunlong on 2018/10/16.
 5 */
 6public interface HelloService {
 7
 8    String hello(String name);
 9
10}

服務端的介面實現:

 1package com.github.logsys.rpc.nio;
 2
 3/**
 4 * Created by liuchunlong on 2018/10/16.
 5 */
 6public class HelloServiceImpl implements HelloService {
 7
 8    public String hello(String name) {
 9        return "Hello " + name;
10    }
11
12}

服務端服務釋出時使用的bean容器:

 1package com.github.logsys.rpc.nio;
 2
 3import java.util.concurrent.ConcurrentHashMap;
 4
 5/**
 6 * Created by liuchunlong on 2018/10/17.
 7 */
 8public class BeanContainer {
 9
10    private static ConcurrentHashMap<Class<?>, Object> container = new ConcurrentHashMap<>();
11
12    public static boolean addBean(Class<?> clazz, Object object) {
13        container.put(clazz, object);
14        return true;
15    }
16
17    public static Object getBean(Class<?> clazz) {
18        return container.get(clazz);
19    }
20}

服務端,用於與客戶端建立連線、讀取資料:

  1package com.github.logsys.rpc.nio;
  2
  3import lombok.extern.slf4j.Slf4j;
  4
  5import java.io.IOException;
  6import java.net.InetSocketAddress;
  7import java.nio.ByteBuffer;
  8import java.nio.channels.SelectionKey;
  9import java.nio.channels.Selector;
 10import java.nio.channels.ServerSocketChannel;
 11import java.nio.channels.SocketChannel;
 12import java.util.Iterator;
 13
 14/**
 15 * Created by liuchunlong on 2018/10/16.
 16 */
 [email protected]
 18public class RpcNioMultServer {
 19
 20    // 多路複用器
 21    private Selector selector;
 22
 23    public static void start() throws IOException {
 24        RpcNioMultServer server = new RpcNioMultServer();
 25        server.initServer(8080);
 26        server.listen();
 27    }
 28
 29    /**
 30     * 建立一個ServerSocket通道,並對該通道做一些初始化的工作
 31     *
 32     * @param port 繫結的埠號
 33     * @throws IOException
 34     */
 35    public void initServer(int port) throws IOException {
 36        // 建立一個ServerSocket通道
 37        ServerSocketChannel serverChannel = ServerSocketChannel.open();
 38        // 設定通道為非阻塞
 39        serverChannel.configureBlocking(false);
 40        // 將該通道對應的ServerSocket繫結到port埠
 41        serverChannel.socket().bind(new InetSocketAddress(port));
 42
 43        // 建立一個多路複用器
 44        this.selector = Selector.open();
 45
 46        // 將該通道註冊到多路複用器,併為該通道註冊SelectionKey.OP_ACCEPT事件,註冊該事件後,
 47        // 當該事件到達時,selector.select()會返回,如果該事件沒到達selector.select()會一直阻塞。
 48        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
 49    }
 50
 51    public void listen() {
 52        log.info("服務端啟動成功!");
 53        // 輪詢訪問selector
 54        while (true) {
 55            try {
 56                // 當註冊的事件到達時,方法返回;否則,該方法會一直阻塞
 57                selector.select();
 58                Iterator ite = selector.selectedKeys().iterator();
 59                while (ite.hasNext()) {
 60                    SelectionKey key = (SelectionKey) ite.next();
 61                    // 刪除已選的key,以防重複處理
 62                    ite.remove();
 63                    // 客戶端請求連線事件
 64                    if (key.isAcceptable()) {
 65                        ServerSocketChannel server = (ServerSocketChannel) key.channel();
 66                        // 獲得和客戶端連線的通道
 67                        SocketChannel channel = server.accept();
 68                        // 設定成非阻塞
 69                        channel.configureBlocking(false);
 70
 71                        // 在和客戶端連線成功之後,為了可以接收到客戶端的資訊,需要給通道設定讀的許可權。
 72                        channel.register(this.selector, SelectionKey.OP_READ);
 73
 74                        // 獲得了可讀的事件
 75                    } else if (key.isReadable()) {
 76                        SocketChannel channel = (SocketChannel) key.channel();
 77                        byte[] bytes = readMsgFromClient(channel);
 78                        if (bytes != null && bytes.length > 0) {
 79                            // 讀取之後將任務放入執行緒池非同步返回
 80                            RpcNioMultServerTask task = new RpcNioMultServerTask(bytes, channel);
 81                            ThreadPoolUtil.addTask(task);
 82                        }
 83                    }
 84                }
 85            } catch (IOException e) {
 86                e.printStackTrace();
 87            }
 88
 89        }
 90    }
 91
 92    public byte[] readMsgFromClient(SocketChannel channel) {
 93        ByteBuffer byteBuffer = ByteBuffer.allocate(4);
 94        try {
 95            // 首先讀取訊息頭(自己設計的協議頭,此處是訊息體的長度)
 96            int headCount = channel.read(byteBuffer);
 97            if (headCount < 0) {
 98                return null;
 99            }
100            byteBuffer.flip();
101            int length = byteBuffer.getInt();
102            // 讀取訊息體
103            byteBuffer = ByteBuffer.allocate(length);
104            int bodyCount = channel.read(byteBuffer);
105            if (bodyCount < 0) {
106                return null;
107            }
108            return byteBuffer.array();
109        } catch (IOException e) {
110            log.info("讀取資料異常");
111            e.printStackTrace();
112            return null;
113        }
114    }
115}

執行緒池工具類:

 1package com.github.logsys.rpc.nio;
 2
 3import java.util.concurrent.LinkedBlockingDeque;
 4import java.util.concurrent.ThreadPoolExecutor;
 5import java.util.concurrent.TimeUnit;
 6
 7/**
 8 * Created by liuchunlong on 2018/10/16.
 9 */
10public class ThreadPoolUtil {
11
12    private static volatile ThreadPoolExecutor executor;
13
14    public static void init() {
15        if (executor == null) {
16            synchronized (ThreadPoolUtil.class) {
17                executor = new ThreadPoolExecutor(10, 20, 200, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>());
18            }
19        }
20    }
21
22    public static void addTask(RpcNioMultServerTask task) {
23        if (executor == null) {
24            init();
25        }
26        executor.execute(task);
27    }
28}

用於獲取資料執行遠端方法的執行緒池任務類:

 1package com.github.logsys.rpc.nio;
 2
 3import java.io.IOException;
 4import java.lang.reflect.InvocationTargetException;
 5import java.lang.reflect.Method;
 6import java.nio.ByteBuffer;
 7import java.nio.channels.SocketChannel;
 8
 9/**
10 * Created by liuchunlong on 2018/10/16.
11 */
12public class RpcNioMultServerTask implements Runnable {
13
14    private byte[] bytes;
15
16    private SocketChannel channel;
17
18    public RpcNioMultServerTask(byte[] bytes, SocketChannel channel) {
19        this.bytes = bytes;
20        this.channel = channel;
21    }
22
23    @Override
24    public void run() {
25        if (bytes != null && bytes.length > 0 && channel != null) {
26            // 反序列化
27            RequestMultObject requestMultObject = (RequestMultObject) SerializeUtil.unSerialize(bytes);
28            // 呼叫服務並序列化結果然後返回
29            requestHandle(requestMultObject, channel);
30        }
31    }
32
33    public void requestHandle(RequestMultObject requsetObject, SocketChannel channel) {
34
35        Long requestId = requsetObject.getRequestId(); // 請求ID
36        Object obj = BeanContainer.getBean(requsetObject.getCalzz()); // 根據請求型別,獲取服務端的實現
37        String methodName = requsetObject.getMethodName(); // 請求的方法名稱
38        Class<?>[] parameterTypes = requsetObject.getParamTypes(); // 請求的引數型別
39        Object[] arguments = requsetObject.getArgs(); // 請求的引數
40        try {
41            Method method = obj.getClass().getMethod(methodName, parameterTypes);
42            Object result = method.invoke(obj, arguments);
43            byte[] bytes = SerializeUtil.serialize(result);
44            ByteBuffer buffer = ByteBuffer.allocate(bytes.length + 12);
45            // 為了便於客戶端獲得請求ID,直接將id寫在頭部(這樣客戶端直接解析即可獲得,不需要將所有訊息反序列化才能得到)
46            // 然後寫入訊息題的長度,最後寫入返回內容
47            buffer.putLong(requestId);
48            buffer.putInt(bytes.length);
49            buffer.put(bytes);
50            buffer.flip();
51            channel.write(buffer);
52        } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | IOException e) {
53            e.printStackTrace();
54        }
55    }
56
57    public SocketChannel getChannel() {
58        return channel;
59    }
60
61    public void setChannel(SocketChannel channel) {
62        this.channel = channel;
63    }
64}

服務端服務釋出:

 1package com.github.logsys.rpc.nio;
 2
 3import java.io.IOException;
 4
 5/**
 6 * Created by liuchunlong on 2018/10/17.
 7 */
 8public class RpcNioProvider {
 9
10    public static void main(String[] args) throws IOException {
11
12        // 將服務實現放進bean容器
13        HelloService helloService = new HelloServiceImpl();
14        BeanContainer.addBean(HelloService.class, helloService);
15        // 啟動NIO服務端
16        startMultRpcNioServer();
17    }
18
19    public static void startMultRpcNioServer() {
20        Runnable r = () -> {
21            try {
22                RpcNioMultServer.start();
23            } catch (IOException e) {
24                e.printStackTrace();
25            }
26        };
27        Thread t = new Thread(r);
28        t.start();
29    }
30}

客戶端程式碼

客戶端:

  1package com.github.logsys.rpc.nio;
  2
  3import lombok.extern.slf4j.Slf4j;
  4
  5import java.io.IOException;
  6import java.net.InetSocketAddress;
  7import java.nio.ByteBuffer;
  8import java.nio.channels.SelectionKey;
  9import java.nio.channels.Selector;
 10import java.nio.channels.SocketChannel;
 11import java.util.Iterator;
 12
 13/**
 14 * Created by liuchunlong on 2018/10/17.
 15 */
 [email protected]
 17public class RpcNioMultClient {
 18
 19    private static volatile RpcNioMultClient rpcNioClient;
 20
 21    private Selector selector; // 多路複用器
 22    private SocketChannel channel; // Socket通道
 23
 24    private String host = "localhost"; // 伺服器IP
 25    private int port = 8080; // 伺服器埠
 26
 27    private RpcNioMultClient() {
 28        // 初始化client
 29        initClient();
 30        Runnable runnable = new Runnable() {
 31            @Override
 32            public void run() {
 33                listen();
 34            }
 35        };
 36        Thread t = new Thread(runnable);
 37        t.start();
 38    }
 39
 40    public static RpcNioMultClient getInstance() {
 41        if (rpcNioClient == null) {
 42            synchronized (RpcNioMultClient.class) {
 43                if (rpcNioClient == null) {
 44                    rpcNioClient = new RpcNioMultClient();
 45                }
 46            }
 47        }
 48        return rpcNioClient;
 49    }
 50
 51    public void initClient() {
 52        try {
 53            // 建立Socket通道
 54            channel = SocketChannel.open();
 55            // 設定為非同步非阻塞模式
 56            channel.configureBlocking(false);
 57
 58            // 建立多路複用器,用於監聽通道事件
 59            selector = Selector.open();
 60
 61            // 建立連線
 62            channel.connect(new InetSocketAddress(host, port));
 63            // 判斷此通道上是否正在進行連線操作。
 64            // 當且僅當已在此通道上發起連線操作,但是尚未通過呼叫 finishConnect 方法完成連線時才返回 true
 65            if (channel.isConnectionPending()) {
 66                while (!channel.finishConnect()) {
 67                    //wait, or do something else...
 68                }
 69            }
 70
 71            // 如果直接連線成功,則註冊到多路複用器,傳送請求訊息,並讀取應答
 72//            if (channel.connect(new InetSocketAddress(host, port))) {
 73//                channel.register(selector, SelectionKey.OP_READ);
 74//            } else { // 直接連線失敗,此通道處於非阻塞模式,並且連線操作正在進行中
 75//                channel.register(selector, SelectionKey.OP_CONNECT);
 76//            }
 77
 78            log.info("客戶端初始化完成,建立連線完成");
 79        } catch (IOException e) {
 80            e.printStackTrace();
 81        }
 82    }
 83
 84    public void listen() {
 85        try {
 86            while (true) {
 87                selector.select();
 88                Iterator ite = selector.selectedKeys().iterator();
 89                while (ite.hasNext()) {
 90                    SelectionKey key = (SelectionKey) ite.next();
 91                    // 刪除已選的key,以防重複處理
 92                    ite.remove();
 93
 94                    if (key.isValid()) {
 95
 96//                        SocketChannel socketChannel = (SocketChannel) key.channel();
 97//                        if (key.isConnectable()) { // 連線操作完成
 98//                            // 連線操作完成,即伺服器返回了ACK應答資訊。
 99//                            // 這時,我們需要對連線結果進行判斷,呼叫socketChannel.finishConnect(),
100//                            // 如果返回值true,說明連線成功;如果返回值false,說明正在進行連線;或者丟擲IOException,說明連線失敗
101//                            if (socketChannel.finishConnect()) {
102//                                socketChannel.register(selector, SelectionKey.OP_READ);
103//                            }
104//                        }
105
106                        if (key.isReadable()) {
107                            // 讀取資訊
108                            readMsgFromServer();
109                        }
110                    }
111                }
112            }
113        } catch (IOException e) {
114            log.info("客戶端建立連線失敗");
115        }
116    }
117
118    public boolean sendMsg2Server(byte[] bytes) {
119        try {
120            ByteBuffer buffer = ByteBuffer.allocate(4 + bytes.length);
121            // 放入訊息長度,然後放入訊息體
122            buffer.putInt(bytes.length);
123            buffer.put(bytes);
124            buffer.flip();
125            // 寫出訊息
126            channel.write(buffer);
127        } catch (IOException e) {
128            log.info("客戶端寫出訊息失敗!");
129            e.printStackTrace();
130        }
131        return true;
132    }
133
134    public void readMsgFromServer() {
135        ByteBuffer byteBuffer;
136        try {
137            // 首先讀取請求ID
138            byteBuffer = ByteBuffer.allocate(8);
139            int readIdCount = channel.read(byteBuffer);
140            if (readIdCount < 0) {
141                return;
142            }
143            byteBuffer.flip();
144            Long requsetId = byteBuffer.getLong();
145
146            // 讀取返回值長度
147            byteBuffer = ByteBuffer.allocate(4);
148            int readHeadCount = channel.read(byteBuffer);
149            if (readHeadCount < 0) {
150                return;
151            }
152            byteBuffer.flip();
153            int length = byteBuffer.getInt();
154
155            // 讀取訊息體
156            byteBuffer = ByteBuffer.allocate(length);
157            int readBodyCount = channel.read(byteBuffer);
158            if (readBodyCount < 0) {
159                return;
160            }
161            byte[] bytes = byteBuffer.array();
162
163            // 將返回值放入指定容器
164            RpcContainer.addResponse(requsetId, bytes);
165        } catch (IOException e) {
166            log.info("讀取資料異常");
167            e.printStackTrace();
168        }
169    }
170}

RPC容器:

用來儲存傳送RPC請求時的請求物件,以及儲存返回值。

 1package com.github.logsys.rpc.nio;
 2
 3import java.util.concurrent.ConcurrentHashMap;
 4import java.util.concurrent.atomic.AtomicLong;
 5
 6/**
 7 * Created by liuchunlong on 2018/10/17.
 8 */
 9public class RpcContainer {
10
11    // 返回值容器
12    private static ConcurrentHashMap<Long, byte[]> responseContainer = new ConcurrentHashMap<>();
13    // 請求物件容器
14    private static ConcurrentHashMap<Long, RpcResponseFuture> requestFuture = new ConcurrentHashMap<>();
15    // 請求ID
16    private static AtomicLong requsetId = new AtomicLong(0);
17
18    /**
19     * 獲取下一請求ID
20     *
21     * @return
22     */
23    public static Long getRequestId() {
24        return requsetId.getAndIncrement();
25    }
26
27    public static void addResponse(Long requestId, byte[] responseBytes) {
28        responseContainer.put(requestId, responseBytes);
29        RpcResponseFuture responseFuture = requestFuture.get(requestId);
30        responseFuture.rpcIsDone();
31    }
32
33    /**
34     * 獲取響應結果
35     *
36     * @param requestId 請求ID
37     * @return
38     */
39    public static byte[] getResponse(Long requestId) {
40        return responseContainer.get(requestId);
41    }
42
43    public static void addRequstFuture(RpcResponseFuture rpcResponseFuture) {
44        requestFuture.put(rpcResponseFuture.getRequstId(), rpcResponseFuture);
45    }
46
47    public static RpcResponseFuture getRpcRequstFutue(Long requestId) {
48        return requestFuture.get(requestId);
49    }
50
51    /**
52     * 移除指定請求
53     *
54     * @param requestId 請求ID
55     */
56    public static void removeResponseAndFuture(Long requestId) {
57        responseContainer.remove(requestId);
58        requestFuture.remove(requestId);
59    }
60}

請求資料獲取類,用來掛起和喚醒掛起的執行緒:

 1package com.github.logsys.rpc.nio;
 2
 3import lombok.extern.slf4j.Slf4j;
 4
 5import java.util.concurrent.locks.Condition;
 6import java.util.concurrent.locks.Lock;
 7import java.util.concurrent.locks.ReentrantLock;
 8
 9/**
10 * Created by liuchunlong on 2018/10/17.
11 */
[email protected]
13public class RpcResponseFuture {
14
15    private final Lock lock = new ReentrantLock();
16    private final Condition condition = lock.newCondition();
17    private Long requsetId;
18
19    /**
20     *
21     * @param requsetId 請求ID
22     */
23    public RpcResponseFuture(Long requsetId) {
24        this.requsetId = requsetId;
25    }
26
27    public byte[] get() {
28        // 獲取響應結果
29        byte[] bytes = RpcContainer.getResponse(requsetId);
30        if (bytes == null || bytes.length < 0) {
31            lock.lock();
32            try {
33                log.info("請求id:" + requsetId + ",請求結果尚未返回,執行緒掛起");
34                condition.await();
35            } catch (InterruptedException e) {
36                e.printStackTrace();
37            } finally {
38                lock.unlock();
39            }
40        }
41        log.info("請求id:" + requsetId + ",請求結果返回,執行緒掛起結束");
42        return RpcContainer.getResponse(requsetId);
43    }
44
45    public void rpcIsDone() {
46        lock.lock();
47        try {
48            condition.signal();
49        } finally {
50            lock.unlock();
51        }
52    }
53
54    public Long getRequstId() {
55        return requsetId;
56    }
57
58    public void setRequstId(Long requsetId) {
59        this.requsetId = requsetId;
60    }
61}

RPC代理工廠類:

 1package com.github.logsys.rpc.nio;
 2
 3import java.lang.reflect.Proxy;
 4
 5/**
 6 * Created by liuchunlong on 2018/10/17.
 7 */
 8public class RpcProxyFactory {
 9    /**
10     * 多執行緒環境代理物件
11     *
12     * @param interfaceClass
13     * @return T
14     */
15    public static <T> T getMultService(Class<T> interfaceClass) {
16        return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[]{interfaceClass},
17                new RpcNIoMultHandler());
18    }
19}

實際代理類:

 1package com.github.logsys.rpc.nio;
 2
 3import lombok.extern.slf4j.Slf4j;
 4
 5import java.lang.reflect.InvocationHandler;
 6import java.lang.reflect.Method;
 7
 8/**
 9 * Created by liuchunlong on 2018/10/17.
10 */
[email protected]
12public class RpcNIoMultHandler implements InvocationHandler {
13    @Override
14    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
15
16        // 生成請求ID
17        Long responseId = RpcContainer.getRequestId();
18
19        // 封裝請求物件
20        RequestMultObject requestMultObject = new RequestMultObject(method.getDeclaringClass(), method.getName(),
21                method.getParameterTypes(), args);
22        requestMultObject.setRequestId(responseId);
23
24        // 封裝設定rpcResponseFuture,主要用於獲取返回值
25        RpcResponseFuture rpcResponseFuture = new RpcResponseFuture(responseId);
26        RpcContainer.addRequstFuture(rpcResponseFuture);
27
28        // 序列化
29        byte[] requsetBytes = SerializeUtil.serialize(requestMultObject);
30        // 傳送請求資訊
31        RpcNioMultClient rpcNioMultClient = RpcNioMultClient.getInstance();
32        rpcNioMultClient.sendMsg2Server(requsetBytes);
33
34        // 從ResponseContainer獲取返回值
35        byte[] responseBytes = rpcResponseFuture.get();
36        if (requsetBytes != null) {
37            RpcContainer.removeResponseAndFuture(responseId);
38        }
39
40        // 反序列化獲得結果
41        Object result = SerializeUtil.unSerialize(responseBytes);
42        log.info("請求id:" + responseId + " 結果:" + result);
43        return result;
44    }
45}

客戶端RPC呼叫:

 1package com.github.logsys.rpc.nio;
 2
 3import lombok.extern.slf4j.Slf4j;
 4
 5/**
 6 * Created by liuchunlong on 2018/10/17.
 7 */
 [email protected]
 9public class RpcNioConsumer {
10
11    public static void main(String[] args) {
12        multipartRpcNio();
13    }
14
15    /**
16     * 多執行緒IO呼叫示例
17     *
18     * @param
19     * @return void
20     */
21    public static void multipartRpcNio() {
22        HelloService proxy = RpcProxyFactory.getMultService(HelloService.class);
23        for (int i = 0; i < 100; i++) {
24            int j = i;
25            Runnable runnable = new Runnable() {
26                @Override
27                public void run() {
28                    String result = proxy.hello("world!");
29                    log.info(j + result);
30                }
31            };
32            Thread t = new Thread(runnable);
33            t.start();
34        }
35    }
36}

JDK解讀

SelectionKey#isConnectable

1public final boolean isConnectable()
2

測試此鍵的通道是否已完成其套接字連線操作。

呼叫此方法的形式為 k.isConnectable() ,該呼叫與以下呼叫的作用完全相同:

1k.readyOps() & OP_CONNECT != 0

如果此鍵的通道不支援套接字連線操作,則此方法始終返回 false。

返回:

當且僅當 readyOps()  &  OP_CONNECT 為非零值時才返回 true

丟擲:

CancelledKeyException - 如果已取消此鍵

SocketChannel#finishConnect()

1public abstract boolean finishConnect()
2                               throws IOException

完成套接字通道的連線過程。

通過將套接字通道置於非阻塞模式,然後呼叫其 connect 方法來發起非阻塞連線操作。一旦建立了連線,或者嘗試已失敗,該套接字通道就變為可連線的,並且可呼叫此方法完成連線序列。如果連線操作失敗,則呼叫此方法將導致丟擲適當的 IOException。

如果已連線了此通道,則不阻塞此方法並且立即返回 true。如果此通道處於非阻塞模式,那麼當連線過程尚未完成時,此方法將返回 false。如果此通道處於阻塞模式,則在連線完成或失敗之前將阻塞此方法,並且總是返回 true 或丟擲描述該失敗的、檢查型異常。

可在任意時間呼叫此方法。如果正在呼叫此方法時在此通道上呼叫讀取或寫入操作,則在此呼叫完成前將首先阻塞該操作。如果試圖發起連線但失敗了,也就是說如果呼叫此方法導致丟擲檢查型異常,則關閉此通道。

返回:

當且僅當已連線此通道的套接字時才返回 true

丟擲:

  • NoConnectionPendingException - 如果未連線此通道並且尚未發起連線操作

  • ClosedChannelException - 如果此通道已關閉

  • AsynchronousCloseException - 如果正在進行連線操作時另一個執行緒關閉了此通道

  • ClosedByInterruptException - 如果正在進行連線操作時另一個執行緒中斷了當前執行緒,因此關閉了該通道並將當前執行緒設定為中斷狀態

  • IOException - 如果發生其他 I/O 錯誤