tomcat優化後的worker執行緒池
阿新 • • 發佈:2018-12-12
tomcat實現了自己的worker執行緒池,重寫了ThreadPoolExecutor的execute部分邏輯,使之更適合web服務這種IO密集型任務。直接貼原始碼。
自定義的ThreadPoolExecutor:
/** * Same as a java.util.concurrent.ThreadPoolExecutor but implements a much more efficient * {@link #getSubmittedCount()} method, to be used to properly handle the work queue. * If a RejectedExecutionHandler is not specified a default one will be configured * and that one will always throw a RejectedExecutionException * @author fhanik * */ public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor { /** * The string manager for this package. */ protected static final StringManager sm = StringManager .getManager("org.apache.tomcat.util.threads.res"); /** * The number of tasks submitted but not yet finished. This includes tasks * in the queue and tasks that have been handed to a worker thread but the * latter did not start executing the task yet. * This number is always greater or equal to {@link #getActiveCount()}. */ //這個標識實際執行中的任務數量,和threadPoolExecutor的poolSize方法拿到的執行緒數可能有差異 private final AtomicInteger submittedCount = new AtomicInteger(0); private final AtomicLong lastContextStoppedTime = new AtomicLong(0L); /** * Most recent time in ms when a thread decided to kill itself to avoid * potential memory leaks. Useful to throttle the rate of renewals of * threads. */ private final AtomicLong lastTimeThreadKilledItself = new AtomicLong(0L); /** * Delay in ms between 2 threads being renewed. If negative, do not renew threads. */ private long threadRenewalDelay = Constants.DEFAULT_THREAD_RENEWAL_DELAY; public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new RejectHandler()); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, new RejectHandler()); } public long getThreadRenewalDelay() { return threadRenewalDelay; } public void setThreadRenewalDelay(long threadRenewalDelay) { this.threadRenewalDelay = threadRenewalDelay; } @Override protected void afterExecute(Runnable r, Throwable t) { //任務執行完畢,重置併發任務數量 submittedCount.decrementAndGet(); if (t == null) { stopCurrentThreadIfNeeded(); } } /** * If the current thread was started before the last time when a context was * stopped, an exception is thrown so that the current thread is stopped. */ protected void stopCurrentThreadIfNeeded() { if (currentThreadShouldBeStopped()) { long lastTime = lastTimeThreadKilledItself.longValue(); if (lastTime + threadRenewalDelay < System.currentTimeMillis()) { if (lastTimeThreadKilledItself.compareAndSet(lastTime, System.currentTimeMillis() + 1)) { // OK, it's really time to dispose of this thread final String msg = sm.getString( "threadPoolExecutor.threadStoppedToAvoidPotentialLeak", Thread.currentThread().getName()); throw new StopPooledThreadException(msg); } } } } protected boolean currentThreadShouldBeStopped() { if (threadRenewalDelay >= 0 && Thread.currentThread() instanceof TaskThread) { TaskThread currentTaskThread = (TaskThread) Thread.currentThread(); if (currentTaskThread.getCreationTime() < this.lastContextStoppedTime.longValue()) { return true; } } return false; } public int getSubmittedCount() { return submittedCount.get(); } /** * {@inheritDoc} */ @Override public void execute(Runnable command) { //若等待佇列已滿,且等待了tomeout之後仍然沒法加到佇列裡,會拋異常 execute(command,0,TimeUnit.MILLISECONDS); } /** * Executes the given command at some time in the future. The command * may execute in a new thread, in a pooled thread, or in the calling * thread, at the discretion of the <tt>Executor</tt> implementation. * If no threads are available, it will be added to the work queue. * If the work queue is full, the system will wait for the specified * time and it throw a RejectedExecutionException if the queue is still * full after that. * * @param command the runnable task * @throws RejectedExecutionException if this task cannot be * accepted for execution - the queue is full * @throws NullPointerException if command or unit is null */ public void execute(Runnable command, long timeout, TimeUnit unit) { submittedCount.incrementAndGet(); try { //用一個新執行緒或者已存在的空閒執行緒跑,如果queue滿了,拋RejectedExecutionException //這裡和concurrent包下的ThreadPoolExecutor不同,標準執行緒池會先扔到佇列中,佇列滿了再開新執行緒跑,到了maxThread後才拋RejectedExecutionException //在這重寫了execute的邏輯,來一個任務先扔到執行緒裡跑,到了maxThread後才會放到佇列裡。 //標準執行緒池更適合CPU密集型任務,而tomcat優化過的這個執行緒池更適合IO密集型任務 super.execute(command); } catch (RejectedExecutionException rx) { if (super.getQueue() instanceof TaskQueue) { final TaskQueue queue = (TaskQueue)super.getQueue(); try { //強制推到佇列裡,推不進去只能拋異常了 if (!queue.force(command, timeout, unit)) { submittedCount.decrementAndGet(); throw new RejectedExecutionException("Queue capacity is full."); } } catch (InterruptedException x) { submittedCount.decrementAndGet(); Thread.interrupted(); throw new RejectedExecutionException(x); } } else { submittedCount.decrementAndGet(); throw rx; } } } public void contextStopping() { this.lastContextStoppedTime.set(System.currentTimeMillis()); // save the current pool parameters to restore them later int savedCorePoolSize = this.getCorePoolSize(); TaskQueue taskQueue = getQueue() instanceof TaskQueue ? (TaskQueue) getQueue() : null; if (taskQueue != null) { // note by slaurent : quite oddly threadPoolExecutor.setCorePoolSize // checks that queue.remainingCapacity()==0. I did not understand // why, but to get the intended effect of waking up idle threads, I // temporarily fake this condition. taskQueue.setForcedRemainingCapacity(Integer.valueOf(0)); } // setCorePoolSize(0) wakes idle threads this.setCorePoolSize(0); // TaskQueue.take() takes care of timing out, so that we are sure that // all threads of the pool are renewed in a limited time, something like // (threadKeepAlive + longest request time) if (taskQueue != null) { // ok, restore the state of the queue and pool taskQueue.setForcedRemainingCapacity(null); } this.setCorePoolSize(savedCorePoolSize); } //用了一個自定義的拒絕策略,若果沒有配置拒絕策略,則直接拋異常 private static class RejectHandler implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, java.util.concurrent.ThreadPoolExecutor executor) { throw new RejectedExecutionException(); } } }
自定義的執行緒池佇列:
/** * As task queue specifically designed to run with a thread pool executor. * The task queue is optimised to properly utilize threads within * a thread pool executor. If you use a normal queue, the executor will spawn threads * when there are idle threads and you wont be able to force items unto the queue itself * @author fhanik * */ public class TaskQueue extends LinkedBlockingQueue<Runnable> { private static final long serialVersionUID = 1L; private ThreadPoolExecutor parent = null; // no need to be volatile, the one times when we change and read it occur in // a single thread (the one that did stop a context and fired listeners) private Integer forcedRemainingCapacity = null; //LinkedBlockingQueue是有界佇列,預設最大長度為Integer最大值。 public TaskQueue() { super(); } public TaskQueue(int capacity) { super(capacity); } public TaskQueue(Collection<? extends Runnable> c) { super(c); } public void setParent(ThreadPoolExecutor tp) { parent = tp; } //強制新增到佇列 public boolean force(Runnable o) { if ( parent.isShutdown() ) throw new RejectedExecutionException("Executor not running, can't force a command into the queue"); return super.offer(o); //forces the item onto the queue, to be used if the task is rejected } //強制新增到佇列,並設定等待時間 public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException { if ( parent.isShutdown() ) throw new RejectedExecutionException("Executor not running, can't force a command into the queue"); return super.offer(o,timeout,unit); //forces the item onto the queue, to be used if the task is rejected } @Override public boolean offer(Runnable o) { //we can't do any checks if (parent==null) return super.offer(o); // 若執行執行緒為最大執行緒數,直接加入佇列 if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o); //若執行執行緒數大於執行中的任務數,說明有執行緒空閒,直接加入佇列 if (parent.getSubmittedCount()<(parent.getPoolSize())) return super.offer(o); //發現執行執行緒數少於池最大執行緒數,拒絕加入佇列 if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false; //if we reached here, we need to add it to the queue return super.offer(o); } @Override public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException { Runnable runnable = super.poll(timeout, unit); if (runnable == null && parent != null) { // the poll timed out, it gives an opportunity to stop the current // thread if needed to avoid memory leaks. // parent.stopCurrentThreadIfNeeded(); } return runnable; } @Override public Runnable take() throws InterruptedException { if (parent != null && parent.currentThreadShouldBeStopped()) { return poll(parent.getKeepAliveTime(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS); // yes, this may return null (in case of timeout) which normally // does not occur with take() // but the ThreadPoolExecutor implementation allows this } return super.take(); } @Override public int remainingCapacity() { if (forcedRemainingCapacity != null) { // ThreadPoolExecutor.setCorePoolSize checks that // remainingCapacity==0 to allow to interrupt idle threads // I don't see why, but this hack allows to conform to this // "requirement" return forcedRemainingCapacity.intValue(); } return super.remainingCapacity(); } public void setForcedRemainingCapacity(Integer forcedRemainingCapacity) { this.forcedRemainingCapacity = forcedRemainingCapacity; } }
jdk自帶的執行緒池遵循三條原則:
/* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */
一是若執行時執行緒數小於核心執行緒數,則新建執行緒跑任務。二是如果要新增到佇列,需要檢查執行緒狀態及執行緒池是否shutdown,可能需要回滾佇列,三是如果佇列滿了,才會new一個執行緒跑任務。
可以看出來在佇列沒滿的情況下,標準執行緒池的執行緒數應該是小於等於corePoolSize的,而上面tomcat自定義的執行緒池則始終讓執行緒保持和maxPoolSize相同,更適合重IO型服務,尤其是tomcat這種web容器的worker執行緒,可以提升一部分吞吐量。