Netty實踐與NIO原理
一、阻塞IO與非阻塞IO
Linux網絡IO模型(5種)
(1)阻塞IO模型
所有文件操作都是阻塞的,以套接字接口為例,在進程空間中調用recvfrom,系統調用直到數據包到達且被復制到應用進程緩沖區或發生錯誤時才返回,期間會一直等待(阻塞)。模型如圖:
(2)非阻塞IO模型
recvfrom從應用層到內核時,如果該緩沖區沒數據,直接返回一個EWOULDBLOCK錯誤,反復輪詢檢查這個狀態,看是否有數據到來。如圖:
(3)IO復用模型
Linux提高select/poll,進程通過將一個或多個fd(file descriptor)傳遞給select或poll系統調用,阻塞在select操作上,偵測多個fd是否處於就緒狀態。select/poll順序掃描fd是否就緒,而且支持的fd數量有限。Linux還提供了一個epoll系統調用,使用基於事件驅動的方式代替順序掃描,性能更高。當有fd就緒時,立即回調函數rollback。如圖:
(4)信號驅動IO模型
首先開啟套接口信號驅動IO功能,通過系統調用sigaction執行一個信號處理函數,該函數立即返回,進程繼續工作,它是非阻塞的。當數據準備就緒時,就為該進程生成一個SIGIO信號,通過信號回調通知應用程序調用recfrom來讀取數據,通知主循環函數處理數據。如圖:
(5)異步IO模型
告知內核啟動某個操作,讓內核在整個操作完成後(包括將數據從內核復制到用戶自己的緩沖區)通知我們。它與信號驅動的主要區別是:信號驅動IO由內核告知我們何時開始一個IO操作,異步IO模型由內核通知我們IO操作何時已經完成。如圖所示:
IO多路復用的應用:
通過把多個IO的阻塞復用到一個select的阻塞上,使系統在單線程下可處理多個客戶端請求。與傳統多線程模型相比,最大優勢是系統開銷小,不需要創建額外進程或線程。主要應用場景如下:
(1)服務器需要同時處理多個處於監聽狀態或連接狀態的套接字
(2)服務器需要同時處理多種網絡協議的套接字
Linux最終選擇epoll支持IO多路復用的系統調用,優點如下:
(1)支持一個進程打開的socket描述符(FD)不受限制(select單線程默認1024太少,epoll僅受限操作系統最大文件句柄數,1GB內存機器大約10萬句柄)
(2)IO效率不會隨FD數目增加而線性下降(只對“活躍”的socke進行t操作,活躍socket才會去主動調用callback函數)
(3)使用mmap加速內核與用戶空間消息傳遞(同一塊內存,避免不必要復制)
(4)API簡單:創建epoll描述符,添加監聽事件,阻塞等待監聽事件發生,關閉epoll描述符等
二、阻塞IO的例子(結合線程池)
//1.服務端 package com.xbai.io; import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; import com.xbai.executor.TimeServerHandlerExecutePool; import com.xbai.handler.TimeServerHandler; public class TimeServerExecutor { public static void main(String[] args)throws IOException { int port =8080; if(args !=null && args.length >0){ try { port = Integer.valueOf(args[0]); }catch (Exception e) { // TODO: handle exception } } ServerSocket server =null; try { server =new ServerSocket(port); System.out.println("The time server is started in port : " + port); TimeServerHandlerExecutePool singleExecutor =new TimeServerHandlerExecutePool(50,10000); while(true){ Socket socket = server.accept(); singleExecutor.execute(new TimeServerHandler(socket)); } }finally { if(server !=null){ System.out.println("The time server closed"); server.close(); server =null; } } } }
//2.服務端線程池 package com.xbai.executor; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class TimeServerHandlerExecutePool { private ExecutorService executor; public TimeServerHandlerExecutePool(int maxPoolSize,int queueSize){ executor =new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),maxPoolSize,120L,TimeUnit.SECONDS, new ArrayBlockingQueue(queueSize));//線程池要執行的任務阻塞成一個隊列,其內部的機制是等待喚醒生產者和消費者線程,有一個生產就可喚醒一個消費,去看源碼的線程池原理 } public void execute(Runnable task){ executor.execute(task); } }
//3.服務端處理器 package com.xbai.handler; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStreamWriter; import java.io.PrintWriter; import java.net.Socket; import java.sql.Date; public class TimeServerHandlerimplements Runnable{ private Socketsocket; public TimeServerHandler(Socket socket) { this.socket = socket; } @Override public void run() { // TODO Auto-generated method stub BufferedReader br =null; PrintWriter pw =null; try { br =new BufferedReader(new InputStreamReader(socket.getInputStream())); pw =new PrintWriter(socket.getOutputStream(),true); String curTime =null; String msg =null; while(true){ msg = br.readLine(); if(msg ==null){ break; } System.out.println("The time server received order:" + msg); curTime ="query time order".equalsIgnoreCase(msg) ?new Date( System.currentTimeMillis()).toString() :"bad order"; pw.println(curTime);//這裏不寫println,就無法插入換行符,那邊就不能readLine,一直阻塞,無法獲取數據 } }catch (IOException e) { if(br !=null){ try { br.close(); }catch (IOException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } } if(pw !=null){ pw.close(); pw =null; } if(socket !=null){ try { socket.close(); }catch (IOException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } socket =null; } } } }
//4.客戶端代碼 package com.xbai.io; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.Socket; import java.net.UnknownHostException; public class TimeClient { public static void main(String[] args) { int port =8080; if(args !=null && args.length >0){ try { port = Integer.valueOf(args[0]); }catch (Exception e) { // TODO: handle exception } } Socket socket =null; BufferedReader br =null; PrintWriter pw =null; try { socket =new Socket("localhost",port); br =new BufferedReader(new InputStreamReader(socket.getInputStream())); pw =new PrintWriter(socket.getOutputStream(),true); pw.println("query time order"); System.out.println("send order succeed"); String resp = br.readLine(); System.out.println("Now is :" + resp); }catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); }finally{ if(pw !=null){ pw.close(); pw =null; } if(br !=null){ try { br.close(); }catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } br =null; } if(socket !=null){ try { socket.close(); }catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } socket =null; } } } }
Netty實踐與NIO原理