java bio nio aio
流程圖:
BIO
BIO 全稱Block-IO 是一種阻塞同步的通訊模式。我們常說的Stock IO 一般指的是BIO。是一個比較傳統的通訊方式,模式簡單,使用方便。但併發處理能力低,通訊耗時,依賴網速。 BIO 設計原理: 伺服器通過一個Acceptor執行緒負責監聽客戶端請求和為每個客戶端建立一個新的執行緒進行鏈路處理。典型的一請求一應答模式。若客戶端數量增多,頻繁地建立和銷燬執行緒會給伺服器開啟很大的壓力。後改良為用執行緒池的方式代替新增執行緒,被稱為偽非同步IO。
伺服器提供IP地址和監聽的埠,客戶端通過TCP的三次握手與伺服器連線,連線成功後,雙放才能通過套接字(Stock)通訊。 小結:BIO模型中通過Socket和ServerSocket完成套接字通道的實現。阻塞,同步,建立連線耗時
BIO伺服器程式碼,負責啟動服務,阻塞服務,監聽客戶端請求,新建執行緒處理任務。
import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * IO 也稱為 BIO,Block IO 阻塞同步的通訊方式 * 比較傳統的技術,實際開發中基本上用Netty或者是AIO。熟悉BIO,NIO,體會其中變化的過程。作為一個web開發人員,stock通訊面試經常問題。 * BIO最大的問題是:阻塞,同步。 * BIO通訊方式很依賴於網路,若網速不好,阻塞時間會很長。每次請求都由程式執行並返回,這是同步的缺陷。 * BIO工作流程: * 第一步:server端伺服器啟動 * 第二步:server端伺服器阻塞監聽client請求 * 第三步:server端伺服器接收請求,建立執行緒實現任務 */ public class ITDragonBIOServer { private static final Integer PORT = 8888; // 伺服器對外的埠號 public static void main(String[] args) { ServerSocket server = null; Socket socket = null; ThreadPoolExecutor executor = null; try { server = new ServerSocket(PORT); // ServerSocket 啟動監聽埠 System.out.println("BIO Server 伺服器啟動........."); /*--------------傳統的新增執行緒處理----------------*/ /*while (true) { // 伺服器監聽:阻塞,等待Client請求 socket = server.accept(); System.out.println("server 伺服器確認請求 : " + socket); // 伺服器連線確認:確認Client請求後,建立執行緒執行任務 。很明顯的問題,若每接收一次請求就要建立一個執行緒,顯然是不合理的。 new Thread(new ITDragonBIOServerHandler(socket)).start(); } */ /*--------------通過執行緒池處理緩解高併發給程式帶來的壓力(偽非同步IO程式設計)----------------*/ executor = new ThreadPoolExecutor(10, 100, 1000, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(50)); while (true) { socket = server.accept(); // 伺服器監聽:阻塞,等待Client請求 ITDragonBIOServerHandler serverHandler = new ITDragonBIOServerHandler(socket); executor.execute(serverHandler); } } catch (IOException e) { e.printStackTrace(); } finally { try { if (null != socket) { socket.close(); socket = null; } if (null != server) { server.close(); server = null; System.out.println("BIO Server 伺服器關閉了!!!!"); } executor.shutdown(); } catch (IOException e) { e.printStackTrace(); } } } }
BIO服務端處理任務程式碼,負責處理Stock套接字,返回套接字給客戶端,解耦。
import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.Socket; import com.itdragon.util.CalculatorUtil; public class ITDragonBIOServerHandler implements Runnable{ private Socket socket; public ITDragonBIOServerHandler(Socket socket) { this.socket = socket; } @Override public void run() { BufferedReader reader = null; PrintWriter writer = null; try { reader = new BufferedReader(new InputStreamReader(this.socket.getInputStream())); writer = new PrintWriter(this.socket.getOutputStream(), true); String body = null; while (true) { body = reader.readLine(); // 若客戶端用的是 writer.print() 傳值,那readerLine() 是不能獲取值,細節 if (null == body) { break; } System.out.println("server服務端接收引數 : " + body); writer.println(body + " = " + CalculatorUtil.cal(body).toString()); } } catch (IOException e) { e.printStackTrace(); } finally { if (null != writer) { writer.close(); } try { if (null != reader) { reader.close(); } if (null != this.socket) { this.socket.close(); this.socket = null; } } catch (IOException e) { e.printStackTrace(); } } } }
BIO客戶端程式碼,負責啟動客戶端,向伺服器傳送請求,接收伺服器返回的Stock套接字。
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.Random;
/**
* BIO 客戶端
* Socket : 向服務端傳送連線
* PrintWriter : 向服務端傳遞引數
* BufferedReader : 從服務端接收引數
*/
public class ITDragonBIOClient {
private static Integer PORT = 8888;
private static String IP_ADDRESS = "127.0.0.1";
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
clientReq(i);
}
}
private static void clientReq(int i) {
Socket socket = null;
BufferedReader reader = null;
PrintWriter writer = null;
try {
socket = new Socket(IP_ADDRESS, PORT); // Socket 發起連線操作。連線成功後,雙方通過輸入和輸出流進行同步阻塞式通訊
reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); // 獲取返回內容
writer = new PrintWriter(socket.getOutputStream(), true);
String []operators = {"+","-","*","/"};
Random random = new Random(System.currentTimeMillis());
String expression = random.nextInt(10)+operators[random.nextInt(4)]+(random.nextInt(10)+1);
writer.println(expression); // 向伺服器端傳送資料
System.out.println(i + " 客戶端列印返回資料 : " + reader.readLine());
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (null != reader) {
reader.close();
}
if (null != socket) {
socket.close();
socket = null;
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
NIO
NIO 全稱New IO,也叫Non-Block IO 是一種非阻塞同步的通訊模式。 NIO 設計原理: NIO 相對於BIO來說一大進步。客戶端和伺服器之間通過Channel通訊。NIO可以在Channel進行讀寫操作。這些Channel都會被註冊在Selector多路複用器上。Selector通過一個執行緒不停的輪詢這些Channel。找出已經準備就緒的Channel執行IO操作。 NIO 通過一個執行緒輪詢,實現千萬個客戶端的請求,這就是非阻塞NIO的特點。 1)緩衝區Buffer:它是NIO與BIO的一個重要區別。BIO是將資料直接寫入或讀取到Stream物件中。而NIO的資料操作都是在緩衝區中進行的。緩衝區實際上是一個數組。Buffer最常見的型別是ByteBuffer,另外還有CharBuffer,ShortBuffer,IntBuffer,LongBuffer,FloatBuffer,DoubleBuffer。 2)通道Channel:和流不同,通道是雙向的。NIO可以通過Channel進行資料的讀,寫和同時讀寫操作。通道分為兩大類:一類是網路讀寫(SelectableChannel),一類是用於檔案操作(FileChannel),我們使用的SocketChannel和ServerSocketChannel都是SelectableChannel的子類。 3)多路複用器Selector:NIO程式設計的基礎。多路複用器提供選擇已經就緒的任務的能力。就是Selector會不斷地輪詢註冊在其上的通道(Channel),如果某個通道處於就緒狀態,會被Selector輪詢出來,然後通過SelectionKey可以取得就緒的Channel集合,從而進行後續的IO操作。伺服器端只要提供一個執行緒負責Selector的輪詢,就可以接入成千上萬個客戶端,這就是JDK NIO庫的巨大進步。
說明:這裡的程式碼只實現了客戶端傳送請求,服務端接收資料的功能。其目的是簡化程式碼,方便理解。github原始碼中有完整程式碼。 小結:NIO模型中通過SocketChannel和ServerSocketChannel完成套接字通道的實現。非阻塞/阻塞,同步,避免TCP建立連線使用三次握手帶來的開銷。
NIO伺服器程式碼,負責開啟多路複用器,開啟通道,註冊通道,輪詢通道,處理通道。
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
/**
* NIO 也稱 New IO, Non-Block IO,非阻塞同步通訊方式
* 從BIO的阻塞到NIO的非阻塞,這是一大進步。功歸於Buffer,Channel,Selector三個設計實現。
* Buffer : 緩衝區。NIO的資料操作都是在緩衝區中進行。緩衝區實際上是一個數組。而BIO是將資料直接寫入或讀取到Stream物件。
* Channel : 通道。NIO可以通過Channel進行資料的讀,寫和同時讀寫操作。
* Selector : 多路複用器。NIO程式設計的基礎。多路複用器提供選擇已經就緒狀態任務的能力。
* 客戶端和伺服器通過Channel連線,而這些Channel都要註冊在Selector。Selector通過一個執行緒不停的輪詢這些Channel。找出已經準備就緒的Channel執行IO操作。
* NIO通過一個執行緒輪詢,實現千萬個客戶端的請求,這就是非阻塞NIO的特點。
*/
public class ITDragonNIOServer implements Runnable{
private final int BUFFER_SIZE = 1024; // 緩衝區大小
private final int PORT = 8888; // 監聽的埠
private Selector selector; // 多路複用器,NIO程式設計的基礎,負責管理通道Channel
private ByteBuffer readBuffer = ByteBuffer.allocate(BUFFER_SIZE); // 緩衝區Buffer
public ITDragonNIOServer() {
startServer();
}
private void startServer() {
try {
// 1.開啟多路複用器
selector = Selector.open();
// 2.開啟伺服器通道(網路讀寫通道)
ServerSocketChannel channel = ServerSocketChannel.open();
// 3.設定伺服器通道為非阻塞模式,true為阻塞,false為非阻塞
channel.configureBlocking(false);
// 4.繫結埠
channel.socket().bind(new InetSocketAddress(PORT));
// 5.把通道註冊到多路複用器上,並監聽阻塞事件
/**
* SelectionKey.OP_READ : 表示關注讀資料就緒事件
* SelectionKey.OP_WRITE : 表示關注寫資料就緒事件
* SelectionKey.OP_CONNECT: 表示關注socket channel的連線完成事件
* SelectionKey.OP_ACCEPT : 表示關注server-socket channel的accept事件
*/
channel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("Server start >>>>>>>>> port :" + PORT);
} catch (IOException e) {
e.printStackTrace();
}
}
// 需要一個執行緒負責Selector的輪詢
@Override
public void run() {
while (true) {
try {
/**
* a.select() 阻塞到至少有一個通道在你註冊的事件上就緒
* b.select(long timeOut) 阻塞到至少有一個通道在你註冊的事件上就緒或者超時timeOut
* c.selectNow() 立即返回。如果沒有就緒的通道則返回0
* select方法的返回值表示就緒通道的個數。
*/
// 1.多路複用器監聽阻塞
selector.select();
// 2.多路複用器已經選擇的結果集
Iterator<SelectionKey> selectionKeys = selector.selectedKeys().iterator();
// 3.不停的輪詢
while (selectionKeys.hasNext()) {
// 4.獲取一個選中的key
SelectionKey key = selectionKeys.next();
// 5.獲取後便將其從容器中移除
selectionKeys.remove();
// 6.只獲取有效的key
if (!key.isValid()){
continue;
}
// 阻塞狀態處理
if (key.isAcceptable()){
accept(key);
}
// 可讀狀態處理
if (key.isReadable()){
read(key);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
// 設定阻塞,等待Client請求。在傳統IO程式設計中,用的是ServerSocket和Socket。在NIO中採用的ServerSocketChannel和SocketChannel
private void accept(SelectionKey selectionKey) {
try {
// 1.獲取通道服務
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
// 2.執行阻塞方法
SocketChannel socketChannel = serverSocketChannel.accept();
// 3.設定伺服器通道為非阻塞模式,true為阻塞,false為非阻塞
socketChannel.configureBlocking(false);
// 4.把通道註冊到多路複用器上,並設定讀取標識
socketChannel.register(selector, SelectionKey.OP_READ);
} catch (IOException e) {
e.printStackTrace();
}
}
private void read(SelectionKey selectionKey) {
try {
// 1.清空緩衝區資料
readBuffer.clear();
// 2.獲取在多路複用器上註冊的通道
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
// 3.讀取資料,返回
int count = socketChannel.read(readBuffer);
// 4.返回內容為-1 表示沒有資料
if (-1 == count) {
selectionKey.channel().close();
selectionKey.cancel();
return ;
}
// 5.有資料則在讀取資料前進行復位操作
readBuffer.flip();
// 6.根據緩衝區大小建立一個相應大小的bytes陣列,用來獲取值
byte[] bytes = new byte[readBuffer.remaining()];
// 7.接收緩衝區資料
readBuffer.get(bytes);
// 8.列印獲取到的資料
System.out.println("NIO Server : " + new String(bytes)); // 不能用bytes.toString()
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
new Thread(new ITDragonNIOServer()).start();
}
}
NIO客戶端程式碼,負責連線伺服器,宣告通道,連線通道
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
public class ITDragonNIOClient {
private final static int PORT = 8888;
private final static int BUFFER_SIZE = 1024;
private final static String IP_ADDRESS = "127.0.0.1";
public static void main(String[] args) {
clientReq();
}
private static void clientReq() {
// 1.建立連線地址
InetSocketAddress inetSocketAddress = new InetSocketAddress(IP_ADDRESS, PORT);
// 2.宣告一個連線通道
SocketChannel socketChannel = null;
// 3.建立一個緩衝區
ByteBuffer byteBuffer = ByteBuffer.allocate(BUFFER_SIZE);
try {
// 4.開啟通道
socketChannel = SocketChannel.open();
// 5.連線伺服器
socketChannel.connect(inetSocketAddress);
while(true){
// 6.定義一個位元組陣列,然後使用系統錄入功能:
byte[] bytes = new byte[BUFFER_SIZE];
// 7.鍵盤輸入資料
System.in.read(bytes);
// 8.把資料放到緩衝區中
byteBuffer.put(bytes);
// 9.對緩衝區進行復位
byteBuffer.flip();
// 10.寫出資料
socketChannel.write(byteBuffer);
// 11.清空緩衝區資料
byteBuffer.clear();
}
} catch (IOException e) {
e.printStackTrace();
} finally {
if (null != socketChannel) {
try {
socketChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
AIO
AIO 也叫NIO2.0 是一種非阻塞非同步的通訊模式。在NIO的基礎上引入了新的非同步通道的概念,並提供了非同步檔案通道和非同步套接字通道的實現。 AIO 並沒有採用NIO的多路複用器,而是使用非同步通道的概念。其read,write方法的返回型別都是Future物件。而Future模型是非同步的,其核心思想是:去主函式等待時間。
小結:AIO模型中通過AsynchronousSocketChannel和AsynchronousServerSocketChannel完成套接字通道的實現。非阻塞,非同步。
AIO服務端程式碼,負責建立伺服器通道,繫結埠,等待請求。
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* AIO, 也叫 NIO2.0 是一種非同步非阻塞的通訊方式
* AIO 引入了非同步通道的概念 AsynchronousServerSocketChannel和AsynchronousSocketChannel 其read和write方法返回值型別是Future物件。
*/
public class ITDragonAIOServer {
private ExecutorService executorService; // 執行緒池
private AsynchronousChannelGroup threadGroup; // 通道組
public AsynchronousServerSocketChannel asynServerSocketChannel; // 伺服器通道
public void start(Integer port){
try {
// 1.建立一個快取池
executorService = Executors.newCachedThreadPool();
// 2.建立通道組
threadGroup = AsynchronousChannelGroup.withCachedThreadPool(executorService, 1);
// 3.建立伺服器通道
asynServerSocketChannel = AsynchronousServerSocketChannel.open(threadGroup);
// 4.進行繫結
asynServerSocketChannel.bind(new InetSocketAddress(port));
System.out.println("server start , port : " + port);
// 5.等待客戶端請求
asynServerSocketChannel.accept(this, new ITDragonAIOServerHandler());
// 一直阻塞 不讓伺服器停止,真實環境是在tomcat下執行,所以不需要這行程式碼
Thread.sleep(Integer.MAX_VALUE);
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
ITDragonAIOServer server = new ITDragonAIOServer();
server.start(8888);
}
}
AIO伺服器任務處理程式碼,負責,讀取資料,寫入資料
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.ExecutionException;
import com.itdragon.util.CalculatorUtil;
public class ITDragonAIOServerHandler implements CompletionHandler<AsynchronousSocketChannel, ITDragonAIOServer> {
private final Integer BUFFER_SIZE = 1024;
@Override
public void completed(AsynchronousSocketChannel asynSocketChannel, ITDragonAIOServer attachment) {
// 保證多個客戶端都可以阻塞
attachment.asynServerSocketChannel.accept(attachment, this);
read(asynSocketChannel);
}
//讀取資料
private void read(final AsynchronousSocketChannel asynSocketChannel) {
ByteBuffer byteBuffer = ByteBuffer.allocate(BUFFER_SIZE);
asynSocketChannel.read(byteBuffer, byteBuffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer resultSize, ByteBuffer attachment) {
//進行讀取之後,重置標識位
attachment.flip();
//獲取讀取的資料
String resultData = new String(attachment.array()).trim();
System.out.println("Server -> " + "收到客戶端的資料資訊為:" + resultData);
String response = resultData + " = " + CalculatorUtil.cal(resultData);
write(asynSocketChannel, response);
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
exc.printStackTrace();
}
});
}
// 寫入資料
private void write(AsynchronousSocketChannel asynSocketChannel, String response) {
try {
// 把資料寫入到緩衝區中
ByteBuffer buf = ByteBuffer.allocate(BUFFER_SIZE);
buf.put(response.getBytes());
buf.flip();
// 在從緩衝區寫入到通道中
asynSocketChannel.write(buf).get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, ITDragonAIOServer attachment) {
exc.printStackTrace();
}
}
AIO客戶端程式碼,負責連線伺服器,宣告通道,連線通道
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.Random;
public class ITDragonAIOClient implements Runnable{
private static Integer PORT = 8888;
private static String IP_ADDRESS = "127.0.0.1";
private AsynchronousSocketChannel asynSocketChannel ;
public ITDragonAIOClient() throws Exception {
asynSocketChannel = AsynchronousSocketChannel.open(); // 開啟通道
}
public void connect(){
asynSocketChannel.connect(new InetSocketAddress(IP_ADDRESS, PORT)); // 建立連線 和NIO一樣
}
public void write(String request){
try {
asynSocketChannel.write(ByteBuffer.wrap(request.getBytes())).get();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
asynSocketChannel.read(byteBuffer).get();
byteBuffer.flip();
byte[] respByte = new byte[byteBuffer.remaining()];
byteBuffer.get(respByte); // 將緩衝區的資料放入到 byte陣列中
System.out.println(new String(respByte,"utf-8").trim());
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void run() {
while(true){
}
}
public static void main(String[] args) throws Exception {
for (int i = 0; i < 10; i++) {
ITDragonAIOClient myClient = new ITDragonAIOClient();
myClient.connect();
new Thread(myClient, "myClient").start();
String []operators = {"+","-","*","/"};
Random random = new Random(System.currentTimeMillis());
String expression = random.nextInt(10)+operators[random.nextInt(4)]+(random.nextInt(10)+1);
myClient.write(expression);
}
}
}
常見面試題
1 IO,NIO,AIO區別 IO 阻塞同步通訊模式,客戶端和伺服器連線需要三次握手,使用簡單,但吞吐量小 NIO 非阻塞同步通訊模式,客戶端與伺服器通過Channel連線,採用多路複用器輪詢註冊的Channel。提高吞吐量和可靠性。 AIO 非阻塞非同步通訊模式,NIO的升級版,採用非同步通道實現非同步通訊,其read和write方法均是非同步方法。
2 Stock通訊的虛擬碼實現流程 伺服器繫結埠:server = new ServerSocket(PORT) 伺服器阻塞監聽:socket = server.accept() 伺服器開啟執行緒:new Thread(Handle handle) 伺服器讀寫資料:BufferedReader PrintWriter 客戶端繫結IP和PORT:new Socket(IP_ADDRESS, PORT) 客戶端傳輸接收資料:BufferedReader PrintWriter
3 TCP協議與UDP協議有什麼區別 TCP : 傳輸控制協議是基於連線的協議,在正式收發資料前,必須和對方建立可靠的連線。速度慢,合適傳輸大量資料。 UDP : 使用者資料報協議是與TCP相對應的協議。面向非連線的協議,不與對方建立連線,而是直接就把資料包傳送過去,速度快,適合傳輸少量資料。
4 什麼是同步阻塞BIO,同步非阻塞NIO,非同步非阻塞AIO 同步阻塞IO : 使用者程序發起一個IO操作以後,必須等待IO操作的真正完成後,才能繼續執行。 同步非阻塞IO: 使用者程序發起一個IO操作以後,可做其它事情,但使用者程序需要經常詢問IO操作是否完成,這樣造成不必要的CPU資源浪費。 非同步非阻塞IO: 使用者程序發起一個IO操作然後,立即返回,等IO操作真正的完成以後,應用程式會得到IO操作完成的通知。類比Future模式。
總結
1 BIO模型中通過Socket和ServerSocket完成套接字通道實現。阻塞,同步,連線耗時。 2 NIO模型中通過SocketChannel和ServerSocketChannel完成套接字通道實現。非阻塞/阻塞,同步,避免TCP建立連線使用三次握手帶來的開銷。 3 AIO模型中通過AsynchronousSocketChannel和AsynchronousServerSocketChannel完成套接字通道實現。非阻塞,非同步。