1. 程式人生 > 其它 >分散式-通訊(NIO&BIO&網路模型&零拷貝)

分散式-通訊(NIO&BIO&網路模型&零拷貝)

分散式-通訊(NIO&BIO&網路模型&零拷貝)

前面聊到redis和rabbit,那我們是如何他他們進行通訊的呢?所以想聊聊這個問題。在我們分散式架構中,一個重要的點就是通訊,客戶端和服務端的通訊、微服務之間、中介軟體。而通訊直接影響到使用者的體驗,比如我的伺服器只能支援100個使用者同時和我通訊,而這個時候,有1000個使用者,那剩下的900的使用者,肯定要等待。所以今天會聊到關於通訊的一些東西:BIO、NIO、TCP揮手和握手、零拷貝、以及七層網路模型。

Java中通訊-Socket

java中提供一種通訊模型名為Socket, 我們可以通過Socket去進行一些網路通訊。

服務端

public class SocketServer  {
    public static void main(String[] args) {
        ServerSocket serverSocket= null;
        try {
            serverSocket = new ServerSocket(8080);
            //這裡是阻塞等待
            Socket socket=serverSocket.accept();
            //對客戶端進行資訊的接受
            BufferedReader bufferedReader=new
BufferedReader(new InputStreamReader(socket.getInputStream())); String readLine = bufferedReader.readLine(); System.out.println("接受客戶端訊息:"+readLine); BufferedWriter bufferedWriter=new BufferedWriter(new OutputStreamWriter(socket.getOutputStream())); bufferedWriter.write(
"我收到了資訊\n"); bufferedWriter.flush(); } catch (IOException e) { e.printStackTrace(); } } }

客戶端

