分散式-通訊(NIO&BIO&網路模型&零拷貝)
分散式-通訊(NIO&BIO&網路模型&零拷貝)
前面聊到redis和rabbit,那我們是如何他他們進行通訊的呢?所以想聊聊這個問題。在我們分散式架構中,一個重要的點就是通訊,客戶端和服務端的通訊、微服務之間、中介軟體。而通訊直接影響到使用者的體驗,比如我的伺服器只能支援100個使用者同時和我通訊,而這個時候,有1000個使用者,那剩下的900的使用者,肯定要等待。所以今天會聊到關於通訊的一些東西:BIO、NIO、TCP揮手和握手、零拷貝、以及七層網路模型。
Java中通訊-Socket
java中提供一種通訊模型名為Socket, 我們可以通過Socket去進行一些網路通訊。
服務端
public class SocketServer { public static void main(String[] args) { ServerSocket serverSocket= null; try { serverSocket = new ServerSocket(8080); //這裡是阻塞等待 Socket socket=serverSocket.accept(); //對客戶端進行資訊的接受 BufferedReader bufferedReader=newBufferedReader(new InputStreamReader(socket.getInputStream())); String readLine = bufferedReader.readLine(); System.out.println("接受客戶端訊息:"+readLine); BufferedWriter bufferedWriter=new BufferedWriter(new OutputStreamWriter(socket.getOutputStream())); bufferedWriter.write("我收到了資訊\n"); bufferedWriter.flush(); } catch (IOException e) { e.printStackTrace(); } } }客戶端
public class SocketClient { public static void main(String[] args) { try { Socket socket= new Socket("localhost",8080); //對服務端進行訊息的傳送 BufferedWriter bufferedWriter=new BufferedWriter(new OutputStreamWriter(socket.getOutputStream())); bufferedWriter.write("我是客戶端一,傳送訊息!\n"); bufferedWriter.flush(); BufferedReader bufferedReader=new BufferedReader(new InputStreamReader(socket.getInputStream())); String result = bufferedReader.readLine(); System.out.println("服務端返回訊息:"+result); } catch (IOException e) { e.printStackTrace(); } } }socket的簡單通訊整體流程:
- 服務端:在服務端建立一個監聽(這個時候服務端是一個阻塞狀態)
- 客戶端:建立一個連線,這個時候,服務端的阻塞被喚醒,得到一個socket。然後客戶端通過OutputStream進行傳輸
- 服務端:通過inputStream去獲取資訊,因為tcp是一個雙工的,所以可以通過一個OutputStream去寫回到客戶端
- 客戶端:通過inputStream獲取服務端的返回資料
網路分層(當他們建立連線的時候,底層涉及到這個點)不管是tcp還是udp他們的傳輸都會涉及到七層的網路模型
- 應用層:為我們的應用程式提供服務
- 表示層:對資料進行格式化轉換、加密
- 會話層:建立、管理、以及維護會話
- 傳輸層:建立、管理和維護端到端的連線
- 網路層:IP選址及路由選擇
- 資料鏈路層:提供介質訪問和鏈路管理
- 物理層:底層的物理傳輸
負載和網路分層
- 二層負載是利用的Mac頭(資料鏈路層)、
- 三層負載是Ip(網路層)我們可以在第三層對ip進行修改進行資料包的路由、
- 四層負載(傳輸層)TCP屬於這一層,我們利用出傳輸層中的ip和埠號去進行一個負載均衡的計算(Nginx其實就是基於這一層進行的負載)
- 七層負載(應用層):根據URL進行負載,就行controller一樣
對於TCP/IP有四層和上面的這七層相對應:為什麼叫TCP/IP?因為TCP是傳輸層,在我們的網路層(IP)層之上
- 應用層:應用層、表示層、會話層
- 傳輸層:傳輸層
- 網路層:網路層
- 網路介面層:資料鏈路層、物理層
當客戶端傳送一個tcp請求會經過一下幾個步驟:
- 發起一個請求,會把tcp頭和資料報文放在一起
- 向下走會增加一個ip頭,拼接到上面的資料中
- 接著拼接Mac頭(服務端的網絡卡地址),當服務端進行簽收的時候會去確認是不是和自己Mac地址一樣
- 這些資料就都變成二進位制進行傳輸
服務端經過的步驟
- 接收到二進位制資料
- 向上傳輸對Mac頭進行解析,這一步拿傳遞過來的Mac地址和當前的Mac地址進行匹配,如果匹配繼續傳輸
- 上面一層對Ip頭進行解析,如果ip是自己則向上傳遞,不是則轉發到別的地址
- 最上面一層獲取tcp頭,去匹配服務端的程序,程序中去獲取資料報文進行處理。
問題:我們怎麼知道服務端的Mac地址呢?
那就是ARP協議(網路層),流程為,他將含目標IP地址的ARP請求廣播到網路上的所有主機,想符合的主機則會返回Mac資訊,從而得知Mac地址。IP地址表示伺服器所在的位置,而Mac地址表示的是唯一的身份證明,每個主機都是唯一的(Mac是刻在網絡卡上的)。
為什麼我們在實際的網路過程中考慮的是Tcp而不是Udp?
因為TCP網路傳輸的可靠性:
- 三次握手
- 我們的兩個節點通訊,需要通過三個資料包來確定連線的建立。用伺服器A和B舉個例子
- 伺服器A給B說:我要和你通訊 (第一次握手)這一步是確定伺服器B是否是可使用狀態,如果不可用伺服器A會不斷重試
- 伺服器B相應:一切正常(第二次握手) 這一步是告訴伺服器A我這邊可以進行連線,防止伺服器A不斷的重試。
- 伺服器A說:那我們開始通訊吧(第三次握手)伺服器A收到了伺服器B的返回訊息,他需要告訴伺服器B他收到了訊息,以為不排除在伺服器A在收到B的相應後掛了的可能性,
- 流量控制
- 斷開機制(四次揮手):
- A傳送關閉訊息給B(第一次揮手)這個時候B就知道A沒有訊息要傳送了
- B傳送訊息給A說:我已經收到關閉訊息了,但是等我通知你後,你再進行關閉(第二次揮手)這個時候不直接關閉,因為B可能還有資料沒有處理完成,
- B傳送訊息給A說可以我處理資料完成,可以進行關閉。(第三次揮手)
- A給B說我關閉連線了(第四次揮手)
IO阻塞怎麼辦【BIO】?
上面我們執行的程式碼是一個客戶端對一個服務端,(服務端在等待獲取資料的時候是阻塞狀態)但是在真實的場景中,不可能只有一個使用者連線伺服器,那是不是可以當一個請求過來的時候,開啟執行緒去處理
客戶端
public class ServerSocketDemo { static ExecutorService executorService= Executors.newFixedThreadPool(20); public static void main(String[] args) { ServerSocket serverSocket=null; try { //localhost: 8080 serverSocket=new ServerSocket(8080); while(true) { Socket socket = serverSocket.accept(); //監聽客戶端連線(連線阻塞) System.out.println(socket.getPort()); executorService.execute(new SocketThread(socket)); //非同步 } } catch (IOException e) { e.printStackTrace(); } finally { //TODO if(serverSocket!=null){ try { serverSocket.close(); } catch (IOException e) { e.printStackTrace(); } } } } } public class SocketThread implements Runnable{ private Socket socket; public SocketThread(Socket socket) { this.socket = socket; } @Override public void run() { try { BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));//輸入流 String s = bufferedReader.readLine(); //被阻塞了 String clientStr = s; //讀取客戶端的一行資料 System.out.println("接收到客戶端的資訊:" + clientStr); //寫回去 BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream())); bufferedWriter.write("我收到了資訊\n"); bufferedWriter.flush(); bufferedReader.close(); bufferedWriter.close(); }catch (Exception e){ } } }View Code服務端
public class SocketClientDemo1 { public static void main(String[] args) { try { Socket socket=new Socket("localhost",8080); BufferedWriter bufferedWriter=new BufferedWriter(new OutputStreamWriter(socket.getOutputStream())); bufferedWriter.write("我是客戶端1,傳送了一個訊息\n"); bufferedWriter.flush(); BufferedReader bufferedReader=new BufferedReader(new InputStreamReader(socket.getInputStream()));//輸入流 String serverLine=bufferedReader.readLine(); //讀取服務端返回的資料 System.out.println("服務端返回的資料:"+serverLine); } catch (IOException e) { e.printStackTrace(); } } } public class SocketClientDemo { public static void main(String[] args) { try { Socket socket=new Socket("localhost",8080); BufferedWriter bufferedWriter=new BufferedWriter(new OutputStreamWriter(socket.getOutputStream())); bufferedWriter.write("我是客戶端,傳送了一個訊息\n"); bufferedWriter.flush(); BufferedReader bufferedReader=new BufferedReader(new InputStreamReader(socket.getInputStream()));//輸入流 String serverLine=bufferedReader.readLine(); //讀取服務端返回的資料(被阻塞了) System.out.println("服務端返回的資料:"+serverLine); } catch (IOException e) { e.printStackTrace(); } } }View Code流程如下:
當客戶端傳遞請求到服務端,首先在阻塞在accept這裡,然後開啟執行緒去處理IO請求,這裡可會阻塞,那執行緒的數量如何控制?如果連線數大於執行緒數,勢必會有一些請求丟失,那如何提升連線數?這就出現了非阻塞IO(NIO)
有兩個阻塞的地方:IO阻塞、連線阻塞
非阻塞IO(NIO)
非阻塞指的是:連線非阻塞、IO非阻塞
服務端:
public class NioClient { public static void main(String[] args) { try { SocketChannel socketChannel=SocketChannel.open(); /*socketChannel.configureBlocking(false);*/ socketChannel.connect(new InetSocketAddress("localhost",8080)); //如果連線已經建立 if (socketChannel.isConnectionPending()){ //完成連線 socketChannel.finishConnect(); } //這裡並不意味著連線已經建立 ByteBuffer byteBuffer=ByteBuffer.allocate(1024); byteBuffer.put("Hi,I am client".getBytes()); byteBuffer.flip(); socketChannel.write(byteBuffer); byteBuffer.clear(); //讀取服務端返回的資料 (這裡其實是阻塞的) // 有一個坑,就是如果上面設定了非阻塞,下面這裡在等待服務端返回結果,服務端是不會返回結果的,因為不阻塞的話,這裡的連線就已經關閉了 // 所以想要收到服務端的返回結果就註釋上面的configureBlocking int read = socketChannel.read(byteBuffer); if (read>0){ System.out.println("服務端的資料::"+new String(byteBuffer.array())); } else { System.out.println("服務端沒有資料返回"); } } catch (IOException e) { e.printStackTrace(); } } }View Code客戶端
public class NioClient { public static void main(String[] args) { try { SocketChannel socketChannel=SocketChannel.open(); /*socketChannel.configureBlocking(false);*/ socketChannel.connect(new InetSocketAddress("localhost",8080)); //如果連線已經建立 if (socketChannel.isConnectionPending()){ //完成連線 socketChannel.finishConnect(); } //這裡並不意味著連線已經建立 ByteBuffer byteBuffer=ByteBuffer.allocate(1024); byteBuffer.put("Hi,I am client".getBytes()); byteBuffer.flip(); socketChannel.write(byteBuffer); byteBuffer.clear(); //讀取服務端返回的資料 (這裡其實是阻塞的) // 有一個坑,就是如果上面設定了非阻塞,下面這裡在等待服務端返回結果,服務端是不會返回結果的,因為不阻塞的話,這裡的連線就已經關閉了 // 所以想要收到服務端的返回結果就註釋上面的configureBlocking int read = socketChannel.read(byteBuffer); if (read>0){ System.out.println("服務端的資料::"+new String(byteBuffer.array())); } else { System.out.println("服務端沒有資料返回"); } } catch (IOException e) { e.printStackTrace(); } } }View Code我們可以看出服務端在不斷的詢問連線,但是採用這種輪詢的方式,會不會很消耗效能?所以這裡就引出了多路複用機制。
多路複用【selector】
多路複用指的是一個執行緒管理多個通道,那一個執行緒就是我們的selector,簡單來說就是把channel註冊到selector上,註冊的事件分為,讀事件、寫事件、連線事件、接收事件、一旦有一個事件通知的話,我們的selector就由阻塞變成非阻塞,這個時候他就拿到對應的通道進行處理,當他處理的時候一定是一個可以執行的通道,不像我們上圖展示的那種(出現連線未就緒的情況發生).流程如下:
程式碼流程如下:
【客戶端】:註冊一個連線時間到他的selector中,客戶端的selector發現有一個連線事件,然後就處理這個事件傳送訊息到服務端,並且註冊一個讀的事件
【服務端】:註冊一個接受事件,當客戶端傳送一個訊息過來後,服務端的selector就註冊一個接受事件,在這個事件中他給客戶端傳送一個【收到】的訊息,並且也註冊一個讀的事件。他的selector發現了這個讀的事件後就開始讀取服務端傳遞過來的資料。
【客戶端】:selector發現了讀的事件後,就開始讀取服務端傳遞過來的【收到】的資訊。
服務端:
public class NIOServer { //多路複用 static Selector selector; public static void main(String[] args) { try { //開啟多路複用 selector= Selector.open(); ServerSocketChannel serverSocketChannel=ServerSocketChannel.open(); //這是設定非阻塞 serverSocketChannel.configureBlocking(false); serverSocketChannel.socket().bind(new InetSocketAddress(8080)); //把接收事件註冊到多路複用器上 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); while (true){ //只有事件到達的時候他才會被喚醒,否則是阻塞狀態。 selector.select(); //這裡是所有可以使用的channel,下面對這些可以使用的事件進行輪詢操作 Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey next = iterator.next(); // 一旦操作就對把事件進行移除 iterator.remove(); if (next.isAcceptable()){//寫事件 copeWithAccept(next); }else if (next.isReadable()){//讀事件 copeWithRead(next); } } } } catch (IOException e) { e.printStackTrace(); } } //處理寫事件 private static void copeWithAccept(SelectionKey selectionKey){ //可以使用的channel ServerSocketChannel channel = (ServerSocketChannel) selectionKey.channel(); try { //這裡就是接受一個連線 SocketChannel accept = channel.accept(); //開啟非阻塞 accept.configureBlocking(false); //註冊一個事件(因為上面寫了一些東西,現在讀的話也要註冊一個讀的事件) //這個時候上面輪詢的時候就發現有一個讀的事件準備就緒了 accept.register(selector,SelectionKey.OP_READ); accept.write(ByteBuffer.wrap("Hello,I am server".getBytes())); } catch (IOException e) { e.printStackTrace(); } } //處理讀事件 private static void copeWithRead(SelectionKey selectionKey){ //這裡拿到的通道就是上面註冊(copeWithAccept)的要讀的通道 SocketChannel socketChannel= (SocketChannel) selectionKey.channel(); ByteBuffer byteBuffer=ByteBuffer.allocate(1024); try { socketChannel.read(byteBuffer); System.out.println(new String(byteBuffer.array())); } catch (IOException e) { e.printStackTrace(); } } }View Code客戶端:
public class NioClient { //多路複用 static Selector selector; public static void main(String[] args) { try { //開啟多路複用 selector=Selector.open(); SocketChannel socketChannel=SocketChannel.open(); socketChannel.configureBlocking(false); socketChannel.connect(new InetSocketAddress("localhost",8080)); //註冊一個連線事件 socketChannel.register(selector, SelectionKey.OP_CONNECT); while (true){ //還是當有事件發生的時候才觸發 selector.select(); //一樣,輪詢所有註冊的事件 Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey next = iterator.next(); iterator.remove(); if (next.isConnectable()){//連線事件 copeWithAccept(next); }else if (next.isReadable()){//讀事件 copeWithRead(next); } } } } catch (IOException e) { e.printStackTrace(); } } //處理連線事件 private static void copeWithAccept(SelectionKey selectionKey) throws IOException { //可以使用的channel SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); if (socketChannel.isConnectionPending()) { socketChannel.finishConnect(); } socketChannel.configureBlocking(false); socketChannel.write(ByteBuffer.wrap("Hello Serve,I am client".getBytes())); //註冊一個讀取的事件 socketChannel.register(selector,SelectionKey.OP_READ); } //處理讀事件 private static void copeWithRead(SelectionKey selectionKey) throws IOException { //這裡拿到的通道就是上面註冊(copeWithAccept)的要讀的通道 SocketChannel socketChannel= (SocketChannel) selectionKey.channel(); ByteBuffer byteBuffer=ByteBuffer.allocate(1024); socketChannel.read(byteBuffer); System.out.println("client receive:"+new String(byteBuffer.array())); } }View Code
零拷貝
當我們要操作一個檔案傳送給其他伺服器的時候,有三次拷貝
- 把檔案拷貝到核心空間
- 然後從核心空間拷貝到使用者空間
- 然後再從使用者空間把這個資料拷貝到核心空間,通過網絡卡傳送到其他的伺服器上去
零拷貝的意思是,我們不經過使用者空間,直接從磁碟的核心空間進行傳送。後面我們聊Netty的時候會聊到這。
實現方式:
- 把核心空間和使用者空間對映在一起(MMAP)
- 使用現成的API