netty之NIO從淺入深、詳細解析。
BIO:同步、阻塞
NIO:同步、非阻塞
AIO:非同步、非阻塞
在學習NIO之前,需要大家先了解一些簡單的概念:
1.cache(快取記憶體)和buffer(緩衝區):
cache快取區和buffer緩衝區都是臨時儲存區。
buffer緩衝區主要存在於RAM中,作為CPU暫時儲存資料的區域(當計算機和其他裝置有不同的速度時,buffer儲存著緩衝的資料,這樣計算機就可以做其他應用了)
cache快取區是一種高速儲存區域,可以在主存或硬碟燈其他獨立儲存區域的一部分。
總結:cache快取記憶體為硬碟快取,buffer緩衝區被稱為記憶體快取。
2.阻塞(Block)和非阻塞(Non-Block):
阻塞和非阻塞是程序在訪問資料的時候,資料是否準備就緒的一種處理方式,當資料沒有準備的時候
阻塞:往往需要等待緩衝區中的資料準備好過後才處理其他的事情,否則一直等待在哪裡。
非阻塞:當我們的程序訪問我們的資料緩衝區的時候,如果資料沒有準備好則直接返回,不會等待。如果資料已經準備好,也直接放回。
3.同步(Synchronization)和非同步(Asynchronous)的方式:
同步和非同步都是基於應用程式和作業系統處理IO事件所採用的方式。
比如同步:是應用程式要直接參與IO讀寫的操作。
非同步:所有的IO讀寫交給作業系統去處理,應用程式只需要等待通知。
同步方式在處理IO事件的時候,必須阻塞在某個方法上面等待我們的IO事件完成(阻塞IO事件或者通過輪詢IO事件的方式),對於一部來說,所有的IO讀寫都交給了作業系統。這個時候,我們可以去做其他的事情,並不需要去完成真正的IO操作,當操作完成IO後會給我們的應用程式一個通知。
同步:阻塞到IO事件,阻塞到read或者write。這個時候我們就完全不能做自己的事情。讓讀寫方法加入到執行緒裡面,然後阻塞執行緒來實現,對執行緒的效能開銷比較大。
4.BIO(Block IO)於NIO(Non-Block IO)對比:
IO模型 |
IO |
NIO |
方式 |
從硬碟到記憶體 |
從記憶體到硬碟 |
通訊 |
面向流 |
面向緩衝 |
處理 |
阻塞IO(多執行緒) |
非阻塞IO(反應堆Reactor) |
觸發 |
無 |
選擇器(輪詢機制) |
5.面向流與面向緩衝:
Java NIO 和 IO 之間第一個最大區別是,IO是面向流的,NIO是面向緩衝區的。Java IO 面向流意味著每次從流中讀取一個或多個位元組,直至讀取所有位元組,他們沒有被快取在任何地方。此外,它不能前後移動流中的資料。如果需要前後移動從流中讀取的資料,需要先將它快取到一個緩衝區。Java NIO 的緩衝導向方法略有不同。資料讀取到一個它稍後處理的緩衝區,需要時可在緩衝區中前後移動。這就增加了處理過程中的靈活性。但是,還需要檢查是否該緩衝區中包含所有您需要處理的資料。而且,需確保當更多的資料讀入緩衝區時,不要覆蓋緩衝區裡尚未處理的資料。
6.socket(套接字):Socket clinet = new Socket("localhost",8080)
網路上兩個程式通過一個雙向的通訊連線實現資料的交換,這個連線的一端稱為一個socket。
TCP/IP協議幫我們解決能夠實現兩臺主機之間的程序通訊,網路層的IP地址可以唯一標識網路中的主機,傳輸層的協議+埠可以唯一標識主機中的應用程式(程序),這樣利用三元組(IP地址、協議、埠)就可以標識網路的程序。
NIO各沒款詳細解析
一、深入剖析 Buffer
在談到緩衝區時,我們說緩衝區物件本質上是一個數組,但它其實是一個特殊的陣列,緩衝區物件內建了一些機制,
能夠跟蹤和記錄緩衝區的狀態變化情況,如果我們使用get()方法從緩衝區獲取資料後者使用put()方法把資料寫入緩
衝區,都會引起緩衝區狀態的變化。
在緩衝區中,最重要的屬性有下面三個,它們一起合作完成對緩衝區內部狀態的變化跟蹤:
position:指定了下一個將要被寫入或者讀取的元素索引,它的值由get()/put()方法自動更新,在新建立一個Buffer
物件時,position被初始化為0
capacity 陣列容量
limit 可操作範圍
position 標記/遊標 get和put時候到哪個位置 相當於陣列索引 index
flip() 把當前緩衝區固定
二、緩衝區的分配
1、分配指定帶下的緩衝區
2、包裝一個現有的陣列 ByteBuffer.wrap(new byte[10])
3、緩衝區的切片 Slice 0<position<limit<capacity 在position和limit之間分成不同的子緩衝區
4、只讀緩衝區 buffer.asReadOnlyBuffer() 這個緩衝區的資料不能put()只能get()
5、直接緩衝區 ByteBuffer.allocateDirect(1024) Java程式再怎麼分配都是在JVM允許範圍內分配(堆疊區),
類似C語言,直接呼叫C語言模組,最底層。從作業系統中直接分配記憶體,netty中的零拷貝。
6、記憶體對映檔案I/O,在記憶體中可讀可寫,直接持久化硬碟
三、通道 Channel
通道是一個物件,通過它可以讀取和寫入資料,當然了所有資料都通過Buffer物件來處理。我們永遠不會將位元組直接
寫入通道中,相反是將資料寫入包含一個或者多個位元組的緩衝區。同樣不會直接從通道中讀取位元組,而是將資料從通
道讀入緩衝區,再從緩衝區獲取這個位元組。在NIO中,提供了多種通道物件,而所有的通道物件都實現了Channel介面。
使用NIO讀取資料
在前面我們說過,任何時候讀取資料,都不是直接從通道讀取,而是從通道讀取到緩衝區。
所以使用NIO讀取資料分為以下三個步驟:
1.從FileInputStream獲取Channel
2.建立Buffer
3.將資料從Channel讀取到Buffer中
四、反應堆 Reactor
1、阻塞I/O通訊模型
加入現在你對阻塞I/O有一定的瞭解,我們知道阻塞I/O在呼叫InputStream.read()方法時是阻塞的,它會一直
等到資料到來時(或超時)才會返回;同樣,在呼叫ServerSocket.accept()方法時,也會一直阻塞到有客戶端
連線才會返回,每個客戶端連線過來後,服務端都會啟動一個執行緒去處理該客戶端的請求。
缺點:
a.當客戶端多時,會建立大量的處理執行緒。而且每個執行緒都要佔用棧空間和cpu時間
b.阻塞可能帶來頻繁的上下文切換,而且大部分上下文切換是可能無意義的
2、Java NIO原理及通訊模型
Java NIO是在jdk 1.4開始使用的,稱為新的I/O或者是非阻塞式I/O
工作原理:
a.由一個專門的執行緒來處理所有的IO時間,並負責分發。
b.事件驅動機制:事件到的時候觸發,而不是同步的去監視事件
c.執行緒通訊:執行緒之間通過wait,notify等方式通訊,保證每次上下文切換都是有意義的,減少不必要的執行緒切換
(每個執行緒的處理流程大概都是讀取資料、解碼、計算處理、編碼、傳送響應)
五、選擇器 Selector
傳統的Server/Clinet模式會基於trp(Thread pre Request),伺服器會為每個客戶端請求建立一個執行緒,由該執行緒單獨
負責處理一個客戶端請求。這種模式帶來的一個問題是執行緒資料的劇增,大量的執行緒會增大伺服器的開銷。大多數的實現
為了避免這個問題,都採用了執行緒池模型,並設定執行緒池執行緒的最大數量,這又帶來了新的問題,如果執行緒池中有200個線
程,而又200個使用者都在進行大檔案下載,會導致201個使用者的請求無法及時處理,即便第201個使用者只想請求一個幾KB大小的頁面。
NIO中非阻塞I/O採用了基於Reactor模式的工作方式,I/O呼叫不會被阻塞,相反是註冊感興趣的特定I/O事件,如可讀資料
到達,新的套接字連線等等,在發生特定事件時,系統再通知我們。NIO中實現非阻塞I/O的核心物件就是Selector,Selector
就是註冊各種I/O事件池,而且當那些事情發生時,就是這個物件告訴我們所發生的事件。
NIOServer程式碼:
package com.sinosoft.study.nio.demo;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class NIOServer {
private int port = 8080;
private Selector selector;
public NIOServer(int port){
try{
this.port = port;
// ServerSocketChannel是一個可以監聽新進來的TCP連線的通道,就像標準IO中的ServerSocket一樣
// 獲得一個ServerSocket通道
ServerSocketChannel server = ServerSocketChannel.open();
// 預設為阻塞,手動改為非阻塞
server.configureBlocking(false);
// 將該通道對於的 ServerSocket 繫結到 port埠號
server.socket().bind(new InetSocketAddress(port));
// 獲得一個通道選擇器
selector = Selector.open();
/*
* 將通道選擇器和該通道繫結,併為該通道註冊selectionKey.OP_ACCEPT事件
* 註冊該事件後,當事件到達的時候,selector.select()會返回,
* 如果事件沒有到達selector.select()會一直阻塞
*/
server.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("伺服器準備就緒,監聽埠是:"+ this.port);
}catch(Exception e){
e.printStackTrace();
}
}
public void listen(){
try {
System.out.println("start server");
// 輪詢
while (true) {
// 當註冊事件到達時,方法返回,否則該方法會一直阻塞
int wait = this.selector.select();
if(wait == 0){ continue;}
// 獲得selector中選中的迭代器,選中註冊的事件
Set<SelectionKey> keys = this.selector.selectedKeys();
Iterator<SelectionKey> i = keys.iterator();
while(i.hasNext()){
SelectionKey key = i.next();
process(key);
i.remove();
}
}
}catch (Exception e){
e.printStackTrace();
}
}
public void process(SelectionKey key) throws Exception {
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 客戶端請求連線事件
if(key.isAcceptable()){
ServerSocketChannel server = (ServerSocketChannel) key.channel();
// 獲得和客戶端連線的通道
SocketChannel client = server.accept();
client.configureBlocking(false);
// 在客戶端連線成功之後,為了可以接收到客戶端的資訊,需要給通道設定讀的許可權
client.register(selector,SelectionKey.OP_READ);
}else if(key.isReadable()){
SocketChannel client = (SocketChannel) key.channel();
int len = client.read(buffer);
if(len > 0){
buffer.flip();
String content = new String(buffer.array(),0,len);
System.out.println(content);
client.register(selector,SelectionKey.OP_WRITE);
}
buffer.clear();
}else if(key.isWritable()){
SocketChannel client = (SocketChannel) key.channel();
buffer = buffer.wrap("Hello Wold".getBytes());
client.read(buffer);
}
}
public static void main(String[] args) {
new NIOServer(8080).listen();
}
}
BIO客戶端程式碼:
package com.sinosoft.study.bio;
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.UUID;
public class BIOClient {
public static void main(String[] args) throws UnknownHostException, IOException{
// 獲取一個Socket通道
Socket client = new Socket("localhost",8080);
// 輸入流通道開啟
OutputStream os = client.getOutputStream();
String uuid = UUID.randomUUID().toString();
System.out.println("要輸出的內容為:" + uuid);
os.write(uuid.getBytes());
os.close();
client.close();
}
}
BIO服務端程式碼:
package com.sinosoft.study.bio;
import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
public class BIOServer {
ServerSocket server;
// 伺服器
public BIOServer(int port){
try{
// 把Socket服務端啟動
server = new ServerSocket(port);
System.out.println("BIO服務已啟動,監聽的埠號是:" + port);
}catch(IOException e){
e.printStackTrace();
}
}
/**
* @Description: 開始監聽,並處理邏輯
* @Param:
* @return:
* @Author: Mr.WangYu
* @Date: 2018/11/11
* @Time: 14:29
*/
public void listener() throws IOException{
// 迴圈監聽
while(true){
// 等待客戶端的連線,阻塞方法
Socket client = server.accept();
InputStream is = client.getInputStream();
byte []b = new byte[1024];
int num = is.read(b);
if (num > 0){
String msg = new String(b,0,num);
System.out.println("收到:" + msg);
}
}
}
public static void main(String[] args) throws IOException{
new BIOServer(8080).listener();
}
}
贈品:AIO服務端程式碼
package com.sinosoft.study.aio;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
public class AIOServer {
private int port = 8080;
public AIOServer(int port){
this.port = port;
}
public void listen(){
try {
AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open();
server.bind(new InetSocketAddress(this.port));
server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
@Override
public void completed(AsynchronousSocketChannel client, Object attachment) {
ByteBuffer buffer = ByteBuffer.allocate(1024);
client.read(buffer);
buffer.flip();
System.out.println("111111111111");
System.out.println(new String(buffer.array()));
}
@Override
public void failed(Throwable exc, Object attachment) {
}
});
}catch (Exception e){
e.printStackTrace();
}
try {
Thread.sleep(Integer.MAX_VALUE);
}catch (Exception e){
e.printStackTrace();
}
}
public static void main(String[] args) {
new AIOServer(8080).listen();
}
}