tomcat請求處理分析(三) 繫結本地埠監聽請求
1.1.1.1 bind方法
注意:這個bind可能在load的過程就已經載入,這裡只是驗證
NioEndpoint就是使用Java中的NIO技術,來實行對Socket的處理。它主要包含兩個部業務處理部分:Poller執行緒組和Acceptor執行緒組。
1.1.1.1.1 解析過程
首先我們應該知道其bind方法做了一些什麼操作,程式碼如下:
public void bind() throws Exception { // 開啟監聽通道 serverSock =ServerSocketChannel.open(); socketProperties.setProperties(serverSock.socket()); InetSocketAddress addr= (getAddress()!=null?new InetSocketAddress(getAddress(),getPort()):new InetSocketAddress(getPort())); serverSock.socket().bind(addr,getBacklog()); serverSock.configureBlocking(true); //mimic APRbehavior serverSock.socket().setSoTimeout(getSocketProperties().getSoTimeout());
if (acceptorThreadCount==0) { // FIXME:Doesn't seem to work that well with multiple accept threads acceptorThreadCount = 1; } if (pollerThreadCount<=0) { //minimum one pollerthread pollerThreadCount = 1; } stopLatch = new CountDownLatch(pollerThreadCount); // Initialize SSL ifneeded if (isSSLEnabled()){ SSLUtil sslUtil = handler.getSslImplementation().getSSLUtil(this); sslContext =sslUtil.createSSLContext(); sslContext.init(wrap(sslUtil.getKeyManagers()), sslUtil.getTrustManagers(), null); SSLSessionContextsessionContext = sslContext.getServerSessionContext(); if (sessionContext != null) { sslUtil.configureSessionContext(sessionContext); } // Determine which ciphersuites and protocols to enable enabledCiphers =sslUtil.getEnableableCiphers(sslContext); enabledProtocols =sslUtil.getEnableableProtocols(sslContext); } if (oomParachute>0)reclaimParachute(true); selectorPool.open(); }
1.1.1.1.1.1例項化ServerSocketChannelImpl
serverSock =ServerSocketChannel.open();
其方法具體實現:
public static ServerSocketChannel open() throws IOException{ return SelectorProvider.provider().openServerSocketChannel(); }
在這個方法中進行了兩步操作,第一步呼叫SelectorProvider的provider方法
public static SelectorProvider provider() { synchronized (lock
//在與當前執行緒相同訪問控制權限的環境中,載入SelectorProvider例項
return AccessController.doPrivileged(
new PrivilegedAction<SelectorProvider>() {
public SelectorProvider run() {
if (loadProviderFromProperty())
return provider; //獲取系統配置的SelectorProvider
if (loadProviderAsService())
return provider; //獲取類載入路徑下的SelectorProvider
//載入預設的SelectorProvider
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
}
});
}
}
判斷provider在當前程序是否已經被例項化過了,如果已經被例項化過了,那麼就直接返回當前provider,不再執行後面的程式碼;否者就執行後面的程式碼例項化provider,
AccessController.doPrivileged()在與當前執行緒相同訪問控制權限的環境中,載入SelectorProvider例項
loadProviderFromProperty()這個函式判斷如果系統屬性java.nio.channels.spi.SelectorProvider 已經被定義了,則該屬性名看作具體提供者類的完全限定名。載入並例項化該類;如果此程序失敗,則丟擲未指定的錯誤。
loadProviderAsService()這個函式判斷:如果在對系統類載入器可見的 jar 檔案中安裝了提供者類,並且該 jar 檔案包含資源目錄 META-INF/services 中名為java.nio.channels.spi.SelectorProvider 的提供者配置檔案,則採用在該檔案中指定的第一個類名稱。載入並例項化該類;如果此程序失敗,則丟擲未指定的錯誤。
最後,如果未通過上述的方式制定任何provider,則例項化系統預設的provider並返回該結果(一般情況下,都是這種情況。)
這個地方需要注意的是:這裡系統預設的provider在不同系統上是不一樣的,下面用一個表格來表示:
系統 |
provider |
---|---|
MacOSX |
KQueueSelectorProvider |
Linux |
|
Windows |
WindowsSelectorProvider |
進入sun.nio.ch.DefaultSelectorProvider.create();
這裡系統會根據不同的作業系統返回不同的provider;具體資訊在上面的表格
總結:該方法的作用完成建立Pipe,並把pipe的讀寫檔案描述符放入pollArray中,這個pollArray是Selector的樞紐
====================方法分界線=======================
上述是呼叫provider方法的具體過程,下面講解一下呼叫其之後繼續呼叫openServerSocketChannel的過程
以osx系統為例其返回了KQueueSelectorProvider,所以呼叫的方法是KQueueSelectorProvider.openServerSocketChannel
注意:其實這個方法不在KQueueSelectorProvider這個類中,而在其父類SelectorProviderImpl中,方法如下:
publicServerSocketChannelopenServerSocketChannel() throws IOException { return new ServerSocketChannelImpl(this);
}
即ServerSocketChannel.open()方法實際上是產生了一個子類ServerSocketChannelImpl的物件例項。其構造器如下:
ServerSocketChannelImpl(SelectorProvider var1) throws IOException {
super(sp);
this.fd = Net.serverSocket(true); //獲取ServerSocket的檔案描述符
this.fdVal = IOUtil.fdVal(this.fd); //獲取檔案描述的id
this.state = ST_INUSE; //類變數 private static final int ST_INUSE = 0;
}
所以在這裡,serverSock = ServerSocketChannel.open();這個方法的作用是例項化ServerSocketChannelImpl,其成員變數具體實現程式碼如下:
//獲取ServerSocket的檔案描述符
class Net
{
private static volatile boolean checkedIPv6 = false;
private static volatile boolean isIPv6Available;
public static final int SHUT_RD = 0;//關閉讀操作
public static final int SHUT_WR = 1;//關閉寫操作
public static final int SHUT_RDWR = 2;//關閉讀寫操作
static
{
//載入nio和net資源庫
Util.load();
initIDs();
}
private static native void initIDs();
//預設協議
static final ProtocolFamily UNSPEC = new ProtocolFamily() {
public String name()
{
return "UNSPEC";
}
};
//獲取ServerSocket檔案描述
static FileDescriptor serverSocket(boolean flag)
{
return IOUtil.newFD(socket0(isIPv6Available(), flag, true));
}
private static native int socket0(boolean flag, boolean flag1, boolean flag2);
}
=============================================================
class IOUtil
{
static final int IOV_MAX = iovMax();
static final boolean $assertionsDisabled = !sun/nio/ch/IOUtil.desiredAssertionStatus();
static
{
Util.load();
}
建立檔案描述符
static FileDescriptor newFD(int i)
{
FileDescriptor filedescriptor = new FileDescriptor();
setfdVal(filedescriptor, i);
return filedescriptor;
}
}
//獲取檔案描述的id
static native int fdVal(FileDescriptor filedescriptor);
1.1.1.1.1.2 構建socket並設定相關屬性
socketProperties.setProperties(serverSock.socket());
serverSock.socket()的具體實現
publicServerSocket socket() {
synchronized(stateLock) {
// stateLock是一個new Object() 載入進行
if(socket == null)
socket =ServerSocketAdaptor.create(this);
returnsocket;
}
}
============================create方法==============================
publicstatic ServerSocket create(ServerSocketChannelImpl ssc) {
try {
return new ServerSocketAdaptor(ssc);
} catch (IOException x) {
throw new Error(x);
}
}
==============================構造器=============================
private(ServerSocketChannelImpl ssc)
throws IOException
{
this.ssc = ssc;
}
====================ServerSocketChannelImpl類屬性===============
private final ServerSocketChannelImpl ssc;
private volatile int timeout = 0;
===============================================================
此方法返回的是一個ServerSocket物件,其中利用同步保證了socket是一個單例
到了這裡socketProperties.setProperties(serverSock.socket());這個方法就等價於socketProperties.setProperties(ServerSocket),其程式碼如下:
public void setProperties(ServerSocket socket) throws SocketException{ if (rxBufSize != null) socket.setReceiveBufferSize(rxBufSize.intValue()); //設定輸入流緩衝大小 if (performanceConnectionTime!=null&&performanceLatency!=null&& performanceBandwidth != null) socket.setPerformancePreferences(//設定網路傳輸指標相對重要性 performanceConnectionTime.intValue(), performanceLatency.intValue(), performanceBandwidth.intValue()); if (soReuseAddress!=null) socket.setReuseAddress(soReuseAddress.booleanValue()); if (soTimeout != null && soTimeout.intValue()>= 0) socket.setSoTimeout(soTimeout.intValue()); }
總結:這段程式碼的作用是建立socket例項並給當前socket設定一些屬性,包括輸入流緩衝區、網路傳輸三項指標的相對重要性、埠是否可複用、設定讀取超時時間,其實在啟動過程中這些都是null,所以並沒有進行什麼設定
public int getReceiveBufferSize() throws SocketException
public void setReceiveBufferSize(int size) throwsSocketException
在預設情況下,輸入流的接收緩衝區是8096個位元組(8K)。這個值是Java所建議的輸入緩衝區的大小。如果這個預設值不能滿足要求,可以用setReceiveBufferSize方法來重新設定緩衝區的大小。但最好不要將輸入緩衝區設得太小,否則會導致傳輸資料過於頻繁,從而降低網路傳輸的效率。
如果底層的Socket實現不支援SO_RCVBUF選項,這兩個方法將會丟擲SocketException例外。必須將size設為正整數,否則setReceiveBufferSize方法將丟擲IllegalArgumentException例外
===================================================================
public void setPerformancePreferences(int connectionTime,intlatency,int bandwidth)
以上方法的三個引數表示網路傳輸資料的三項指標:
引數connectionTime:表示用最少時間建立連線。
引數latency:表示最小延遲。
引數bandwidth:表示最高頻寬。
setPerformancePreferences()方法用來設定這三項指標之間的相對重要性。可以為這些引數賦予任意的整數,這些整數之間的相對大小就決定了相應引數的相對重要性。例如,如果引數connectionTime為2,引數latency為1,而引數bandwidth為3,就表示最高頻寬最重要,其次是最少連線時間,最後是最小延遲。
public boolean getReuseAddress() throws SocketException
public void setReuseAddress(boolean on) throws SocketException
錯誤的說法:
通過這個選項,可以使多個Socket物件繫結在同一個埠上。
正確的說明是:
如果埠忙,但TCP狀態位於 TIME_WAIT ,可以重用埠。如果埠忙,而TCP狀態位於其他狀態,重用埠時依舊得到一個錯誤資訊,丟擲“Addressalready in use: JVM_Bind”。如果你的服務程式停止後想立即重啟,不等60秒,而新套接字依舊使用同一埠,此時SO_REUSEADDR 選項非常有用。必須意識到,此時任何非期望資料到達,都可能導致服務程式反應混亂,不過這只是一種可能,事實上很不可能。
這個引數在Windows平臺與Linux平臺表現的特點不一樣。在Windows平臺表現的特點是不正確的,在Linux平臺表現的特點是正確的。
在Windows平臺,多個Socket新建立物件可以繫結在同一個埠上,這些新連線是非TIME_WAIT狀態的。這樣做並沒有多大意義。
在Linux平臺,只有TCP狀態位於 TIME_WAIT ,才可以重用埠。這才是正確的行為。
使用SO_REUSEADDR選項時有兩點需要注意:
1. 必須在呼叫bind方法之前使用setReuseAddress方法來開啟SO_REUSEADDR選項。因此,要想使用SO_REUSEADDR選項,就不能通過Socket類的構造方法來繫結埠。
2. 必須將繫結同一個埠的所有的Socket物件的SO_REUSEADDR選項都開啟才能起作用。如在例程4-12中,socket1和socket2都使用了setReuseAddress方法打開了各自的SO_REUSEADDR選項。
在Windows作業系統上執行上面的程式碼的執行結果如下:
這種結果是不正確的。
socket1.getReuseAddress():true
socket2.getReuseAddress():true
在Linux作業系統上執行上面的程式碼的執行結果如下:
這種結果是正確的。因為第一個連線不是TIME_WAIT狀態的,第二個連線就不能使用8899埠;
只有第一個連線是TIME_WAIT狀態的,第二個連線就才能使用8899埠;
public int getSoTimeout() throws SocketException
public void setSoTimeout(int timeout) throws SocketException
這個Socket選項在前面已經討論過。可以通過這個選項來設定讀取資料超時。當輸入流的read方法被阻塞時,如果設定timeout(timeout的單位是毫秒),那麼系統在等待了timeout毫秒後會丟擲一個InterruptedIOException例外。在丟擲例外後,輸入流並未關閉,你可以繼續通過read方法讀取資料。
如果將timeout設為0,就意味著read將會無限等待下去,直到服務端程式關閉這個Socket.這也是timeout的預設值。如下面的語句將讀取資料超時設為30秒:
1.1.1.1.1.3 建立套接字地址
InetSocketAddress addr = (getAddress()!=null?new InetSocketAddress(getAddress(),getPort()):new InetSocketAddress(getPort()));
建立套接字地址,並設定其埠
1.1.1.1.1.4 繫結地址和埠
serverSock.socket().bind(addr,getBacklog());
socket()是一個單例模式建立其例項,所以在這裡還是上面的ServerSocketChannelImpl例項,然後呼叫其bind方法,方法程式碼如下:
public ServerSocketChannel bind(SocketAddress socketaddress, int i)
throws IOException
{
synchronized(lock)
{
if(!isOpen())
//如果socket關閉,則丟擲ClosedChannelException
throw new ClosedChannelException();
if(isBound())
//如果已繫結,則丟擲AlreadyBoundException
throw new AlreadyBoundException();
//確定inetsocketaddress
InetSocketAddress inetsocketaddress = socketaddress != null ? Net.checkAddress(socketaddress) : new InetSocketAddress(0);
SecurityManager securitymanager = System.getSecurityManager();
if(securitymanager != null)
//檢查地址埠監聽許可權
securitymanager.checkListen(inetsocketaddress.getPort());
//繫結前工作
NetHooks.beforeTcpBind(fd, inetsocketaddress.getAddress(), inetsocketaddress.getPort());
//實際地址繫結
Net.bind(fd, inetsocketaddress.getAddress(), inetsocketaddress.getPort());
//開啟監聽,如果引數i小於1,預設接受50個連線
Net.listen(fd, i >= 1 ? i : 50);
synchronized(stateLock)
{
//更新ocalAddress
localAddress = Net.localAddress(fd);
}
}
return this;
}
從上面可以看出,bind首先檢查ServerSocket是否關閉,是否繫結地址,如果既沒有繫結也沒關閉,則檢查繫結的socketaddress是否正確或合法;然後通過Net工具類的bind(native)和listen(native),完成實際的
ServerSocket地址繫結和開啟監聽,如果繫結是開啟的引數小於1,則預設接受50個連線。
1.1.1.1.1.5 serverSock設定成阻塞IO
serverSock.configureBlocking(true);
程式碼如下:
public finalSelectableChannelconfigureBlocking(boolean block) throws IOException { synchronized (regLock) { if (!isOpen()) throw new ClosedChannelException(); if (blocking == block) return this; if (block && haveValidKeys()) throw new IllegalBlockingModeException(); implConfigureBlocking(block); blocking = block; } return this; }
1.1.1.1.1.6 設定讀取超時時間
serverSock.socket().setSoTimeout(getSocketProperties().getSoTimeout());
1.1.1.1.1.7 初始化執行緒數
//初始化acceptor和poller執行緒數
if (acceptorThreadCount == 0) {
// FIXME: Doesn't seem to work that well with multiple accept threads
acceptorThreadCount = 1;
}
if (pollerThreadCount <= 0) {
//minimum one poller thread
pollerThreadCount = 1;
}
1.1.1.1.1.8 例項化執行緒同步輔助類
ountDownLatch類是一個同步計數器,構造時傳入int引數,該引數就是計數器的初始值,每呼叫一次countDown()方法,計數器減1,計數器大於0 時,await()方法會阻塞程式繼續執行
stopLatch = new CountDownLatch(pollerThreadCount);
參考連結:http://www.cnblogs.com/yezhenhan/archive/2012/01/07/2315652.html
這個屬性的作用是為了在關閉的時候確定所有的pollers關閉才繼續向後執行
public void stopInternal() { releaseConnectionLatch(); if (!paused) { pause(); } if (running) { running = false; unlockAccept(); for (int i=0; pollers!=null &&i<pollers.length; i++) { if (pollers[i]==null) continue; pollers[i].destroy(); pollers[i] = null; } try { stopLatch.await(selectorTimeout+100, TimeUnit.MILLISECONDS); } catch (InterruptedExceptionignore) { } shutdownExecutor(); eventCache.clear(); nioChannels.clear(); processorCache.clear(); } }
1.1.1.1.1.9 NioSelectorPool例項設定屬性
selectorPool.open();
其中selectorPool是成員變數
private NioSelectorPool selectorPool = new NioSelectorPool();
在分析selectorPool.open();這段程式碼之前,我們必須瞭解Selector open()這個方法是幹嘛,這個方法也在NioSelectorPool中
程式碼如下:
public staticSelector open() throws IOException {
returnSelectorProvider.provider().openSelector();
}
通過呼叫系統預設的SelectorProvider(這裡不同的系統會有不同的SelectorProvider實現類)的openSelector()方法來建立新的selector
SelectorProvider.provider()這個方法我們已經在上文分析過,這裡獲取的就是同一個KQueueSelectorProvider例項
後面呼叫的也就是KQueueSelectorProvider.openSelector();原始碼如下:
public AbstractSelector openSelector()throws IOException {
returnnew KQueueSelectorImpl(this);
}
根據程式碼可以看出其例項化了一個KQueueSelectorImpl,這是一個選擇器,看一下選擇器的作用,Selector選擇器類管理著一個被註冊的通道集合的資訊和它們的就緒狀態。通道是和選擇器一起被註冊的,並且使用選擇器來更新通道的就緒狀態。當這麼做的時候,可以選擇將被激發的執行緒掛起,直到有就緒的的通道。
所以下面程式碼的則用是構建blockingSelector例項,並將KQueueSelectorImpl給注入sharedSelector,這兩個變數都是NioSelectorPool的屬性
public void open() throws IOException{ enabled = true; getSharedSelector(); if (SHARED) { blockingSelector = new NioBlockingSelector(); blockingSelector.open(getSharedSelector()); } }
protected Selector getSharedSelector() throws IOException{ if (SHARED && SHARED_SELECTOR==null) { synchronized ( NioSelectorPool.class ) { if ( SHARED_SELECTOR==null) { synchronized (Selector.class) { SHARED_SELECTOR=Selector.open(); } log.info("Usinga shared selector for servlet write/read"); } } } return SHARED_SELECTOR; }
public static Selector open() throws IOException{ return SelectorProvider.provider().openSelector(); }
下面這個方法是建立一個輪詢執行緒,然後將選擇器賦值給這個公司,並設定起為守護執行緒
public void open(Selector selector) { sharedSelector = selector; poller = new BlockPoller(); poller.selector = sharedSelector; poller.setDaemon(true); poller.setName("NioBlockingSelector.BlockPoller-"+(++threadCounter)); poller.start(); }