NIO、Netty(NIO基礎)
1、阻塞與非阻塞
阻塞與非阻塞是描述程序在訪問某個資源時,資料是否準備就緒的的一種處理方式。當資料沒有準備就緒時:
阻塞:執行緒持續等待資源中資料準備完成,直到返回響應結果。
非阻塞:執行緒直接返回結果,不會持續等待資源準備資料結束後才響應結果。
2、同步與非同步
同步與非同步是指訪問資料的機制,同步一般指主動請求並等待IO操作完成的方式。
非同步則指主動請求資料後便可以繼續處理其它任務,隨後等待IO操作完畢的通知。
老王燒開水:
1、普通水壺煮水,站在旁邊,主動的看水開了沒有?同步的阻塞
2、普通水壺煮水,去幹點別的事
3、響水壺煮水,站在旁邊,不會每過一段時間主動看水開了沒有。如果水開了,水壺自動通知他。 非同步阻塞
4、響水壺煮水,去幹點別的事,如果水開了,水壺自動通知他。非同步非阻塞
3、傳統BIO模型
傳統BIO是一種同步的阻塞IO,IO在進行讀寫時,該執行緒將被阻塞,執行緒無法進行其它操作。
IO流在讀取時,會阻塞。直到發生以下情況:1、有資料可以讀取。2、資料讀取完成。3、發生異常。
4、偽非同步IO模型
以傳統BIO模型為基礎,通過執行緒池的方式維護所有的IO執行緒,實現相對高效的執行緒開銷及管理。
5.NIO模型
NIO(JDK1.4)模型是一種同步非阻塞IO,主要有三大核心部分:Channel(通道),Buffer(緩衝區), Selector(多路複用器)。傳統IO基於位元組流和字元流進行操作,而NIO基於Channel和Buffer(緩衝區)進行操作,資料總是從通道讀取到緩衝區中,或者從緩衝區寫入到通道中。Selector(多路複用器)用於監聽多個通道的事件(比如:連線開啟,資料到達)。因此,單個執行緒可以監聽多個數據通道。
NIO和傳統IO(一下簡稱IO)之間第一個最大的區別是,IO是面向流的,NIO是面向緩衝區的。 Java IO面向流意味著每次從流中讀一個或多個位元組,直至讀取所有位元組,它們沒有被快取在任何地方。此外,它不能前後移動流中的資料。如果需要前後移動從流中讀取的資料,需要先將它快取到一個緩衝區。NIO的緩衝導向方法略有不同。資料讀取到一個它稍後處理的緩衝區,需要時可在緩衝區中前後移動。這就增加了處理過程中的靈活性。但是,還需要檢查是否該緩衝區中包含所有您需要處理的資料。而且,需確保當更多的資料讀入緩衝區時,不要覆蓋緩衝區裡尚未處理的資料。
IO的各種流是阻塞的。這意味著,當一個執行緒呼叫read() 或 write()時,該執行緒被阻塞,直到有一些資料被讀取,或資料完全寫入。該執行緒在此期間不能再幹任何事情了。 NIO的非阻塞模式,使一個執行緒從某通道傳送請求讀取資料,但是它僅能得到目前可用的資料,如果目前沒有資料可用時,就什麼都不會獲取。而不是保持執行緒阻塞,所以直至資料變的可以讀取之前,該執行緒可以繼續做其他的事情。 非阻塞寫也是如此。一個執行緒請求寫入一些資料到某通道,但不需要等待它完全寫入,這個執行緒同時可以去做別的事情。 執行緒通常將非阻塞IO的空閒時間用於在其它通道上執行IO操作,所以一個單獨的執行緒現在可以管理多個輸入和輸出通道(channel)。
NIO優點:
- 通過Channel註冊到Selector上的狀態來實現一種客戶端與服務端的通訊。
- Channel中資料的讀取是通過Buffer , 一種非阻塞的讀取方式。
- Selector 多路複用器 單執行緒模型, 執行緒的資源開銷相對比較小。
Channel(通道)
傳統IO操作對read()或write()方法的呼叫,可能會因為沒有資料可讀/可寫而阻塞,直到有資料響應。也就是說讀寫資料的IO呼叫,可能會無限期的阻塞等待,效率依賴網路傳輸的速度。最重要的是在呼叫一個方法前,無法知道是否會被阻塞。
NIO的Channel抽象了一個重要特徵就是可以通過配置它的阻塞行為,來實現非阻塞式的通道。
Channel是一個雙向通道,與傳統IO操作只允許單向的讀寫不同的是,NIO的Channel允許在一個通道上進行讀和寫的操作。
FileChannel:檔案
SocketChannel:
ServerSocketChannel:
DatagramChannel: UDP
Buffer(緩衝區)
Bufer顧名思義,它是一個緩衝區,實際上是一個容器,一個連續陣列。Channel提供從檔案、網路讀取資料的渠道,但是讀寫的資料都必須經過Buffer。
Buffer緩衝區本質上是一塊可以寫入資料,然後可以從中讀取資料的記憶體。這塊記憶體被包裝成NIO Buffer物件,並提供了一組方法,用來方便的訪問該模組記憶體。為了理解Buffer的工作原理,需要熟悉它的三個屬性:capacity、position和limit。
position和limit的含義取決於Buffer處在讀模式還是寫模式。不管Buffer處在什麼模式,capacity的含義總是一樣的。見下圖:
capacity:作為一個記憶體塊,Buffer有固定的大小值,也叫作“capacity”,只能往其中寫入capacity個byte、long、char等型別。一旦Buffer滿了,需要將其清空(通過讀資料或者清楚資料)才能繼續寫資料。
position:當你寫資料到Buffer中時,position表示當前的位置。出事的position值為0,當寫入一個位元組資料到Buffer中後,position會向前移動到下一個可插入資料的Buffer單元。position最大可為capacity-1。當讀取資料時,也是從某個特定位置讀,講Buffer從寫模式切換到讀模式,position會被重置為0。當從Buffer的position處讀取一個位元組資料後,position向前移動到下一個可讀的位置。
limit:在寫模式下,Buffer的limit表示你最多能往Buffer裡寫多少資料。 寫模式下,limit等於Buffer的capacity。當切換Buffer到讀模式時, limit表示你最多能讀到多少資料。因此,當切換Buffer到讀模式時,limit會被設定成寫模式下的position值。換句話說,你能讀到之前寫入的所有資料(limit被設定成已寫資料的數量,這個值在寫模式下就是position)
Buffer的分配:對Buffer物件的操作必須首先進行分配,Buffer提供一個allocate(int capacity)方法分配一個指定位元組大小的物件。
向Buffer中寫資料:寫資料到Buffer中有兩種方式:
1、從channel寫到Buffer
int bytes = channel.read(buf); //將channel中的資料讀取到buf中
2、通過Buffer的put()方法寫到Buffer
buf.put(byte); //將資料通過put()方法寫入到buf中
flip()方法:將Buffer從寫模式切換到讀模式,呼叫flip()方法會將position設定為0,並將limit設定為之前的position的值。
從Buffer中讀資料:從Buffer中讀資料有兩種方式:
1、從Buffer讀取資料到Channel
int bytes = channel.write(buf); //將buf中的資料讀取到channel中
2、通過Buffer的get()方法讀取資料
byte bt = buf.get(); //從buf中讀取一個byte
rewind()方法:Buffer.rewind()方法將position設定為0,使得可以重讀Buffer中的所有資料,limit保持不變。
clear()與compact()方法:一旦讀完Buffer中的資料,需要讓Buffer準備好再次被寫入,可以通過clear()或compact()方法完成。如果呼叫的是clear()方法,position將被設定為0,limit設定為capacity的值。但是Buffer並未被清空,只是通過這些標記告訴我們可以從哪裡開始往Buffer中寫入多少資料。如果Buffer中還有一些未讀的資料,呼叫clear()方法將被"遺忘 "。compact()方法將所有未讀的資料拷貝到Buffer起始處,然後將position設定到最後一個未讀元素的後面,limit屬性依然設定為capacity。可以使得Buffer中的未讀資料還可以在後續中被使用。
mark()與reset()方法:通過呼叫Buffer.mark()方法可以標記一個特定的position,之後可以通過呼叫Buffer.reset()恢復到這個position上。
Selector(多路複用器)
Selector與Channel是相互配合使用的,將Channel註冊在Selector上之後,才可以正確的使用Selector,但此時Channel必須為非阻塞模式。Selector可以監聽Channel的四種狀態(Connect、Accept、Read、Write),當監聽到某一Channel的某個狀態時,才允許對Channel進行相應的操作。
測試程式碼
/**
* 服務端
*/
public class MultiplexerTimeServer implements Runnable {
private Selector selector;
private ServerSocketChannel serverChannel;
private volatile boolean stop;
public MultiplexerTimeServer(int port) {
try {
//開啟ServerSocketChannel
serverChannel = ServerSocketChannel.open();
//設定為非阻塞模式
serverChannel.configureBlocking(false);
//繫結監聽的埠地址
serverChannel.socket().bind(new InetSocketAddress(port), 1024);
//建立Selector執行緒
selector = Selector.open();
//將ServerSocketChannel註冊到Selector,交給Selector監聽
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("The time server is start in port:"+port);
} catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
}
public void stop(){
this.stop = true;
}
@Override
public void run() {
while(!stop){
try {
//通過Selector迴圈準備就緒的Key
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
SelectionKey selectionKey = null;
while(iterator.hasNext()){
selectionKey = iterator.next();
iterator.remove();
try {
handleInput(selectionKey);
} catch (Exception e) {
if(selectionKey!=null){
selectionKey.cancel();
if(selectionKey.channel()!=null){
selectionKey.channel().close();
}
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
if(selector !=null){
try {
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void handleInput(SelectionKey selectionKey) throws IOException {
if(selectionKey.isValid()){
if (selectionKey.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) selectionKey.channel();
//多路複用器監聽到新的客戶端連線,處理連線請求,完成TCP三次握手。
SocketChannel client = server.accept();
//設定為非阻塞模式
client.configureBlocking(false);
// 將新連線註冊到多路複用器上,監聽其讀操作,讀取客戶端傳送的訊息。
client.register(selector, SelectionKey.OP_READ);
}
if(selectionKey.isReadable()){
SocketChannel client = (SocketChannel) selectionKey.channel();
ByteBuffer receivebuffer = ByteBuffer.allocate(1024);
//讀取客戶端請求訊息到緩衝區
int count = client.read(receivebuffer); //非阻塞
if (count > 0) {
receivebuffer.flip();
byte[] bytes = new byte[receivebuffer.remaining()]; //remaining()方法
//從緩衝區讀取訊息
receivebuffer.get(bytes);
String body = new String(bytes, "UTF-8");
System.out.println("The time server(Thread:"+Thread.currentThread()+") receive order : "+body);
//將currentTime響應給客戶端(客戶端Channel)
String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER";
doWrite(client, currentTime);
}else if(count < 0){
selectionKey.channel();
client.close();
}else{
}
}
}
}
private void doWrite(SocketChannel client, String currentTime) throws IOException {
if(currentTime != null && currentTime.trim().length()>0){
ByteBuffer sendbuffer = ByteBuffer.allocate(1024);
sendbuffer.put(currentTime.getBytes());
sendbuffer.flip();
//將客戶端響應訊息寫入到客戶端Channel中。
client.write(sendbuffer);
System.out.println("伺服器端向客戶端傳送資料--:" + currentTime);
}else{
System.out.println("沒有資料");
}
}
}
/**
* 客戶端
*/
public class TimeClientHandler implements Runnable {
private String host;
private int port;
private SocketChannel socketChannel;
private Selector selector;
private volatile boolean stop;
public TimeClientHandler(String host, int port) {
this.host = host;
this.port = port;
try {
//開啟SocketChannel
socketChannel = SocketChannel.open();
//建立Selector執行緒
selector = Selector.open();
//設定為非阻塞模式
socketChannel.configureBlocking(false);
} catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
}
@Override
public void run() {
try {
doConnect();
} catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
while(!stop){
//輪訓通道的狀態
try {
selector.select(1000);
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
SelectionKey selectionKey = null;
while(iterator.hasNext()){
selectionKey = iterator.next();
iterator.remove();
try {
handleInput(selectionKey);
} catch (Exception e) {
if(selectionKey!=null){
selectionKey.cancel();
if(selectionKey.channel()!=null){
selectionKey.channel().close();
}
}
}
}
} catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
}
if(selector !=null){
try {
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void handleInput(SelectionKey selectionKey) throws Exception {
if(selectionKey.isValid()){
SocketChannel client = (SocketChannel) selectionKey.channel();
if (selectionKey.isConnectable()){
if(client.finishConnect()){
client.register(selector, SelectionKey.OP_READ);
doWrite(client);
}else{
System.exit(1);
}
}
if (selectionKey.isReadable()) {
ByteBuffer receivebuffer = ByteBuffer.allocate(1024);
int count = client.read(receivebuffer);
if (count > 0) {
receivebuffer.flip();
byte[] bytes = new byte[receivebuffer.remaining()]; //remaining()方法
receivebuffer.get(bytes);
String body = new String(bytes, "UTF-8");
System.out.println("Now is "+body);
this.stop = true;
}else if(count < 0){
selectionKey.channel();
client.close();
}else{
}
}
}
}
private void doConnect() throws Exception {
//連線服務端
boolean connect = socketChannel.connect(new InetSocketAddress(host, port));
//判斷是否連線成功,如果連線成功,則監聽Channel的讀狀態。
if(connect){
socketChannel.register(selector, SelectionKey.OP_READ);
//寫資料 寫給服務端
doWrite(socketChannel);
}else{
//如果沒有連線成功,則向多路複用器註冊Connect狀態
socketChannel.register(selector, SelectionKey.OP_CONNECT);
}
}
private void doWrite(SocketChannel channel) throws IOException {
ByteBuffer sendbuffer = ByteBuffer.allocate(1024);
sendbuffer.put("QUERY TIME ORDER".getBytes());
sendbuffer.flip();
//向Channel中寫入客戶端的請求指令 寫到服務端
channel.write(sendbuffer);
if(!sendbuffer.hasRemaining()){
System.out.println("Send order to server succeed.");
}
}
}
/**
* 執行服務端
**/
public class TimeServer {
public static void main(String[] args) {
int port=8080; //服務端預設埠
MultiplexerTimeServer timeServer=new MultiplexerTimeServer(port);
new Thread(timeServer, "NIO-MultiplexerTimeServer-001").start();
}
}
/**
* 執行客戶端
**/
public class TimeServerClient {
public static void main(String[] args) {
int port=8080; //服務端預設埠
new Thread(new TimeClientHandler("127.0.0.1", port), "NIO-TimeServerClient-001").start();
}
}