1. 程式人生 > >NIO之終極Selctor原始碼分析

NIO之終極Selctor原始碼分析

回顧一下Selector,多路複用器Selector可以管理多個Channel,可以向Selector註冊感興趣的事件,當事件就緒,通過Selector.selector方法獲取註冊的事件,進行相應的操作。

Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress
(1234)); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); while (selector.select() > 0) { Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> iterator = keys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.
remove(); if (key.isAcceptable()) { ... } else if (key.isReadable() && key.isValid()) { ... } keys.remove(key); } }
1. 例項化分析

一般我們獲取Selector通過Selector.open()獲取,

public static Selector open() throws IOException {
        return SelectorProvider.provider().openSelector
(); }

Selector內部是通過SelectorProvider類的Provider方法實現。

public static SelectorProvider provider() {
    synchronized (lock) {
        if (provider != null)
            return provider;
        return AccessController.doPrivileged(
            new PrivilegedAction<SelectorProvider>() {
                public SelectorProvider run() {
                        if (loadProviderFromProperty())
                            return provider;
                        if (loadProviderAsService())
                            return provider;
                        provider = sun.nio.ch.DefaultSelectorProvider.create();
                        return provider;
                    }
                });
    }
}

這裡使用單例模式,保證只有一個SelectorProvider。provider由sun.nio.ch.DefaultSelectorProvider.create()建立。create方法內部通過系統的名稱來建立建立SelectorProvider,在這裡建立了sun.nio.ch.EPollSelectorProvider

 public static SelectorProvider create() {
        String var0 = (String)AccessController.doPrivileged(new GetPropertyAction("os.name"));
        if (var0.equals("SunOS")) {
            return createProvider("sun.nio.ch.DevPollSelectorProvider");
        } else {
            return (SelectorProvider)(var0.equals("Linux") ? createProvider("sun.nio.ch.EPollSelectorProvider") : new PollSelectorProvider());
        }
    }

繼續回到Selector的open()中,獲取到SelecProvider例項之後,繼續呼叫openSelector(),很自然進入EPollSelectorProvider的openSelector()方法:

public AbstractSelector openSelector() throws IOException {
    return new EPollSelectorProvider(this);
  }

進入EPollSelectorProvider的建構函式

 EPollSelectorImpl(SelectorProvider var1) throws IOException {
        super(var1);
        long var2 = IOUtil.makePipe(false);
        this.fd0 = (int)(var2 >>> 32);
        this.fd1 = (int)var2;
        this.pollWrapper = new EPollArrayWrapper();
        this.pollWrapper.initInterrupt(this.fd0, this.fd1);
        this.fdToKey = new HashMap();
    }

初始化的主要步驟如下:

  • 首先通過IOUtil.makePipe(false)返回了一個非堵塞的管道(pipe),底層是通過Linux的pipe系統呼叫實現的;返回兩個int常量,分別指向管道的讀、寫檔案描述符。讀端在高32位,寫端在低32位
  • 接著定義了EPollArrayWrapper變數,並呼叫initInterrup方法,將fd0註冊到epoll;
  • 最後定義了一個map型別的變數fdToKey,將channel的檔案描述符ID和SelectionKey建立對映關係,SelectionKey中儲存了Channel Selector 感興趣的事件。

EpollArrayWapper is what? EpollArrayWapper將Linux的epoll相關係統呼叫封裝成了native方法供EpollSelectorImpl使用,EpollArrayWapper類中有幾個比較重要的Navtive方法:

private native int epollCreate();
private native void epollCtl(int paramInt1, int paramInt2, int paramInt3, int paramInt4);
private native int epollWait(long paramLong1, int paramInt1, long paramLong2, int paramInt2) throws IOException;

看起來似乎比較眼熟,好像和Epoll的三個方法有些雷同,沒錯這三個native方法正是對上述epoll系列系統呼叫的包裝。

2. Resigter

下面分析serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT)的底層邏輯

public final SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException { 
synchronized (regLock) { 
	SelectionKey k = findKey(sel); 
	if (k != null) { 
		k.interestOps(ops); 
		k.attach(att); 
	}
	 if (k == null) { // New registration synchronized (keyLock) { 
		 if (!isOpen()) throw new ClosedChannelException(); 
		 k = ((AbstractSelector)sel).register(this, ops, att); addKey(k); 
	 }
	  	} return k; 
	 } 
 }
  • 如果該channel和selector已經註冊過,則直接新增事件和附件。
  • 否則通過selector實現註冊過程。 繼續呼叫SelectorImp的register方法:
protected final SelectionKey register(AbstractSelectableChannel ch, int ops, Object attachment) { 
	SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
	 k.attach(attachment); 
	 synchronized (publicKeys) { 
		 implRegister(k); 
	 } 
	 k.interestOps(ops);
	 return k; 
  }
  1. 以當前channel和selector為引數,初始化SelectionKeyImpl 物件selectionKeyImpl ,並新增附件attachment。

  2. implRegister(k)將channel註冊到epoll中

  3. k.interestOps(int) 的內部呼叫了nioInterestOps:

public SelectionKey nioInterestOps(int var1) {
            this.channel.translateAndSetInterestOps(var1, this);
            this.interestOps = var1;
            return this;
    }
  • translateAndSetInterestOps方法會將註冊的感興趣的事件和其對應的檔案描述儲存到EPollArrayWrapper物件的eventsLow或eventsHigh中,這是給底層實現epoll_wait時使用的。以SeverSocketChannel的實現為例。
 public void translateAndSetInterestOps(int var1, SelectionKeyImpl var2) {
       int var3 = 0;
       if ((var1 & 16) != 0) {
           var3 |= Net.POLLIN;
       }
       var2.selector.putEventOps(var2, var3);
   }
   //EpollSelector中
    public void putEventOps(SelectionKeyImpl var1, int var2) {
         SelChImpl var3 = var1.channel;
         this.pollWrapper.setInterest(var3.getFDVal(), var2);
    }
  • 同時該操作還會將設定SelectionKey的interestOps欄位,這是給我們程式設計師獲取使用的。

EPollSelectorImpl.implRegister:

protected void implRegister(SelectionKeyImpl var1) {
     SelChImpl var2 = var1.channel;
     int var3 = Integer.valueOf(var2.getFDVal());
     this.fdToKey.put(var3, var1);
     this.pollWrapper.add(var3);
     this.keys.add(var1);
}
  • 將channel對應的fd(檔案描述符)和對應的SelectionKeyImpl放到fdToKey對映表中。
  • 將channel對應的fd(檔案描述符)新增到EPollArrayWrapper中,並強制初始化fd的事件為0 ( 強制初始更新事件為0,因為該事件可能存在於之前被取消過的註冊中。)
  • 將selectionKey放入到keys集合中。

3. Select

Selector.select方法最終呼叫的是doSelect方法:

protected int doSelect(long paramLong) throws IOException{
	//處理待取消的SelectionKey(呼叫SelectionKey.cancel()方法取消)
	processDeregisterQueue();
	try {
		begin();
		this.pollWrapper.poll(paramLong);
	} finally {
		end();
	}
	processDeregisterQueue();
	int i = updateSelectedKeys();
	if (this.pollWrapper.interrupted())	{
		this.pollWrapper.putEventOps(this.pollWrapper.interruptedIndex(), 0);
		synchronized (this.interruptLock) {
			this.pollWrapper.clearInterrupted();
			IOUtil.drain(this.fd0);
			this.interruptTriggered = false;
		}
	}
	return i;
}

未完!