Netty序章之BIO NIO AIO演變
Netty序章之BIO NIO AIO演變
Netty是一個提供異步事件驅動的網絡應用框架,用以快速開發高性能、高可靠的網絡服務器和客戶端程序。Netty簡化了網絡程序的開發,是很多框架和公司都在使用的技術。更是面試的加分項。Netty並非橫空出世,它是在BIO,NIO,AIO演變中的產物,是一種NIO框架。而BIO,NIO,AIO更是筆試中要考,面試中要問的技術。也是一個很好的加分項,加分就是加工資,你還在等什麽?本章帶你細細品味三者的不同!
流程圖:
技術:BIO,NIO,AIO
說明:github上有更全的源碼。
源碼:https://github.com/ITDragonBlog/daydayup/tree/master/Netty/socket-io
BIO
BIO 全稱Block-IO 是一種阻塞同步的通信模式。我們常說的Stock IO 一般指的是BIO。是一個比較傳統的通信方式,模式簡單,使用方便。但並發處理能力低,通信耗時,依賴網速。
BIO 設計原理:
服務器通過一個Acceptor線程負責監聽客戶端請求和為每個客戶端創建一個新的線程進行鏈路處理。典型的一請求一應答模式。若客戶端數量增多,頻繁地創建和銷毀線程會給服務器打開很大的壓力。後改良為用線程池的方式代替新增線程,被稱為偽異步IO。
服務器提供IP地址和監聽的端口,客戶端通過TCP的三次握手與服務器連接,連接成功後,雙放才能通過套接字(Stock)通信。
小結:BIO模型中通過Socket和ServerSocket完成套接字通道的實現。阻塞,同步,建立連接耗時
BIO服務器代碼,負責啟動服務,阻塞服務,監聽客戶端請求,新建線程處理任務。
import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * IO 也稱為 BIO,Block IO 阻塞同步的通訊方式 * 比較傳統的技術,實際開發中基本上用Netty或者是AIO。熟悉BIO,NIO,體會其中變化的過程。作為一個web開發人員,stock通訊面試經常問題。 * BIO最大的問題是:阻塞,同步。 * BIO通訊方式很依賴於網絡,若網速不好,阻塞時間會很長。每次請求都由程序執行並返回,這是同步的缺陷。 * BIO工作流程: * 第一步:server端服務器啟動 * 第二步:server端服務器阻塞監聽client請求 * 第三步:server端服務器接收請求,創建線程實現任務*/ public class ITDragonBIOServer { private static final Integer PORT = 8888; // 服務器對外的端口號 public static void main(String[] args) { ServerSocket server = null; Socket socket = null; ThreadPoolExecutor executor = null; try { server = new ServerSocket(PORT); // ServerSocket 啟動監聽端口 System.out.println("BIO Server 服務器啟動........."); /*--------------傳統的新增線程處理----------------*/ /*while (true) { // 服務器監聽:阻塞,等待Client請求 socket = server.accept(); System.out.println("server 服務器確認請求 : " + socket); // 服務器連接確認:確認Client請求後,創建線程執行任務 。很明顯的問題,若每接收一次請求就要創建一個線程,顯然是不合理的。 new Thread(new ITDragonBIOServerHandler(socket)).start(); } */ /*--------------通過線程池處理緩解高並發給程序帶來的壓力(偽異步IO編程)----------------*/ executor = new ThreadPoolExecutor(10, 100, 1000, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(50)); while (true) { socket = server.accept(); // 服務器監聽:阻塞,等待Client請求 ITDragonBIOServerHandler serverHandler = new ITDragonBIOServerHandler(socket); executor.execute(serverHandler); } } catch (IOException e) { e.printStackTrace(); } finally { try { if (null != socket) { socket.close(); socket = null; } if (null != server) { server.close(); server = null; System.out.println("BIO Server 服務器關閉了!!!!"); } executor.shutdown(); } catch (IOException e) { e.printStackTrace(); } } } }
BIO服務端處理任務代碼,負責處理Stock套接字,返回套接字給客戶端,解耦。
import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.Socket; import com.itdragon.util.CalculatorUtil; public class ITDragonBIOServerHandler implements Runnable{ private Socket socket; public ITDragonBIOServerHandler(Socket socket) { this.socket = socket; } @Override public void run() { BufferedReader reader = null; PrintWriter writer = null; try { reader = new BufferedReader(new InputStreamReader(this.socket.getInputStream())); writer = new PrintWriter(this.socket.getOutputStream(), true); String body = null; while (true) { body = reader.readLine(); // 若客戶端用的是 writer.print() 傳值,那readerLine() 是不能獲取值,細節 if (null == body) { break; } System.out.println("server服務端接收參數 : " + body); writer.println(body + " = " + CalculatorUtil.cal(body).toString()); } } catch (IOException e) { e.printStackTrace(); } finally { if (null != writer) { writer.close(); } try { if (null != reader) { reader.close(); } if (null != this.socket) { this.socket.close(); this.socket = null; } } catch (IOException e) { e.printStackTrace(); } } } }
BIO客戶端代碼,負責啟動客戶端,向服務器發送請求,接收服務器返回的Stock套接字。
import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.Socket; import java.util.Random; /** * BIO 客戶端 * Socket : 向服務端發送連接 * PrintWriter : 向服務端傳遞參數 * BufferedReader : 從服務端接收參數 */ public class ITDragonBIOClient { private static Integer PORT = 8888; private static String IP_ADDRESS = "127.0.0.1"; public static void main(String[] args) { for (int i = 0; i < 10; i++) { clientReq(i); } } private static void clientReq(int i) { Socket socket = null; BufferedReader reader = null; PrintWriter writer = null; try { socket = new Socket(IP_ADDRESS, PORT); // Socket 發起連接操作。連接成功後,雙方通過輸入和輸出流進行同步阻塞式通信 reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); // 獲取返回內容 writer = new PrintWriter(socket.getOutputStream(), true); String []operators = {"+","-","*","/"}; Random random = new Random(System.currentTimeMillis()); String expression = random.nextInt(10)+operators[random.nextInt(4)]+(random.nextInt(10)+1); writer.println(expression); // 向服務器端發送數據 System.out.println(i + " 客戶端打印返回數據 : " + reader.readLine()); } catch (Exception e) { e.printStackTrace(); } finally { try { if (null != reader) { reader.close(); } if (null != socket) { socket.close(); socket = null; } } catch (IOException e) { e.printStackTrace(); } } } }
NIO
NIO 全稱New IO,也叫Non-Block IO 是一種非阻塞同步的通信模式。
NIO 設計原理:
NIO 相對於BIO來說一大進步。客戶端和服務器之間通過Channel通信。NIO可以在Channel進行讀寫操作。這些Channel都會被註冊在Selector多路復用器上。Selector通過一個線程不停的輪詢這些Channel。找出已經準備就緒的Channel執行IO操作。
NIO 通過一個線程輪詢,實現千萬個客戶端的請求,這就是非阻塞NIO的特點。
1)緩沖區Buffer:它是NIO與BIO的一個重要區別。BIO是將數據直接寫入或讀取到Stream對象中。而NIO的數據操作都是在緩沖區中進行的。緩沖區實際上是一個數組。Buffer最常見的類型是ByteBuffer,另外還有CharBuffer,ShortBuffer,IntBuffer,LongBuffer,FloatBuffer,DoubleBuffer。
2)通道Channel:和流不同,通道是雙向的。NIO可以通過Channel進行數據的讀,寫和同時讀寫操作。通道分為兩大類:一類是網絡讀寫(SelectableChannel),一類是用於文件操作(FileChannel),我們使用的SocketChannel和ServerSocketChannel都是SelectableChannel的子類。
3)多路復用器Selector:NIO編程的基礎。多路復用器提供選擇已經就緒的任務的能力。就是Selector會不斷地輪詢註冊在其上的通道(Channel),如果某個通道處於就緒狀態,會被Selector輪詢出來,然後通過SelectionKey可以取得就緒的Channel集合,從而進行後續的IO操作。服務器端只要提供一個線程負責Selector的輪詢,就可以接入成千上萬個客戶端,這就是JDK NIO庫的巨大進步。
說明:這裏的代碼只實現了客戶端發送請求,服務端接收數據的功能。其目的是簡化代碼,方便理解。github源碼中有完整代碼。
小結:NIO模型中通過SocketChannel和ServerSocketChannel完成套接字通道的實現。非阻塞/阻塞,同步,避免TCP建立連接使用三次握手帶來的開銷。
NIO服務器代碼,負責開啟多路復用器,打開通道,註冊通道,輪詢通道,處理通道。
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; /** * NIO 也稱 New IO, Non-Block IO,非阻塞同步通信方式 * 從BIO的阻塞到NIO的非阻塞,這是一大進步。功歸於Buffer,Channel,Selector三個設計實現。 * Buffer : 緩沖區。NIO的數據操作都是在緩沖區中進行。緩沖區實際上是一個數組。而BIO是將數據直接寫入或讀取到Stream對象。 * Channel : 通道。NIO可以通過Channel進行數據的讀,寫和同時讀寫操作。 * Selector : 多路復用器。NIO編程的基礎。多路復用器提供選擇已經就緒狀態任務的能力。 * 客戶端和服務器通過Channel連接,而這些Channel都要註冊在Selector。Selector通過一個線程不停的輪詢這些Channel。找出已經準備就緒的Channel執行IO操作。 * NIO通過一個線程輪詢,實現千萬個客戶端的請求,這就是非阻塞NIO的特點。 */ public class ITDragonNIOServer implements Runnable{ private final int BUFFER_SIZE = 1024; // 緩沖區大小 private final int PORT = 8888; // 監聽的端口 private Selector selector; // 多路復用器,NIO編程的基礎,負責管理通道Channel private ByteBuffer readBuffer = ByteBuffer.allocate(BUFFER_SIZE); // 緩沖區Buffer public ITDragonNIOServer() { startServer(); } private void startServer() { try { // 1.開啟多路復用器 selector = Selector.open(); // 2.打開服務器通道(網絡讀寫通道) ServerSocketChannel channel = ServerSocketChannel.open(); // 3.設置服務器通道為非阻塞模式,true為阻塞,false為非阻塞 channel.configureBlocking(false); // 4.綁定端口 channel.socket().bind(new InetSocketAddress(PORT)); // 5.把通道註冊到多路復用器上,並監聽阻塞事件 /** * SelectionKey.OP_READ : 表示關註讀數據就緒事件 * SelectionKey.OP_WRITE : 表示關註寫數據就緒事件 * SelectionKey.OP_CONNECT: 表示關註socket channel的連接完成事件 * SelectionKey.OP_ACCEPT : 表示關註server-socket channel的accept事件 */ channel.register(selector, SelectionKey.OP_ACCEPT); System.out.println("Server start >>>>>>>>> port :" + PORT); } catch (IOException e) { e.printStackTrace(); } } // 需要一個線程負責Selector的輪詢 @Override public void run() { while (true) { try { /** * a.select() 阻塞到至少有一個通道在你註冊的事件上就緒 * b.select(long timeOut) 阻塞到至少有一個通道在你註冊的事件上就緒或者超時timeOut * c.selectNow() 立即返回。如果沒有就緒的通道則返回0 * select方法的返回值表示就緒通道的個數。 */ // 1.多路復用器監聽阻塞 selector.select(); // 2.多路復用器已經選擇的結果集 Iterator<SelectionKey> selectionKeys = selector.selectedKeys().iterator(); // 3.不停的輪詢 while (selectionKeys.hasNext()) { // 4.獲取一個選中的key SelectionKey key = selectionKeys.next(); // 5.獲取後便將其從容器中移除 selectionKeys.remove(); // 6.只獲取有效的key if (!key.isValid()){ continue; } // 阻塞狀態處理 if (key.isAcceptable()){ accept(key); } // 可讀狀態處理 if (key.isReadable()){ read(key); } } } catch (IOException e) { e.printStackTrace(); } } } // 設置阻塞,等待Client請求。在傳統IO編程中,用的是ServerSocket和Socket。在NIO中采用的ServerSocketChannel和SocketChannel private void accept(SelectionKey selectionKey) { try { // 1.獲取通道服務 ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel(); // 2.執行阻塞方法 SocketChannel socketChannel = serverSocketChannel.accept(); // 3.設置服務器通道為非阻塞模式,true為阻塞,false為非阻塞 socketChannel.configureBlocking(false); // 4.把通道註冊到多路復用器上,並設置讀取標識 socketChannel.register(selector, SelectionKey.OP_READ); } catch (IOException e) { e.printStackTrace(); } } private void read(SelectionKey selectionKey) { try { // 1.清空緩沖區數據 readBuffer.clear(); // 2.獲取在多路復用器上註冊的通道 SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); // 3.讀取數據,返回 int count = socketChannel.read(readBuffer); // 4.返回內容為-1 表示沒有數據 if (-1 == count) { selectionKey.channel().close(); selectionKey.cancel(); return ; } // 5.有數據則在讀取數據前進行復位操作 readBuffer.flip(); // 6.根據緩沖區大小創建一個相應大小的bytes數組,用來獲取值 byte[] bytes = new byte[readBuffer.remaining()]; // 7.接收緩沖區數據 readBuffer.get(bytes); // 8.打印獲取到的數據 System.out.println("NIO Server : " + new String(bytes)); // 不能用bytes.toString() } catch (IOException e) { e.printStackTrace(); } } public static void main(String[] args) { new Thread(new ITDragonNIOServer()).start(); } }
NIO客戶端代碼,負責連接服務器,聲明通道,連接通道
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; public class ITDragonNIOClient { private final static int PORT = 8888; private final static int BUFFER_SIZE = 1024; private final static String IP_ADDRESS = "127.0.0.1"; public static void main(String[] args) { clientReq(); } private static void clientReq() { // 1.創建連接地址 InetSocketAddress inetSocketAddress = new InetSocketAddress(IP_ADDRESS, PORT); // 2.聲明一個連接通道 SocketChannel socketChannel = null; // 3.創建一個緩沖區 ByteBuffer byteBuffer = ByteBuffer.allocate(BUFFER_SIZE); try { // 4.打開通道 socketChannel = SocketChannel.open(); // 5.連接服務器 socketChannel.connect(inetSocketAddress); while(true){ // 6.定義一個字節數組,然後使用系統錄入功能: byte[] bytes = new byte[BUFFER_SIZE]; // 7.鍵盤輸入數據 System.in.read(bytes); // 8.把數據放到緩沖區中 byteBuffer.put(bytes); // 9.對緩沖區進行復位 byteBuffer.flip(); // 10.寫出數據 socketChannel.write(byteBuffer); // 11.清空緩沖區數據 byteBuffer.clear(); } } catch (IOException e) { e.printStackTrace(); } finally { if (null != socketChannel) { try { socketChannel.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
AIO
AIO 也叫NIO2.0 是一種非阻塞異步的通信模式。在NIO的基礎上引入了新的異步通道的概念,並提供了異步文件通道和異步套接字通道的實現。
AIO 並沒有采用NIO的多路復用器,而是使用異步通道的概念。其read,write方法的返回類型都是Future對象。而Future模型是異步的,其核心思想是:去主函數等待時間。
小結:AIO模型中通過AsynchronousSocketChannel和AsynchronousServerSocketChannel完成套接字通道的實現。非阻塞,異步。
AIO服務端代碼,負責創建服務器通道,綁定端口,等待請求。
import java.net.InetSocketAddress; import java.nio.channels.AsynchronousChannelGroup; import java.nio.channels.AsynchronousServerSocketChannel; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * AIO, 也叫 NIO2.0 是一種異步非阻塞的通信方式 * AIO 引入了異步通道的概念 AsynchronousServerSocketChannel和AsynchronousSocketChannel 其read和write方法返回值類型是Future對象。 */ public class ITDragonAIOServer { private ExecutorService executorService; // 線程池 private AsynchronousChannelGroup threadGroup; // 通道組 public AsynchronousServerSocketChannel asynServerSocketChannel; // 服務器通道 public void start(Integer port){ try { // 1.創建一個緩存池 executorService = Executors.newCachedThreadPool(); // 2.創建通道組 threadGroup = AsynchronousChannelGroup.withCachedThreadPool(executorService, 1); // 3.創建服務器通道 asynServerSocketChannel = AsynchronousServerSocketChannel.open(threadGroup); // 4.進行綁定 asynServerSocketChannel.bind(new InetSocketAddress(port)); System.out.println("server start , port : " + port); // 5.等待客戶端請求 asynServerSocketChannel.accept(this, new ITDragonAIOServerHandler()); // 一直阻塞 不讓服務器停止,真實環境是在tomcat下運行,所以不需要這行代碼 Thread.sleep(Integer.MAX_VALUE); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { ITDragonAIOServer server = new ITDragonAIOServer(); server.start(8888); } }
AIO服務器任務處理代碼,負責,讀取數據,寫入數據
import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.ExecutionException; import com.itdragon.util.CalculatorUtil; public class ITDragonAIOServerHandler implements CompletionHandler<AsynchronousSocketChannel, ITDragonAIOServer> { private final Integer BUFFER_SIZE = 1024; @Override public void completed(AsynchronousSocketChannel asynSocketChannel, ITDragonAIOServer attachment) { // 保證多個客戶端都可以阻塞 attachment.asynServerSocketChannel.accept(attachment, this); read(asynSocketChannel); } //讀取數據 private void read(final AsynchronousSocketChannel asynSocketChannel) { ByteBuffer byteBuffer = ByteBuffer.allocate(BUFFER_SIZE); asynSocketChannel.read(byteBuffer, byteBuffer, new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed(Integer resultSize, ByteBuffer attachment) { //進行讀取之後,重置標識位 attachment.flip(); //獲取讀取的數據 String resultData = new String(attachment.array()).trim(); System.out.println("Server -> " + "收到客戶端的數據信息為:" + resultData); String response = resultData + " = " + CalculatorUtil.cal(resultData); write(asynSocketChannel, response); } @Override public void failed(Throwable exc, ByteBuffer attachment) { exc.printStackTrace(); } }); } // 寫入數據 private void write(AsynchronousSocketChannel asynSocketChannel, String response) { try { // 把數據寫入到緩沖區中 ByteBuffer buf = ByteBuffer.allocate(BUFFER_SIZE); buf.put(response.getBytes()); buf.flip(); // 在從緩沖區寫入到通道中 asynSocketChannel.write(buf).get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } @Override public void failed(Throwable exc, ITDragonAIOServer attachment) { exc.printStackTrace(); } }
AIO客戶端代碼,負責連接服務器,聲明通道,連接通道
import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.util.Random; public class ITDragonAIOClient implements Runnable{ private static Integer PORT = 8888; private static String IP_ADDRESS = "127.0.0.1"; private AsynchronousSocketChannel asynSocketChannel ; public ITDragonAIOClient() throws Exception { asynSocketChannel = AsynchronousSocketChannel.open(); // 打開通道 } public void connect(){ asynSocketChannel.connect(new InetSocketAddress(IP_ADDRESS, PORT)); // 創建連接 和NIO一樣 } public void write(String request){ try { asynSocketChannel.write(ByteBuffer.wrap(request.getBytes())).get(); ByteBuffer byteBuffer = ByteBuffer.allocate(1024); asynSocketChannel.read(byteBuffer).get(); byteBuffer.flip(); byte[] respByte = new byte[byteBuffer.remaining()]; byteBuffer.get(respByte); // 將緩沖區的數據放入到 byte數組中 System.out.println(new String(respByte,"utf-8").trim()); } catch (Exception e) { e.printStackTrace(); } } @Override public void run() { while(true){ } } public static void main(String[] args) throws Exception { for (int i = 0; i < 10; i++) { ITDragonAIOClient myClient = new ITDragonAIOClient(); myClient.connect(); new Thread(myClient, "myClient").start(); String []operators = {"+","-","*","/"}; Random random = new Random(System.currentTimeMillis()); String expression = random.nextInt(10)+operators[random.nextInt(4)]+(random.nextInt(10)+1); myClient.write(expression); } } }
常見面試題
1 IO,NIO,AIO區別
IO 阻塞同步通信模式,客戶端和服務器連接需要三次握手,使用簡單,但吞吐量小
NIO 非阻塞同步通信模式,客戶端與服務器通過Channel連接,采用多路復用器輪詢註冊的Channel。提高吞吐量和可靠性。
AIO 非阻塞異步通信模式,NIO的升級版,采用異步通道實現異步通信,其read和write方法均是異步方法。
2 Stock通信的偽代碼實現流程
服務器綁定端口:server = new ServerSocket(PORT)
服務器阻塞監聽:socket = server.accept()
服務器開啟線程:new Thread(Handle handle)
服務器讀寫數據:BufferedReader PrintWriter
客戶端綁定IP和PORT:new Socket(IP_ADDRESS, PORT)
客戶端傳輸接收數據:BufferedReader PrintWriter
3 TCP協議與UDP協議有什麽區別
TCP : 傳輸控制協議是基於連接的協議,在正式收發數據前,必須和對方建立可靠的連接。速度慢,合適傳輸大量數據。
UDP : 用戶數據報協議是與TCP相對應的協議。面向非連接的協議,不與對方建立連接,而是直接就把數據包發送過去,速度快,適合傳輸少量數據。
4 什麽是同步阻塞BIO,同步非阻塞NIO,異步非阻塞AIO
同步阻塞IO : 用戶進程發起一個IO操作以後,必須等待IO操作的真正完成後,才能繼續運行。
同步非阻塞IO: 用戶進程發起一個IO操作以後,可做其它事情,但用戶進程需要經常詢問IO操作是否完成,這樣造成不必要的CPU資源浪費。
異步非阻塞IO: 用戶進程發起一個IO操作然後,立即返回,等IO操作真正的完成以後,應用程序會得到IO操作完成的通知。類比Future模式。
總結
1 BIO模型中通過Socket和ServerSocket完成套接字通道實現。阻塞,同步,連接耗時。
2 NIO模型中通過SocketChannel和ServerSocketChannel完成套接字通道實現。非阻塞/阻塞,同步,避免TCP建立連接使用三次握手帶來的開銷。
3 AIO模型中通過AsynchronousSocketChannel和AsynchronousServerSocketChannel完成套接字通道實現。非阻塞,異步。
摘自:http://www.cnblogs.com/itdragon/p/8337234.html
Netty序章之BIO NIO AIO演變