劍指架構師系列-tomcat6通過偽異步實現connector
首先在StandardService中start接收請求的線程,如下:
synchronized (connectors) { for (int i = 0; i < connectors.length; i++) { try { ((Lifecycle) connectors[i]).start(); } catch (Exception e) { log.error(sm.getString("standardService.connector.startFailed",connectors[i]), e); } } }
然後進入Connector,在這個類中調用了org.apache.coyote.http11.Http11Protocol類
protocolHandler.start();
在Http11Protocol類中又調用了org.apache.tomcat.util.net.JIoEndpoint類
endpoint.start();
下面看一下JIoEndpoint類中的start源代碼,如下:
public void start() throws Exception { // Initialize socket if not done before if (!initialized) { init(); } if (!running) { running = true; paused = false; // Create worker collection if (executor == null) { workers = new WorkerStack(maxThreads); // maxThreads值為200,可同時處理200個請求 } // Start acceptor threads for (int i = 0; i < acceptorThreadCount; i++) { // acceptorThreadCount值為1,只有一個接收請求的線程 Thread acceptorThread = new Thread(new Acceptor(), getName() + "-Acceptor-" + i); acceptorThread.setPriority(threadPriority); acceptorThread.setDaemon(daemon); acceptorThread.start(); } } }
WorkerStack類使用了定長的數組來方便存取worker,也就是真正處理請求的線程。重點看一下Acceptor這個Runnable類的實現。
/** * Server socket acceptor thread. */ protected class Acceptor implements Runnable { /** * The background thread that listens for incoming TCP/IP connections * and hands them off to an appropriate processor. */ public void run() { // Loop until we receive a shutdown command while (running) { // Loop if endpoint is paused while (paused) { try { Thread.sleep(1000); } catch (InterruptedException e) { // Ignore } } // Accept the next incoming connection from the server socket try { Socket socket = serverSocketFactory.acceptSocket(serverSocket); // 阻塞接收socket請求 serverSocketFactory.initSocket(socket); // Hand this socket off to an appropriate processor if (!processSocket(socket)) { // 處理socket請求 // Close socket right away try { socket.close(); } catch (IOException e) { // Ignore } } } catch (IOException x) { if (running) log.error(sm.getString("endpoint.accept.fail"), x); } catch (Throwable t) { log.error(sm.getString("endpoint.accept.fail"), t); } // The processor will recycle itself when it finishes ??? } }// end run }
最重要的就是processSocket()方法了,看下源代碼:
protected boolean processSocket(Socket socket) { try { if (executor == null) { // 默認情況 getWorkerThread().assign(socket); } else { // 用戶自己指定了執行任務的線程池 executor.execute(new SocketProcessor(socket)); } } catch (Throwable t) { // This means we got an OOM or similar creating a thread, or that // the pool and its queue are full log.error(sm.getString("endpoint.process.fail"), t); return false; } return true; }
在默認情況下,首先要get到一個Worker的Thread,然後才能assign任務。
看一下getWorkerThread()這條邏輯:
/** * Return a new worker thread, and block while to worker is available. */ protected Worker getWorkerThread() { // Allocate a new worker thread synchronized (workers) { // 獲取workers鎖 Worker workerThread; while ((workerThread = createWorkerThread()) == null) { try { workers.wait(); // 沒有可用線程時釋放workers鎖,等待notify } catch (InterruptedException e) { // Ignore } } return workerThread; } }
代碼要通過createWorkerThread()方法來獲取一個workerThread,閱讀如下代碼就可以知道,這個方法有可能返回null。這樣這個線程就需要讓鎖等待了,直到有線程notify。想一下就知道,肯定是分配出去執行任務的線程執行完成後,就可以notify接口請求的線程。接收請求的線程繼續while循環,直到獲取到一個workerThread為止。
createWorkerThread()方法源代碼:
protected Worker createWorkerThread() { synchronized (workers) { if (workers.size() > 0) { // 通過WorkerStack提供的方法來操作Worker curThreadsBusy++; return workers.pop(); } if ((maxThreads > 0) && (curThreads < maxThreads)) { // 保證不能大於指定的最大線程數 curThreadsBusy++; if (curThreadsBusy == maxThreads) { log.info(sm.getString("endpoint.info.maxThreads", Integer.toString(maxThreads), address,Integer.toString(port))); } return (newWorkerThread()); } else { if (maxThreads < 0) { // maxThreads小於0時會無限制的new WorkerThread,表示不限制 curThreadsBusy++; return (newWorkerThread()); } else { // 當curThreads等於maxThreads或者大於maxThreads且maxThreads大於0的情況 return (null); } } } }
recycleWorkerThread()方法源代碼:
protected void recycleWorkerThread(Worker workerThread) { synchronized (workers) { workers.push(workerThread); curThreadsBusy--; workers.notify(); } }
這個方法被誰調用了呢?當然是被執行任何的線程調用了。
下面來看一下最重要的Worker類中非常重要的幾個方法,如下:
protected class Worker implements Runnable { protected Thread thread = null; protected boolean available = false; // available初始化為false protected Socket socket = null; /** * The background thread that listens for incoming TCP/IP connections * and hands them off to an appropriate processor. */ public void run() { // Process requests until we receive a shutdown signal while (running) { // Wait for the next socket to be assigned Socket socket = await(); // 1 if (socket == null) continue; // Process the request from this socket if (!setSocketOptions(socket) || !handler.process(socket)) { // Close socket try { socket.close(); } catch (IOException e) { } } // Finish up this request socket = null; recycleWorkerThread(this); } } /** * Start the background processing thread. */ public void start() { thread = new Thread(this); thread.setName(getName() + "-" + (++curThreads)); thread.setDaemon(true); thread.start(); } }
這個線程在assign任務之前是start的,看一下run()方法中的第1步調用了await()方法,在await()方法中由於available值默認為false,所以進入了while循環後讓出了線程鎖並等待assign()方法notifyAll()。
/** * Await a newly assigned Socket from our Connector, or * null if we are supposed to shut down. */ private synchronized Socket await() { // Wait for the Connector to provide a new Socket while (!available) { try { wait(); } catch (InterruptedException e) { } } // Notify the Connector that we have received this Socket Socket socket = this.socket; available = false; notifyAll(); return (socket); }
當我們assign任務後,調用的assign()方法如下:
/** * Process an incoming TCP/IP connection on the specified socket. Any * exception that occurs during processing must be logged and swallowed. * NOTE: This method is called from our Connector‘s thread. We * must assign it to our own thread so that multiple simultaneous * requests can be handled. */ synchronized void assign(Socket socket) { // Wait for the Processor to get the previous Socket while (available) { try { wait(); } catch (InterruptedException e) { } } // Store the newly available Socket and notify our thread this.socket = socket; available = true; notifyAll(); }
沒有進入while循環,置available為true後notifyAll()。這樣await()方法就跳出循環並置available為false後返回一個局部變量socket(為什麽要返回一個局部變量socket呢?),這樣run()方法就可以開始往下走了,完成後調用recycleWorkerThread()方法進行線程回收。
這個run()方法再次進入while循環,調用await()方法後,由於await()方法在之前跳出循環時將available設置為false,所以就進入了讓鎖等待,等待請求線程調用assign()方法指定任務,這樣就回到了開始敘述的地方了。
為什麽在await()方法中使用局部變量socket呢?
摘自深入剖析Tomcat:因為使用局部變量可以在當前Socket對象處理完之前,繼續接收下一個Socket對象。
個人認為是怕在run()方法運行的過程中其它線程調用這個Worker對象的assign()方法,畢竟這個對象的引用是可以被其它線程獲取到的。為什麽可以調用assign()方法重新指定呢?因為run()方法沒有加synchronized關鍵字,所以不能與assign()方法互斥訪問socket資源。還是為了安全性吧。
劍指架構師系列-tomcat6通過偽異步實現connector