public class SocketClient {
    public static void main(String[] args) {
        try {
            Socket socket= new Socket("localhost",8080);
            //對服務端進行訊息的傳送
            BufferedWriter bufferedWriter=new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
            bufferedWriter.write("我是客戶端一,傳送訊息!\n");
            bufferedWriter.flush();
            BufferedReader bufferedReader=new BufferedReader(new InputStreamReader(socket.getInputStream()));
            String result = bufferedReader.readLine();
            System.out.println("服務端返回訊息:"+result);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

socket的簡單通訊整體流程:

  • 服務端:在服務端建立一個監聽(這個時候服務端是一個阻塞狀態)
  • 客戶端:建立一個連線,這個時候,服務端的阻塞被喚醒,得到一個socket。然後客戶端通過OutputStream進行傳輸
  • 服務端:通過inputStream去獲取資訊,因為tcp是一個雙工的,所以可以通過一個OutputStream去寫回到客戶端
  • 客戶端:通過inputStream獲取服務端的返回資料

網路分層(當他們建立連線的時候,底層涉及到這個點)不管是tcp還是udp他們的傳輸都會涉及到七層的網路模型

  • 應用層:為我們的應用程式提供服務
  • 表示層:對資料進行格式化轉換、加密
  • 會話層:建立、管理、以及維護會話
  • 傳輸層:建立、管理和維護端到端的連線
  • 網路層:IP選址及路由選擇
  • 資料鏈路層:提供介質訪問和鏈路管理
  • 物理層:底層的物理傳輸

負載和網路分層

  • 二層負載是利用的Mac頭(資料鏈路層)、
  • 三層負載是Ip(網路層)我們可以在第三層對ip進行修改進行資料包的路由、
  • 四層負載(傳輸層)TCP屬於這一層,我們利用出傳輸層中的ip和埠號去進行一個負載均衡的計算(Nginx其實就是基於這一層進行的負載)
  • 七層負載(應用層):根據URL進行負載,就行controller一樣

對於TCP/IP有四層和上面的這七層相對應:為什麼叫TCP/IP?因為TCP是傳輸層,在我們的網路層(IP)層之上

  • 應用層:應用層、表示層、會話層
  • 傳輸層:傳輸層
  • 網路層:網路層
  • 網路介面層:資料鏈路層、物理層

當客戶端傳送一個tcp請求會經過一下幾個步驟:

  • 發起一個請求,會把tcp頭和資料報文放在一起
  • 向下走會增加一個ip頭,拼接到上面的資料中
  • 接著拼接Mac頭(服務端的網絡卡地址),當服務端進行簽收的時候會去確認是不是和自己Mac地址一樣
  • 這些資料就都變成二進位制進行傳輸

服務端經過的步驟

  • 接收到二進位制資料
  • 向上傳輸對Mac頭進行解析,這一步拿傳遞過來的Mac地址和當前的Mac地址進行匹配,如果匹配繼續傳輸
  • 上面一層對Ip頭進行解析,如果ip是自己則向上傳遞,不是則轉發到別的地址
  • 最上面一層獲取tcp頭,去匹配服務端的程序,程序中去獲取資料報文進行處理。

問題:我們怎麼知道服務端的Mac地址呢?

那就是ARP協議(網路層),流程為,他將含目標IP地址的ARP請求廣播到網路上的所有主機,想符合的主機則會返回Mac資訊,從而得知Mac地址。IP地址表示伺服器所在的位置,而Mac地址表示的是唯一的身份證明,每個主機都是唯一的(Mac是刻在網絡卡上的)。

為什麼我們在實際的網路過程中考慮的是Tcp而不是Udp?

因為TCP網路傳輸的可靠性:

  • 三次握手
    • 我們的兩個節點通訊,需要通過三個資料包來確定連線的建立。用伺服器A和B舉個例子
      • 伺服器A給B說:我要和你通訊 (第一次握手這一步是確定伺服器B是否是可使用狀態,如果不可用伺服器A會不斷重試
      • 伺服器B相應:一切正常(第二次握手 這一步是告訴伺服器A我這邊可以進行連線,防止伺服器A不斷的重試。
      • 伺服器A說:那我們開始通訊吧(第三次握手)伺服器A收到了伺服器B的返回訊息,他需要告訴伺服器B他收到了訊息,以為不排除在伺服器A在收到B的相應後掛了的可能性,
  • 流量控制
  • 斷開機制(四次揮手)
    • A傳送關閉訊息給B(第一次揮手這個時候B就知道A沒有訊息要傳送了
    • B傳送訊息給A說:我已經收到關閉訊息了,但是等我通知你後,你再進行關閉(第二次揮手這個時候不直接關閉,因為B可能還有資料沒有處理完成
    • B傳送訊息給A說可以我處理資料完成,可以進行關閉。第三次揮手
    • A給B說我關閉連線(第四次揮手)

IO阻塞怎麼辦【BIO】?

上面我們執行的程式碼是一個客戶端對一個服務端,(服務端在等待獲取資料的時候是阻塞狀態)但是在真實的場景中,不可能只有一個使用者連線伺服器,那是不是可以當一個請求過來的時候,開啟執行緒去處理

客戶端

public class ServerSocketDemo {

    static ExecutorService executorService= Executors.newFixedThreadPool(20);

    public static void main(String[] args) {
        ServerSocket serverSocket=null;
        try {
            //localhost: 8080
            serverSocket=new ServerSocket(8080);
            while(true) {
                Socket socket = serverSocket.accept(); //監聽客戶端連線(連線阻塞)
                System.out.println(socket.getPort());
                executorService.execute(new SocketThread(socket)); //非同步
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            //TODO
            if(serverSocket!=null){
                try {
                    serverSocket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
public class SocketThread implements Runnable{

    private Socket socket;

    public SocketThread(Socket socket) {
        this.socket = socket;
    }

    @Override
    public void run() {
        try {
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));//輸入流
            String s = bufferedReader.readLine(); //被阻塞了
            String clientStr = s; //讀取客戶端的一行資料
            System.out.println("接收到客戶端的資訊:" + clientStr);
            //寫回去
            BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
            bufferedWriter.write("我收到了資訊\n");
            bufferedWriter.flush();

            bufferedReader.close();
            bufferedWriter.close();
        }catch (Exception e){

        }
    }
}
View Code

服務端

public class SocketClientDemo1 {
    public static void main(String[] args) {
        try {
            Socket socket=new Socket("localhost",8080);
            BufferedWriter bufferedWriter=new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
            bufferedWriter.write("我是客戶端1,傳送了一個訊息\n");
            bufferedWriter.flush();
            BufferedReader bufferedReader=new BufferedReader(new InputStreamReader(socket.getInputStream()));//輸入流
            String serverLine=bufferedReader.readLine(); //讀取服務端返回的資料
            System.out.println("服務端返回的資料:"+serverLine);

        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
public class SocketClientDemo {
    public static void main(String[] args) {
        try {
            Socket socket=new Socket("localhost",8080);
            BufferedWriter bufferedWriter=new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
            bufferedWriter.write("我是客戶端,傳送了一個訊息\n");
            bufferedWriter.flush();
            BufferedReader bufferedReader=new BufferedReader(new InputStreamReader(socket.getInputStream()));//輸入流
            String serverLine=bufferedReader.readLine(); //讀取服務端返回的資料(被阻塞了)
            System.out.println("服務端返回的資料:"+serverLine);

        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
View Code

流程如下:

當客戶端傳遞請求到服務端,首先在阻塞在accept這裡,然後開啟執行緒去處理IO請求,這裡可會阻塞,那執行緒的數量如何控制?如果連線數大於執行緒數,勢必會有一些請求丟失,那如何提升連線數?這就出現了非阻塞IO(NIO)

有兩個阻塞的地方IO阻塞、連線阻塞

非阻塞IO(NIO)

非阻塞指的是:連線非阻塞、IO非阻塞

服務端:

public class NioClient {
    public static void main(String[] args)  {
        try {
            SocketChannel socketChannel=SocketChannel.open();
            /*socketChannel.configureBlocking(false);*/
            socketChannel.connect(new InetSocketAddress("localhost",8080));
            //如果連線已經建立
            if (socketChannel.isConnectionPending()){
                //完成連線
                socketChannel.finishConnect();
            }
            //這裡並不意味著連線已經建立
            ByteBuffer byteBuffer=ByteBuffer.allocate(1024);
            byteBuffer.put("Hi,I am client".getBytes());
            byteBuffer.flip();
            socketChannel.write(byteBuffer);
            byteBuffer.clear();
            //讀取服務端返回的資料 (這裡其實是阻塞的)
            // 有一個坑,就是如果上面設定了非阻塞,下面這裡在等待服務端返回結果,服務端是不會返回結果的,因為不阻塞的話,這裡的連線就已經關閉了
            // 所以想要收到服務端的返回結果就註釋上面的configureBlocking
            int read = socketChannel.read(byteBuffer);
            if (read>0){
                System.out.println("服務端的資料::"+new String(byteBuffer.array()));
            }
            else {
                System.out.println("服務端沒有資料返回");
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

    }
}
View Code

客戶端

public class NioClient {
    public static void main(String[] args)  {
        try {
            SocketChannel socketChannel=SocketChannel.open();
            /*socketChannel.configureBlocking(false);*/
            socketChannel.connect(new InetSocketAddress("localhost",8080));
            //如果連線已經建立
            if (socketChannel.isConnectionPending()){
                //完成連線
                socketChannel.finishConnect();
            }
            //這裡並不意味著連線已經建立
            ByteBuffer byteBuffer=ByteBuffer.allocate(1024);
            byteBuffer.put("Hi,I am client".getBytes());
            byteBuffer.flip();
            socketChannel.write(byteBuffer);
            byteBuffer.clear();
            //讀取服務端返回的資料 (這裡其實是阻塞的)
            // 有一個坑,就是如果上面設定了非阻塞,下面這裡在等待服務端返回結果,服務端是不會返回結果的,因為不阻塞的話,這裡的連線就已經關閉了
            // 所以想要收到服務端的返回結果就註釋上面的configureBlocking
            int read = socketChannel.read(byteBuffer);
            if (read>0){
                System.out.println("服務端的資料::"+new String(byteBuffer.array()));
            }
            else {
                System.out.println("服務端沒有資料返回");
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

    }
}
View Code

我們可以看出服務端在不斷的詢問連線,但是採用這種輪詢的方式,會不會很消耗效能?所以這裡就引出了多路複用機制。

多路複用【selector】

多路複用指的是一個執行緒管理多個通道,那一個執行緒就是我們的selector,簡單來說就是把channel註冊到selector上,註冊的事件分為,讀事件、寫事件、連線事件、接收事件、一旦有一個事件通知的話,我們的selector就由阻塞變成非阻塞,這個時候他就拿到對應的通道進行處理,當他處理的時候一定是一個可以執行的通道,不像我們上圖展示的那種(出現連線未就緒的情況發生).流程如下:

程式碼流程如下:

【客戶端】:註冊一個連線時間到他的selector中,客戶端的selector發現有一個連線事件,然後就處理這個事件傳送訊息到服務端,並且註冊一個讀的事件

【服務端】:註冊一個接受事件,當客戶端傳送一個訊息過來後,服務端的selector就註冊一個接受事件,在這個事件中他給客戶端傳送一個【收到】的訊息,並且也註冊一個讀的事件。他的selector發現了這個讀的事件後就開始讀取服務端傳遞過來的資料。

【客戶端】:selector發現了讀的事件後,就開始讀取服務端傳遞過來的【收到】的資訊。

服務端:

public class NIOServer {
    //多路複用
    static Selector selector;
    public static void main(String[] args) {
        try {
            //開啟多路複用
            selector= Selector.open();
            ServerSocketChannel serverSocketChannel=ServerSocketChannel.open();
            //這是設定非阻塞
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.socket().bind(new InetSocketAddress(8080));
            //把接收事件註冊到多路複用器上
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            while (true){
                //只有事件到達的時候他才會被喚醒,否則是阻塞狀態。
                selector.select();
                //這裡是所有可以使用的channel,下面對這些可以使用的事件進行輪詢操作
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey next = iterator.next();
                    // 一旦操作就對把事件進行移除
                    iterator.remove();
                    if (next.isAcceptable()){//寫事件
                        copeWithAccept(next);
                    }else if (next.isReadable()){//讀事件
                        copeWithRead(next);
                    }
                }
            }

        } catch (IOException  e) {
            e.printStackTrace();
        }
    }

    //處理寫事件
    private static   void  copeWithAccept(SelectionKey selectionKey){
        //可以使用的channel
        ServerSocketChannel channel = (ServerSocketChannel) selectionKey.channel();
        try {
            //這裡就是接受一個連線
            SocketChannel accept = channel.accept();
            //開啟非阻塞
            accept.configureBlocking(false);
            //註冊一個事件(因為上面寫了一些東西,現在讀的話也要註冊一個讀的事件)
            //這個時候上面輪詢的時候就發現有一個讀的事件準備就緒了
            accept.register(selector,SelectionKey.OP_READ);
            accept.write(ByteBuffer.wrap("Hello,I am server".getBytes()));
        } catch (IOException e) {
            e.printStackTrace();
        }

    }
    //處理讀事件
    private  static void  copeWithRead(SelectionKey selectionKey){
        //這裡拿到的通道就是上面註冊(copeWithAccept)的要讀的通道
        SocketChannel socketChannel= (SocketChannel) selectionKey.channel();
        ByteBuffer byteBuffer=ByteBuffer.allocate(1024);
        try {
           socketChannel.read(byteBuffer);
            System.out.println(new String(byteBuffer.array()));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
View Code

客戶端:

public class NioClient {
    //多路複用
    static Selector selector;
    public static void main(String[] args)  {
        try {
            //開啟多路複用
            selector=Selector.open();
            SocketChannel socketChannel=SocketChannel.open();
            socketChannel.configureBlocking(false);
            socketChannel.connect(new InetSocketAddress("localhost",8080));
            //註冊一個連線事件
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
            while (true){
                //還是當有事件發生的時候才觸發
                selector.select();
                //一樣,輪詢所有註冊的事件
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey next = iterator.next();
                    iterator.remove();
                    if (next.isConnectable()){//連線事件
                        copeWithAccept(next);
                    }else if (next.isReadable()){//讀事件
                        copeWithRead(next);
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

    }
    //處理連線事件
    private static   void  copeWithAccept(SelectionKey selectionKey) throws IOException {
        //可以使用的channel
        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        if (socketChannel.isConnectionPending()) {
            socketChannel.finishConnect();
        }
        socketChannel.configureBlocking(false);
        socketChannel.write(ByteBuffer.wrap("Hello Serve,I am  client".getBytes()));
        //註冊一個讀取的事件
        socketChannel.register(selector,SelectionKey.OP_READ);
    }
    //處理讀事件
    private  static void  copeWithRead(SelectionKey selectionKey) throws IOException {
        //這裡拿到的通道就是上面註冊(copeWithAccept)的要讀的通道
        SocketChannel socketChannel= (SocketChannel) selectionKey.channel();
        ByteBuffer byteBuffer=ByteBuffer.allocate(1024);
        socketChannel.read(byteBuffer);
        System.out.println("client receive:"+new String(byteBuffer.array()));
    }
}
View Code

零拷貝

當我們要操作一個檔案傳送給其他伺服器的時候,三次拷貝

  • 把檔案拷貝到核心空間
  • 然後從核心空間拷貝到使用者空間
  • 然後再從使用者空間把這個資料拷貝到核心空間,通過網絡卡傳送到其他的伺服器上去

零拷貝的意思是,我們不經過使用者空間,直接從磁碟的核心空間進行傳送。後面我們聊Netty的時候會聊到這。

實現方式:

  • 把核心空間和使用者空間對映在一起(MMAP)
  • 使用現成的API