1. 程式人生 > 其它 >tomcat請求處理分析(三) 繫結本地埠監聽請求

tomcat請求處理分析(三) 繫結本地埠監聽請求

1.1.1.1  bind方法

注意:這個bind可能在load的過程就已經載入,這裡只是驗證

   NioEndpoint就是使用Java中的NIO技術,來實行對Socket的處理。它主要包含兩個部業務處理部分:Poller執行緒組和Acceptor執行緒組。

1.1.1.1.1     解析過程

   首先我們應該知道其bind方法做了一些什麼操作,程式碼如下:

public void bind() throws Exception { // 開啟監聽通道 serverSock =ServerSocketChannel.open(); socketProperties.setProperties(serverSock.socket()); InetSocketAddress addr= (getAddress()!=null?new InetSocketAddress(getAddress(),getPort()):new InetSocketAddress(getPort())); serverSock.socket().bind(addr,getBacklog()); serverSock.configureBlocking(true); //mimic APRbehavior serverSock.socket().setSoTimeout(getSocketProperties().getSoTimeout());

if (acceptorThreadCount==0) { // FIXME:Doesn't seem to work that well with multiple accept threads acceptorThreadCount = 1; } if (pollerThreadCount<=0) { //minimum one pollerthread pollerThreadCount = 1; } stopLatch = new CountDownLatch(pollerThreadCount); // Initialize SSL ifneeded if (isSSLEnabled()){         SSLUtil sslUtil = handler.getSslImplementation().getSSLUtil(this); sslContext =sslUtil.createSSLContext(); sslContext.init(wrap(sslUtil.getKeyManagers()), sslUtil.getTrustManagers(), null); SSLSessionContextsessionContext = sslContext.getServerSessionContext();         if (sessionContext != null) {             sslUtil.configureSessionContext(sessionContext); } // Determine which ciphersuites and protocols to enable enabledCiphers =sslUtil.getEnableableCiphers(sslContext); enabledProtocols =sslUtil.getEnableableProtocols(sslContext); } if (oomParachute>0)reclaimParachute(true); selectorPool.open(); }

1.1.1.1.1.1例項化ServerSocketChannelImpl

serverSock =ServerSocketChannel.open();

其方法具體實現:

public static ServerSocketChannel open() throws IOException{ return SelectorProvider.provider().openServerSocketChannel(); }

   在這個方法中進行了兩步操作,第一步呼叫SelectorProvider的provider方法

public static SelectorProvider provider() { synchronized (lock

) { if (provider != null) return provider;

        //在與當前執行緒相同訪問控制權限的環境中,載入SelectorProvider例項  
        return AccessController.doPrivileged(
            new PrivilegedAction<SelectorProvider>() {
                public SelectorProvider run() {
                        if (loadProviderFromProperty())
                            return provider; //獲取系統配置的SelectorProvider  
                        if (loadProviderAsService())
                            return provider; //獲取類載入路徑下的SelectorProvider  
                        //載入預設的SelectorProvider  
                        provider = sun.nio.ch.DefaultSelectorProvider.create();
                        return provider;
                    }
                });
    }

}

判斷provider在當前程序是否已經被例項化過了,如果已經被例項化過了,那麼就直接返回當前provider,不再執行後面的程式碼;否者就執行後面的程式碼例項化provider,

AccessController.doPrivileged()在與當前執行緒相同訪問控制權限的環境中,載入SelectorProvider例項 

loadProviderFromProperty()這個函式判斷如果系統屬性java.nio.channels.spi.SelectorProvider 已經被定義了,則該屬性名看作具體提供者類的完全限定名。載入並例項化該類;如果此程序失敗,則丟擲未指定的錯誤。

   loadProviderAsService()這個函式判斷:如果在對系統類載入器可見的 jar 檔案中安裝了提供者類,並且該 jar 檔案包含資源目錄 META-INF/services 中名為java.nio.channels.spi.SelectorProvider 的提供者配置檔案,則採用在該檔案中指定的第一個類名稱。載入並例項化該類;如果此程序失敗,則丟擲未指定的錯誤。

最後,如果未通過上述的方式制定任何provider,則例項化系統預設的provider並返回該結果(一般情況下,都是這種情況。)

這個地方需要注意的是:這裡系統預設的provider在不同系統上是不一樣的,下面用一個表格來表示:

系統

provider

MacOSX

KQueueSelectorProvider

Linux

Windows

WindowsSelectorProvider

進入sun.nio.ch.DefaultSelectorProvider.create(); 這裡系統會根據不同的作業系統返回不同的provider;具體資訊在上面的表格

總結:該方法的作用完成建立Pipe,並把pipe的讀寫檔案描述符放入pollArray中,這個pollArray是Selector的樞紐

====================方法分界線=======================

上述是呼叫provider方法的具體過程,下面講解一下呼叫其之後繼續呼叫openServerSocketChannel的過程

以osx系統為例其返回了KQueueSelectorProvider,所以呼叫的方法是KQueueSelectorProvider.openServerSocketChannel

注意:其實這個方法不在KQueueSelectorProvider這個類中,而在其父類SelectorProviderImpl中,方法如下:

publicServerSocketChannelopenServerSocketChannel() throws IOException { return new ServerSocketChannelImpl(this);

}

即ServerSocketChannel.open()方法實際上是產生了一個子類ServerSocketChannelImpl的物件例項。其構造器如下:

ServerSocketChannelImpl(SelectorProvider var1) throws IOException {
    super(sp);
    this.fd = Net.serverSocket(true); //獲取ServerSocket的檔案描述符
    this.fdVal = IOUtil.fdVal(this.fd); //獲取檔案描述的id  
    this.state = ST_INUSE; //類變數 private static final int ST_INUSE = 0;

}

  所以在這裡,serverSock = ServerSocketChannel.open();這個方法的作用是例項化ServerSocketChannelImpl,其成員變數具體實現程式碼如下:

//獲取ServerSocket的檔案描述符

class Net
{
   private static volatile boolean checkedIPv6 = false;
    private static volatile boolean isIPv6Available;
    public static final int SHUT_RD = 0;//關閉讀操作
    public static final int SHUT_WR = 1;//關閉寫操作
    public static final int SHUT_RDWR = 2;//關閉讀寫操作
    static 
    {
        //載入nio和net資源庫
        Util.load();
        initIDs();
    }
    private static native void initIDs();
    //預設協議
    static final ProtocolFamily UNSPEC = new ProtocolFamily() {
    public String name()
    {
        return "UNSPEC";
    }

    };
    //獲取ServerSocket檔案描述
    static FileDescriptor serverSocket(boolean flag)
    {
        return IOUtil.newFD(socket0(isIPv6Available(), flag, true));
    }
    private static native int socket0(boolean flag, boolean flag1, boolean flag2);
}
=============================================================
class IOUtil
{
    static final int IOV_MAX = iovMax();
    static final boolean $assertionsDisabled = !sun/nio/ch/IOUtil.desiredAssertionStatus();
    static 
    {
        Util.load();
    }
    建立檔案描述符
    static FileDescriptor newFD(int i)
    {
        FileDescriptor filedescriptor = new FileDescriptor();
        setfdVal(filedescriptor, i);
        return filedescriptor;
    }
}

//獲取檔案描述的id      

static native int fdVal(FileDescriptor filedescriptor);
1.1.1.1.1.2   構建socket並設定相關屬性

socketProperties.setProperties(serverSock.socket());

serverSock.socket()的具體實現

publicServerSocket socket() {

    synchronized(stateLock) { // stateLock是一個new Object() 載入進行

        if(socket == null)

socket =ServerSocketAdaptor.create(this);

        returnsocket;

    }

}

============================create方法==============================

publicstatic ServerSocket create(ServerSocketChannelImpl ssc) {

           try {

 return new ServerSocketAdaptor(ssc);

           } catch (IOException x) {

               throw new Error(x);

           }

        }

==============================構造器=============================

private(ServerSocketChannelImpl ssc)

        throws IOException

    {

this.ssc = ssc;

}

====================ServerSocketChannelImpl類屬性===============

    private final ServerSocketChannelImpl ssc;

private volatile int timeout = 0;

===============================================================

   此方法返回的是一個ServerSocket物件,其中利用同步保證了socket是一個單例

到了這裡socketProperties.setProperties(serverSock.socket());這個方法就等價於socketProperties.setProperties(ServerSocket),其程式碼如下:

public void setProperties(ServerSocket socket) throws SocketException{ if (rxBufSize != null)         socket.setReceiveBufferSize(rxBufSize.intValue()); //設定輸入流緩衝大小     if (performanceConnectionTime!=null&&performanceLatency!=null&& performanceBandwidth != null)         socket.setPerformancePreferences(//設定網路傳輸指標相對重要性 performanceConnectionTime.intValue(), performanceLatency.intValue(), performanceBandwidth.intValue());     if (soReuseAddress!=null)         socket.setReuseAddress(soReuseAddress.booleanValue());     if (soTimeout != null && soTimeout.intValue()>= 0)         socket.setSoTimeout(soTimeout.intValue()); }

總結:這段程式碼的作用是建立socket例項並給當前socket設定一些屬性,包括輸入流緩衝區、網路傳輸三項指標的相對重要性、埠是否可複用、設定讀取超時時間,其實在啟動過程中這些都是null,所以並沒有進行什麼設定

public int getReceiveBufferSize() throws SocketException

public void setReceiveBufferSize(int size) throwsSocketException

在預設情況下,輸入流的接收緩衝區是8096個位元組(8K)。這個值是Java所建議的輸入緩衝區的大小。如果這個預設值不能滿足要求,可以用setReceiveBufferSize方法來重新設定緩衝區的大小。但最好不要將輸入緩衝區設得太小,否則會導致傳輸資料過於頻繁,從而降低網路傳輸的效率。

如果底層的Socket實現不支援SO_RCVBUF選項,這兩個方法將會丟擲SocketException例外。必須將size設為正整數,否則setReceiveBufferSize方法將丟擲IllegalArgumentException例外

===================================================================

public void setPerformancePreferences(int connectionTime,intlatency,int bandwidth)

以上方法的三個引數表示網路傳輸資料的三項指標:

引數connectionTime:表示用最少時間建立連線。

引數latency:表示最小延遲。

引數bandwidth:表示最高頻寬。

setPerformancePreferences()方法用來設定這三項指標之間的相對重要性。可以為這些引數賦予任意的整數,這些整數之間的相對大小就決定了相應引數的相對重要性。例如,如果引數connectionTime為2,引數latency為1,而引數bandwidth為3,就表示最高頻寬最重要,其次是最少連線時間,最後是最小延遲。

public boolean getReuseAddress() throws SocketException          

public void setReuseAddress(boolean on) throws SocketException

錯誤的說法:

通過這個選項,可以使多個Socket物件繫結在同一個埠上。

正確的說明是:

如果埠忙,但TCP狀態位於 TIME_WAIT ,可以重用埠。如果埠忙,而TCP狀態位於其他狀態,重用埠時依舊得到一個錯誤資訊,丟擲“Addressalready in use: JVM_Bind”。如果你的服務程式停止後想立即重啟,不等60秒,而新套接字依舊使用同一埠,此時SO_REUSEADDR 選項非常有用。必須意識到,此時任何非期望資料到達,都可能導致服務程式反應混亂,不過這只是一種可能,事實上很不可能。

這個引數在Windows平臺與Linux平臺表現的特點不一樣。在Windows平臺表現的特點是不正確的,在Linux平臺表現的特點是正確的。

在Windows平臺,多個Socket新建立物件可以繫結在同一個埠上,這些新連線是非TIME_WAIT狀態的。這樣做並沒有多大意義。

在Linux平臺,只有TCP狀態位於 TIME_WAIT ,才可以重用埠。這才是正確的行為。

使用SO_REUSEADDR選項時有兩點需要注意:

    1.  必須在呼叫bind方法之前使用setReuseAddress方法來開啟SO_REUSEADDR選項。因此,要想使用SO_REUSEADDR選項,就不能通過Socket類的構造方法來繫結埠。

    2.  必須將繫結同一個埠的所有的Socket物件的SO_REUSEADDR選項都開啟才能起作用。如在例程4-12中,socket1和socket2都使用了setReuseAddress方法打開了各自的SO_REUSEADDR選項。

在Windows作業系統上執行上面的程式碼的執行結果如下:

這種結果是不正確的。

socket1.getReuseAddress():true

socket2.getReuseAddress():true

在Linux作業系統上執行上面的程式碼的執行結果如下:

這種結果是正確的。因為第一個連線不是TIME_WAIT狀態的,第二個連線就不能使用8899埠;

只有第一個連線是TIME_WAIT狀態的,第二個連線就才能使用8899埠;

public int getSoTimeout() throws SocketException

public void setSoTimeout(int timeout) throws SocketException

這個Socket選項在前面已經討論過。可以通過這個選項來設定讀取資料超時。當輸入流的read方法被阻塞時,如果設定timeout(timeout的單位是毫秒),那麼系統在等待了timeout毫秒後會丟擲一個InterruptedIOException例外。在丟擲例外後,輸入流並未關閉,你可以繼續通過read方法讀取資料。

如果將timeout設為0,就意味著read將會無限等待下去,直到服務端程式關閉這個Socket.這也是timeout的預設值。如下面的語句將讀取資料超時設為30秒:

1.1.1.1.1.3   建立套接字地址

InetSocketAddress addr = (getAddress()!=null?new InetSocketAddress(getAddress(),getPort()):new InetSocketAddress(getPort()));

建立套接字地址,並設定其埠

1.1.1.1.1.4   繫結地址和埠
serverSock.socket().bind(addr,getBacklog());
socket()是一個單例模式建立其例項,所以在這裡還是上面的ServerSocketChannelImpl例項,然後呼叫其bind方法,方法程式碼如下:
public ServerSocketChannel bind(SocketAddress socketaddress, int i)
        throws IOException
    {
        synchronized(lock)
        {
            if(!isOpen())
                //如果socket關閉,則丟擲ClosedChannelException
                throw new ClosedChannelException();
            if(isBound())
                 //如果已繫結,則丟擲AlreadyBoundException
                throw new AlreadyBoundException();
             //確定inetsocketaddress
            InetSocketAddress inetsocketaddress = socketaddress != null ? Net.checkAddress(socketaddress) : new InetSocketAddress(0);
            SecurityManager securitymanager = System.getSecurityManager();
            if(securitymanager != null)
                 //檢查地址埠監聽許可權
                securitymanager.checkListen(inetsocketaddress.getPort());
             //繫結前工作
            NetHooks.beforeTcpBind(fd, inetsocketaddress.getAddress(), inetsocketaddress.getPort());
             //實際地址繫結
            Net.bind(fd, inetsocketaddress.getAddress(), inetsocketaddress.getPort());
             //開啟監聽,如果引數i小於1,預設接受50個連線
            Net.listen(fd, i >= 1 ? i : 50);
            synchronized(stateLock)
            {
                 //更新ocalAddress
                localAddress = Net.localAddress(fd);
            }
        }
        return this;
    }

    從上面可以看出,bind首先檢查ServerSocket是否關閉,是否繫結地址,如果既沒有繫結也沒關閉,則檢查繫結的socketaddress是否正確或合法;然後通過Net工具類的bind(native)和listen(native),完成實際的

ServerSocket地址繫結和開啟監聽,如果繫結是開啟的引數小於1,則預設接受50個連線。

1.1.1.1.1.5   serverSock設定成阻塞IO

serverSock.configureBlocking(true);

程式碼如下:

public finalSelectableChannelconfigureBlocking(boolean block) throws IOException { synchronized (regLock) { if (!isOpen()) throw new ClosedChannelException();         if (blocking == block) return this;         if (block && haveValidKeys()) throw new IllegalBlockingModeException(); implConfigureBlocking(block); blocking = block; } return this; }

1.1.1.1.1.6   設定讀取超時時間

serverSock.socket().setSoTimeout(getSocketProperties().getSoTimeout());

1.1.1.1.1.7   初始化執行緒數
//初始化acceptor和poller執行緒數
if (acceptorThreadCount == 0) {
    // FIXME: Doesn't seem to work that well with multiple accept threads
    acceptorThreadCount = 1;
}
if (pollerThreadCount <= 0) {
    //minimum one poller thread
    pollerThreadCount = 1;
}
1.1.1.1.1.8   例項化執行緒同步輔助類
ountDownLatch類是一個同步計數器,構造時傳入int引數,該引數就是計數器的初始值,每呼叫一次countDown()方法,計數器減1,計數器大於0 時,await()方法會阻塞程式繼續執行
stopLatch = new CountDownLatch(pollerThreadCount);
參考連結:http://www.cnblogs.com/yezhenhan/archive/2012/01/07/2315652.html

   這個屬性的作用是為了在關閉的時候確定所有的pollers關閉才繼續向後執行

public void stopInternal() {     releaseConnectionLatch();     if (!paused) {         pause(); } if (running) { running = false; unlockAccept();         for (int i=0; pollers!=null &&i<pollers.length; i++) { if (pollers[i]==null) continue; pollers[i].destroy(); pollers[i] = null; } try { stopLatch.await(selectorTimeout+100, TimeUnit.MILLISECONDS); } catch (InterruptedExceptionignore) {         }         shutdownExecutor(); eventCache.clear(); nioChannels.clear(); processorCache.clear(); } }

1.1.1.1.1.9   NioSelectorPool例項設定屬性

selectorPool.open();

 其中selectorPool是成員變數

private NioSelectorPool selectorPool = new NioSelectorPool();

在分析selectorPool.open();這段程式碼之前,我們必須瞭解Selector open()這個方法是幹嘛,這個方法也在NioSelectorPool中

程式碼如下:

public staticSelector open() throws IOException {

      returnSelectorProvider.provider().openSelector();

}

   通過呼叫系統預設的SelectorProvider(這裡不同的系統會有不同的SelectorProvider實現類)的openSelector()方法來建立新的selector

SelectorProvider.provider()這個方法我們已經在上文分析過,這裡獲取的就是同一個KQueueSelectorProvider例項

後面呼叫的也就是KQueueSelectorProvider.openSelector();原始碼如下:

public AbstractSelector openSelector()throws IOException {

    returnnew KQueueSelectorImpl(this);

}

根據程式碼可以看出其例項化了一個KQueueSelectorImpl,這是一個選擇器,看一下選擇器的作用,Selector選擇器類管理著一個被註冊的通道集合的資訊和它們的就緒狀態。通道是和選擇器一起被註冊的,並且使用選擇器來更新通道的就緒狀態。當這麼做的時候,可以選擇將被激發的執行緒掛起,直到有就緒的的通道。

所以下面程式碼的則用是構建blockingSelector例項,並將KQueueSelectorImpl給注入sharedSelector,這兩個變數都是NioSelectorPool的屬性

public void open() throws IOException{ enabled = true; getSharedSelector();     if (SHARED) { blockingSelector = new NioBlockingSelector(); blockingSelector.open(getSharedSelector()); } }

protected Selector getSharedSelector() throws IOException{ if (SHARED && SHARED_SELECTOR==null) { synchronized ( NioSelectorPool.class ) { if ( SHARED_SELECTOR==null)  { synchronized (Selector.class) { SHARED_SELECTOR=Selector.open(); } log.info("Usinga shared selector for servlet write/read"); }         }     } return  SHARED_SELECTOR; }

public static Selector open() throws IOException{ return SelectorProvider.provider().openSelector(); }

     下面這個方法是建立一個輪詢執行緒,然後將選擇器賦值給這個公司,並設定起為守護執行緒

public void open(Selector selector) { sharedSelector = selector; poller = new BlockPoller(); poller.selector = sharedSelector; poller.setDaemon(true); poller.setName("NioBlockingSelector.BlockPoller-"+(++threadCounter)); poller.start(); }