1. 程式人生 > >NIO原始碼詳解

NIO原始碼詳解

阻塞io和無阻塞io
阻塞io是指jdk1.4之前版本面向流的io,服務端需要對每個請求建立一堆執行緒等待請求,而客戶端傳送請求後,先諮詢服務端是否有執行緒相應,如果沒有則會一直等待或者遭到拒 絕請求,如果有的話,客戶端會執行緒會等待請求結束後才繼續執行。
當併發量大,而後端服務或客戶端處理資料慢時就會產生產生大量執行緒處於等待中,即上述的阻塞。

 

無阻塞io是使用單執行緒或者只使用少量的多執行緒,每個連線共用一個執行緒,當處於等待(沒有事件)的時候執行緒資源可以釋放出來處理別的請求,通過事件驅動模型當有accept/read/write等事件發生後通知(喚醒)主執行緒分配資源來處理相關事件。java.nio.channels.Selector就是在該模型中事件的觀察者,可以將多個SocketChannel的事件註冊到一個Selector上,當沒有事件發生時Selector處於阻塞狀態,當SocketChannel有accept/read/write等事件發生時喚醒Selector。
 



 這個Selector是使用了單執行緒模型,主要用來描述事件驅動模型,要優化效能需要一個好的執行緒模型來使用,目前比較好的nio框架有Netty,apache的mina等。執行緒模型這塊後面再分享,這裡重點研究Selector的阻塞和喚醒原理。
退出阻塞的方式有:register在selector上的socketChannel處於就緒狀態(放在pollArray中的socketChannel的FD就緒) 或者 第1節中放在pollArray中的wakeupSourceFd就緒。前者(socketChannel)就緒喚醒應證了文章開始的阻塞->事件驅動->喚醒的過程,後者(wakeupSourceFd)就是下面要看的主動wakeup。

這裡建立了一個管道pipe,並對pipe的source端的POLLIN事件感興趣,addWakeupSocket方法將source的POLLIN事件標識為感興趣的,當sink端有資料寫入時,source對應的檔案描述描wakeupSourceFd就會處於就緒狀態。(事實上windows就是通過向管道中寫資料來喚醒阻塞的選擇器的)從以上程式碼可以看出:通道的開啟實際上是構造了一個SelectorImpl物件

subSelector.poll() 是select的核心,由native函式poll0實現,readFds、writeFds 和exceptFds陣列用來儲存底層select的結果,陣列的第一個位置都是存放發生事件的socket的總數,其餘位置存放發生事件的socket控制代碼fd

影象詳解

1,Selector.open()

Pipe.open()開啟一個管道 拿到wakeupSourceFd和wakeupSinkFd兩個檔案描述符;把喚醒端的檔案描述符(wakeupSourceFd)放到pollWrapper裡; 上圖中最下面那部分建立pipe的過程,windows下的實現是建立兩個本地的socketChannel,然後連線(連結的過程通過寫一個隨機long做兩個socket的連結校驗),兩個socketChannel分別實現了管道的source與sink端。
source端由前面提到的WindowsSelectorImpl放到了pollWrapper中(pollWrapper.addWakeupSocket(wakeupSourceFd, 0))

pollWrapper用Unsafe類申請一塊實體記憶體,存放註冊時的socket控制代碼fdVal和event的資料結構pollfd,其中pollfd共8位元組,0~3位元組儲存socket控制代碼,4~7位元組儲存event

先了解一下Unsafe的基本操作

//分配var1位元組大小的記憶體,返回起始地址偏移量 
 public native long allocateMemory(long var1);
//重新給var1起始地址的記憶體分配長度為var3位元組大小的記憶體,返回新的記憶體起始地址偏移量
 public native long reallocateMemory(long var1, long var3);
 //釋放起始地址為var1的記憶體
 public native void freeMemory(long var1);

pollWrapper申請了一個64個位元組的對外記憶體空間,address為記憶體空間的開始地址

PollArrayWrapper pollWrapper = new PollArrayWrapper(8);
PollArrayWrapper(int var1) { int var2 = var1 * SIZE_POLLFD; 
this.pollArray = new AllocatedNativeObject(var2, true); 
this.pollArrayAddress = this.pollArray.address(); this.size = var1; }
protected NativeObject(int var1, boolean var2) {
    if(!var2) {
        this.allocationAddress = unsafe.allocateMemory((long)var1);
        this.address = this.allocationAddress;
    } else {
        int var3 = pageSize();
        long var4 = unsafe.allocateMemory((long)(var1 + var3));
        this.allocationAddress = var4;
        this.address = var4 + (long)var3 - (var4 & (long)(var3 - 1));
    }

}

pollWrapper.addWakeupSocket(wakeupSourceFd, 0),把喚醒端的檔案描述符(wakeupSourceFd)放到pollWrapper裡,本次操作pollfd會使用6個位元組

void addWakeupSocket(int var1, int var2) {
    this.putDescriptor(var2, var1);
    this.putEventOps(var2, Net.POLLIN);
}
void putDescriptor(int var1, int var2) {
    this.pollArray.putInt(SIZE_POLLFD * var1 + 0, var2);
}

void putEventOps(int var1, int var2) {
    this.pollArray.putShort(SIZE_POLLFD * var1 + 4, (short)var2);
}

實際上pollfd的結構

2,Channel.Register()

3,Selector.select()

4,SelectionKey.cancel()

   上圖淺藍色部分

網上找的一些輔助理解圖片