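Dubbo Thread Pool Source Code Analysis
This article was first published on my personal WeChat official account 《andyqian》. Feel free to follow it!
Preface
In my earlier articles "Java Thread Pool ThreadPoolExecutor" and "ThreadPoolExecutor Internals", I covered the concepts and the inner workings of ThreadPoolExecutor. Today, let's look at how it is used in the Dubbo framework.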
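ThreadFactory and AbortPolicy
Dubbo provides several thread pool implementations, and all of them are built on the JDK's ThreadPoolExecutor. ThreadPoolExecutor should be familiar by now. Among the most important parameters of its constructor are the rejection policy (a RejectedExecutionHandler such as ThreadPoolExecutor.AbortPolicy) and the ThreadFactory. Dubbo supplies its own ThreadFactory and its own subclass of ThreadPoolExecutor.AbortPolicy. Before studying the thread pools themselves, let's look at these two, which will make the rest easier to follow.
In Dubbo, NamedInternalThreadFactory is a custom ThreadFactory implementation: it extends NamedThreadFactory, which implements ThreadFactory. Its class diagram is shown below:
NamedInternalThreadFactory is implemented as follows: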
public class NamedInternalThreadFactory extends NamedThreadFactory {
    public NamedInternalThreadFactory() {
        super();
    }

    public NamedInternalThreadFactory(String prefix) {
        super(prefix, false);
    }

    public NamedInternalThreadFactory(String prefix, boolean daemon) {
        super(prefix, daemon);
    }

    @Override
    public Thread newThread(Runnable runnable) {
        String name = mPrefix + mThreadNum.getAndIncrement();
        InternalThread ret = new InternalThread(mGroup, runnable, name, 0);
        ret.setDaemon(mDaemon);
        return ret;
    }
}
And NamedThreadFactory is implemented as follows:
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedThreadFactory implements ThreadFactory {
    protected static final AtomicInteger POOL_SEQ = new AtomicInteger(1);
    protected final AtomicInteger mThreadNum = new AtomicInteger(1);
    protected final String mPrefix;
    protected final boolean mDaemon;
    protected final ThreadGroup mGroup;

    public NamedThreadFactory() {
        this("pool-" + POOL_SEQ.getAndIncrement(), false);
    }

    public NamedThreadFactory(String prefix) {
        this(prefix, false);
    }

    public NamedThreadFactory(String prefix, boolean daemon) {
        // every thread name becomes prefix + "-thread-" + sequence number
        mPrefix = prefix + "-thread-";
        mDaemon = daemon;
        SecurityManager s = System.getSecurityManager();
        mGroup = (s == null) ? Thread.currentThread().getThreadGroup() : s.getThreadGroup();
    }

    @Override
    public Thread newThread(Runnable runnable) {
        String name = mPrefix + mThreadNum.getAndIncrement();
        Thread ret = new Thread(mGroup, runnable, name, 0);
        ret.setDaemon(mDaemon);
        return ret;
    }

    public ThreadGroup getThreadGroup() {
        return mGroup;
    }
}
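The code above defines how Dubbo names the threads in its pools: each factory produces names of the form prefix-thread-N (for example Dubbo-thread-1), with N counting up from 1. This makes it easy to tell which pool a thread belongs to when reading logs or thread dumps.
Next, let's look at the rejection policy, the AbortPolicyWithReport class. Its class diagram is shown below:
The source code: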
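To make the naming rule concrete, here is a minimal sketch. SimpleNamedThreadFactory is a hypothetical, trimmed-down stand-in for NamedThreadFactory (no ThreadGroup handling, and a plain Thread instead of Dubbo's InternalThread), kept only to show the names it produces.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class NamingDemo {
    public static void main(String[] args) {
        SimpleNamedThreadFactory factory = new SimpleNamedThreadFactory("Dubbo", true);
        System.out.println(factory.newThread(() -> { }).getName()); // Dubbo-thread-1
        System.out.println(factory.newThread(() -> { }).getName()); // Dubbo-thread-2
    }
}

// Hypothetical, simplified stand-in for NamedThreadFactory: keeps only the naming rule
// (prefix + "-thread-" + sequence number) and the daemon flag.
class SimpleNamedThreadFactory implements ThreadFactory {
    private final AtomicInteger threadNum = new AtomicInteger(1);
    private final String prefix;
    private final boolean daemon;

    SimpleNamedThreadFactory(String prefix, boolean daemon) {
        this.prefix = prefix + "-thread-";
        this.daemon = daemon;
    }

    @Override
    public Thread newThread(Runnable runnable) {
        Thread t = new Thread(runnable, prefix + threadNum.getAndIncrement());
        t.setDaemon(daemon);
        return t;
    }
}
```

Because the counter is per factory instance, two pools built with different prefixes get independent sequences, which is what keeps the names readable in a thread dump.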
public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {
protected static final Logger logger = LoggerFactory.getLogger(AbortPolicyWithReport.class);
private final String threadName;
private final URL url;
private static volatile long lastPrintTime = 0;
private static final long TEN_MINUTES_MILLS = 10 * 60 * 1000;
private static final String OS_WIN_PREFIX = "win";
private static final String OS_NAME_KEY = "os.name";
private static final String WIN_DATETIME_FORMAT = "yyyy-MM-dd_HH-mm-ss";
private static final String DEFAULT_DATETIME_FORMAT = "yyyy-MM-dd_HH:mm:ss";
private static Semaphore guard = new Semaphore(1);
public AbortPolicyWithReport(String threadName, URL url) {
this.threadName = threadName;
this.url = url;
}
// Overrides rejectedExecution from the parent class ThreadPoolExecutor.AbortPolicy.
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
// Build the warning message: thread name, pool size, active count, core and maximum pool sizes, task counts, executor status, etc.
String msg = String.format("Thread pool is EXHAUSTED!" +
" Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: "
+ "%d)," +
" Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",
threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(),
e.getLargestPoolSize(),
e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),
url.getProtocol(), url.getIp(), url.getPort());
logger.warn(msg);
// Dump the thread stacks
dumpJStack();
throw new RejectedExecutionException(msg);
}
// Called from rejectedExecution. Dumps the thread stacks into the DUMP_DIRECTORY directory (by default the user.home directory), at most once every 10 minutes.
private void dumpJStack() {
long now = System.currentTimeMillis();
//dump every 10 minutes
if (now - lastPrintTime < TEN_MINUTES_MILLS) {
return;
}
if (!guard.tryAcquire()) {
return;
}
ExecutorService pool = Executors.newSingleThreadExecutor();
pool.execute(() -> {
String dumpPath = url.getParameter(DUMP_DIRECTORY, System.getProperty("user.home"));
SimpleDateFormat sdf;
String os = System.getProperty(OS_NAME_KEY).toLowerCase();
// Windows does not allow ":" in file names
if (os.contains(OS_WIN_PREFIX)) {
sdf = new SimpleDateFormat(WIN_DATETIME_FORMAT);
} else {
sdf = new SimpleDateFormat(DEFAULT_DATETIME_FORMAT);
}
String dateStr = sdf.format(new Date());
//try-with-resources
try (FileOutputStream jStackStream = new FileOutputStream(
new File(dumpPath, "Dubbo_JStack.log" + "." + dateStr))) {
// Utility class; its implementation is omitted here, check the Dubbo source if you are interested.
JVMUtil.jstack(jStackStream);
} catch (Throwable t) {
logger.error("dump jStack error", t);
} finally {
guard.release();
}
lastPrintTime = System.currentTimeMillis();
});
// the single-thread pool must be shut down, otherwise its thread leaks and repeated dumps can lead to OOM
pool.shutdown();
}
}
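The code above is straightforward: it logs a warning, dumps the thread stacks, and rethrows RejectedExecutionException. Its purpose is to record the state at the moment the pool is exhausted, that is, when the AbortPolicy fires, so the problem can be analyzed and diagnosed later. The dump is throttled to at most once every 10 minutes and guarded by a Semaphore, so a burst of rejections produces only one dump file.
Thread Pool Implementations
So far we have covered Dubbo's custom ThreadFactory and the AbortPolicyWithReport class. Next, let's go through the thread pool implementations Dubbo provides. Their class diagram is shown below:
1. LimitedThreadPool
Source code: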
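The same log, throttled-dump, rethrow pattern can be tried outside Dubbo. The sketch below is a hypothetical ThrottledReportPolicy, not Dubbo's class: it extends ThreadPoolExecutor.AbortPolicy, logs the pool state, performs a throttled "dump" guarded by a Semaphore, and rethrows. As a simplifying assumption, the dump only counts live threads via Thread.getAllStackTraces() and runs synchronously, instead of writing a jstack file from a separate executor.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class RejectDemo {
    public static void main(String[] args) {
        ThrottledReportPolicy policy = new ThrottledReportPolicy(10 * 60 * 1000L);
        // One thread and no queue capacity: every task beyond the first running one is rejected.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>(), policy);
        CountDownLatch release = new CountDownLatch(1);
        pool.execute(() -> {
            try {
                release.await(); // keep the only worker busy
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        int rejected = 0;
        for (int i = 0; i < 3; i++) {
            try {
                pool.execute(() -> { });
            } catch (RejectedExecutionException ex) {
                rejected++;
            }
        }
        release.countDown();
        pool.shutdown();
        System.out.println("rejected=" + rejected + ", dumps=" + policy.dumpCount.get());
    }
}

// Hypothetical sketch of the AbortPolicyWithReport idea; the "dump" writes no file.
class ThrottledReportPolicy extends ThreadPoolExecutor.AbortPolicy {
    private final long intervalMillis;
    private final Semaphore guard = new Semaphore(1);
    private volatile long lastDumpTime = 0;
    final AtomicInteger dumpCount = new AtomicInteger();

    ThrottledReportPolicy(long intervalMillis) {
        this.intervalMillis = intervalMillis;
    }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        String msg = String.format("Thread pool is EXHAUSTED! Pool Size: %d (active: %d, core: %d, max: %d)",
                e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize());
        System.err.println(msg);
        dump();
        throw new RejectedExecutionException(msg);
    }

    private void dump() {
        long now = System.currentTimeMillis();
        // Skip if a dump happened within the interval, or another thread is dumping right now.
        if (now - lastDumpTime < intervalMillis || !guard.tryAcquire()) {
            return;
        }
        try {
            int liveThreads = Thread.getAllStackTraces().size();
            System.err.println("dump #" + dumpCount.incrementAndGet() + ": " + liveThreads + " live threads");
            lastDumpTime = now;
        } finally {
            guard.release();
        }
    }
}
```

All three extra tasks are rejected, but only the first rejection triggers a dump, so the program should end with the line rejected=3, dumps=1.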
public class LimitedThreadPool implements ThreadPool {
@Override
public Executor getExecutor(URL url) {
String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);
int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);
int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
}
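Where:
- THREAD_NAME_KEY is threadname: the thread name prefix, default Dubbo.
- CORE_THREADS_KEY is corethreads: the number of core threads, default 0.
- THREADS_KEY is threads: the maximum number of threads, default 200.
- QUEUES_KEY is queues: the capacity of the blocking queue, default 0.
Notes:
- cores and threads are taken from the URL parameters, while keepAliveTime is Long.MAX_VALUE with TimeUnit.MILLISECONDS. In theory an idle thread would be reclaimed only after being idle for longer than Long.MAX_VALUE milliseconds; that is so large that the threads in this pool effectively never expire.
- The work queue is chosen with a ternary expression:
when queues == 0, the BlockingQueue is a SynchronousQueue (no capacity; each task must be handed directly to a thread);
when queues < 0, it is an unbounded LinkedBlockingQueue;
when queues > 0, it is a LinkedBlockingQueue with a capacity of queues.
queues == 0 ? new SynchronousQueue<Runnable>() : (queues < 0 ? new LinkedBlockingQueue<Runnable>() : new LinkedBlockingQueue<Runnable>(queues))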
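The ternary can be checked in isolation. In this sketch, createQueue is a hypothetical helper that wraps the exact expression used by the pools, and remainingCapacity() shows how many tasks each queue can buffer.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;

public class QueueChoiceDemo {
    public static void main(String[] args) {
        for (int queues : new int[] {0, -1, 100}) {
            BlockingQueue<Runnable> q = createQueue(queues);
            System.out.println(queues + " -> " + q.getClass().getSimpleName()
                    + ", remainingCapacity=" + q.remainingCapacity());
        }
        // 0 -> SynchronousQueue, remainingCapacity=0
        // -1 -> LinkedBlockingQueue, remainingCapacity=2147483647
        // 100 -> LinkedBlockingQueue, remainingCapacity=100
    }

    // Hypothetical helper: the same ternary the Dubbo pools pass to ThreadPoolExecutor.
    static BlockingQueue<Runnable> createQueue(int queues) {
        return queues == 0 ? new SynchronousQueue<Runnable>()
                : (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                : new LinkedBlockingQueue<Runnable>(queues));
    }
}
```

A SynchronousQueue never buffers anything, and the no-argument LinkedBlockingQueue is bounded only by Integer.MAX_VALUE, which is why queues < 0 is described as unbounded.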
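In short: this pool can create up to threads threads (200 by default), and because keepAliveTime is effectively infinite, a thread once created is never reclaimed. The pool only grows and never shrinks.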
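The "grow, never shrink" behavior can be observed with a plain ThreadPoolExecutor configured the way LimitedThreadPool configures it with its default parameters. The class and helper names are illustrative, and the 200 ms idle wait is an arbitrary choice.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class LimitedPoolDemo {
    public static void main(String[] args) throws InterruptedException {
        int[] sizes = busyThenIdle(200);
        System.out.println("busy=" + sizes[0] + ", after idle=" + sizes[1]); // busy=3, after idle=3
    }

    // Hypothetical helper: 3 blocking tasks on a Limited-style pool (core 0, max 200,
    // keepAlive Long.MAX_VALUE ms, SynchronousQueue); returns {size while busy, size after idling}.
    static int[] busyThenIdle(long idleMillis) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(0, 200, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < 3; i++) {
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int busy = pool.getPoolSize(); // each task got its own thread
        release.countDown();
        while (pool.getCompletedTaskCount() < 3) {
            Thread.sleep(10);
        }
        Thread.sleep(idleMillis); // all threads are idle now
        int idle = pool.getPoolSize();
        pool.shutdown(); // interrupts the idle workers so the JVM can exit
        return new int[] {busy, idle};
    }
}
```

Both sizes should be 3: a SynchronousQueue cannot hold a task, so each concurrent task gets a new thread, and the idle threads are then never reclaimed.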
2. CachedThreadPool
Source code:
public class CachedThreadPool implements ThreadPool {
@Override
public Executor getExecutor(URL url) {
String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);
int threads = url.getParameter(THREADS_KEY, Integer.MAX_VALUE);
int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
int alive = url.getParameter(ALIVE_KEY, DEFAULT_ALIVE);
return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
}
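Where:
- THREAD_NAME_KEY is threadname: the thread name prefix, default Dubbo.
- CORE_THREADS_KEY is corethreads: the number of core threads, default 0.
- THREADS_KEY is threads: the maximum number of threads, default Integer.MAX_VALUE.
- QUEUES_KEY is queues: the capacity of the blocking queue, default 0.
- ALIVE_KEY is alive: the keepAliveTime, i.e. how long an idle thread above the core count is kept alive, default 60 * 1000 milliseconds (one minute).
In short: the number of threads is effectively unbounded (the default maximum is Integer.MAX_VALUE; in practice the operating system caps it far below that), but idle threads beyond the core count are reclaimed after one minute by default. With the default queues = 0 (SynchronousQueue), every task that no idle thread can take immediately gets a new thread, so the pool can grow quickly under load while its threads stay short-lived.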
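For contrast, the following sketch uses CachedThreadPool-style settings with one assumption: keepAlive is shortened from the 60-second default to 100 ms so the demo finishes quickly. The names are illustrative.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CachedPoolDemo {
    public static void main(String[] args) throws InterruptedException {
        int[] sizes = busyThenIdle(100);
        System.out.println("busy=" + sizes[0] + ", after idle=" + sizes[1]); // busy=3, after idle=0
    }

    // Hypothetical helper: 3 blocking tasks on a Cached-style pool (core 0, max Integer.MAX_VALUE,
    // SynchronousQueue); returns {size while busy, size once idle threads have timed out}.
    static int[] busyThenIdle(long keepAliveMillis) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, keepAliveMillis,
                TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < 3; i++) {
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int busy = pool.getPoolSize();
        release.countDown();
        // Poll for up to 5 s: idle threads above the core size (0) exit after keepAliveMillis.
        long deadline = System.currentTimeMillis() + 5_000;
        while (pool.getPoolSize() > 0 && System.currentTimeMillis() < deadline) {
            Thread.sleep(20);
        }
        int idle = pool.getPoolSize();
        pool.shutdown();
        return new int[] {busy, idle};
    }
}
```

The pool grows to 3 threads while the tasks block, then shrinks back to 0 once the threads have been idle for longer than keepAlive, which is the short thread lifetime described above.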
3. FixedThreadPool
public class FixedThreadPool implements ThreadPool {
@Override
public Executor getExecutor(URL url) {
String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);
int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
}
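Where:
- THREADS_KEY is threads: the number of threads (used as both the core and the maximum size), default 200.
- QUEUES_KEY is queues: the capacity of the blocking queue, default 0.
- corePoolSize and maximumPoolSize are both set to threads, so the core thread count equals the maximum thread count.
- keepAliveTime is 0, which would terminate idle threads beyond corePoolSize immediately; since corePoolSize equals maximumPoolSize there are never any such threads, so in practice the pool keeps all of its threads.
In short: corePoolSize equals maximumPoolSize, so the pool holds a fixed number of threads (200 by default). Once all of them are busy, what happens to a new task depends on queues: with the default queues = 0 (SynchronousQueue) it is rejected immediately through AbortPolicyWithReport; with queues < 0 it waits in an unbounded LinkedBlockingQueue; with queues > 0 it waits in a LinkedBlockingQueue of that capacity and is rejected once the queue is full. This is the thread pool Dubbo uses by default.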
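To see how queues changes a fixed pool, this sketch builds a FixedThreadPool-style executor (core == max == threads, keepAlive 0) with the same queue ternary, but with the JDK's default AbortPolicy instead of AbortPolicyWithReport. acceptedTasks is a hypothetical helper, and 2 threads with 5 submissions are arbitrary small numbers.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class FixedPoolDemo {
    public static void main(String[] args) {
        System.out.println("queues=0 -> accepted " + acceptedTasks(2, 0, 5)); // accepted 2
        System.out.println("queues=1 -> accepted " + acceptedTasks(2, 1, 5)); // accepted 3
    }

    // Hypothetical helper: submits `submit` tasks that all block, and counts how many
    // the Fixed-style pool accepted before the default AbortPolicy started rejecting.
    static int acceptedTasks(int threads, int queues, int submit) {
        BlockingQueue<Runnable> queue = queues == 0 ? new SynchronousQueue<Runnable>()
                : (queues < 0 ? new LinkedBlockingQueue<Runnable>() : new LinkedBlockingQueue<Runnable>(queues));
        ThreadPoolExecutor pool = new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS, queue);
        CountDownLatch release = new CountDownLatch(1);
        int accepted = 0;
        for (int i = 0; i < submit; i++) {
            try {
                pool.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                accepted++;
            } catch (RejectedExecutionException e) {
                // all threads busy and the queue (if any) is full
            }
        }
        release.countDown();
        pool.shutdown();
        return accepted;
    }
}
```

With queues = 0 only the two running tasks are accepted; with queues = 1 one more task waits in the queue before rejections start.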
4. EagerThreadPool
Source code:
public class EagerThreadPool implements ThreadPool {
@Override
public Executor getExecutor(URL url) {
String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);
int threads = url.getParameter(THREADS_KEY, Integer.MAX_VALUE);
int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
int alive = url.getParameter(ALIVE_KEY, DEFAULT_ALIVE);
// init queue and executor
TaskQueue<Runnable> taskQueue = new TaskQueue<Runnable>(queues <= 0 ? 1 : queues);
EagerThreadPoolExecutor executor = new EagerThreadPoolExecutor(cores,
threads,
alive,
TimeUnit.MILLISECONDS,
taskQueue,
new NamedInternalThreadFactory(name, true),
new AbortPolicyWithReport(name, url));
taskQueue.setExecutor(executor);
return executor;
}
}
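Notes:
This pool is built differently from the previous ones, which call the ThreadPoolExecutor constructor directly. Here a TaskQueue is created first (with a capacity of queues, or 1 when queues <= 0), then a custom EagerThreadPoolExecutor, itself a subclass of ThreadPoolExecutor, and finally the queue is linked back to the executor through taskQueue.setExecutor(executor). The code of EagerThreadPoolExecutor is as follows: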
public class EagerThreadPoolExecutor extends ThreadPoolExecutor {
/**
* task count
*/
private final AtomicInteger submittedTaskCount = new AtomicInteger(0);
public EagerThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit, TaskQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
/**
* @return the number of submitted tasks that have not finished yet
*/
public int getSubmittedTaskCount() {
return submittedTaskCount.get();
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
// a task has finished: decrement the submitted-task count
submittedTaskCount.decrementAndGet();
}
@Override
public void execute(Runnable command) {
if (command == null) {
throw new NullPointerException();
}
// do not increment in method beforeExecute!
// increment the submitted-task count
submittedTaskCount.incrementAndGet();
try {
// delegate to the parent's execute
super.execute(command);
} catch (RejectedExecutionException rx) {
// rejected: try to put the task into the queue once more
final TaskQueue queue = (TaskQueue) super.getQueue();
try {
// if that also fails, decrement the count and rethrow
if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
submittedTaskCount.decrementAndGet();
throw new RejectedExecutionException("Queue capacity is full.", rx);
}
} catch (InterruptedException x) {
submittedTaskCount.decrementAndGet();
throw new RejectedExecutionException(x);
}
} catch (Throwable t) {
// decrease any way
submittedTaskCount.decrementAndGet();
throw t;
}
}
}
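As you can see, EagerThreadPoolExecutor overrides two methods of its parent ThreadPoolExecutor, afterExecute and execute, and uses the submittedTaskCount field to track tasks that have been submitted but not yet finished. When the parent's execute throws RejectedExecutionException, the task is offered to the queue once more (TaskQueue.retryOffer with a timeout of 0) and is rejected only if that also fails. The TaskQueue code is as follows: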
public class TaskQueue<R extends Runnable> extends LinkedBlockingQueue<Runnable> {
private static final long serialVersionUID = -2635853580887179627L;
private EagerThreadPoolExecutor executor;
public TaskQueue(int capacity) {
super(capacity);
}
public void setExecutor(EagerThreadPoolExecutor exec) {
executor = exec;
}
@Override
public boolean offer(Runnable runnable) {
if (executor == null) {
throw new RejectedExecutionException("The task queue does not have executor!");
}
int currentPoolThreadSize = executor.getPoolSize();
// have free worker. put task into queue to let the worker deal with task.
// Fewer submitted tasks than threads means an idle worker exists: call the parent offer so it picks up the task.
if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
return super.offer(runnable);
}
// return false to let executor create new worker.
// The current pool size is below the maximum: return false so the executor creates a new worker.
if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
return false;
}
// currentPoolThreadSize >= max
return super.offer(runnable);
}
/**
* retry offer task
*
* @param o task
* @return offer success or not
* @throws RejectedExecutionException if executor is terminated.
*/
public boolean retryOffer(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
if (executor.isShutdown()) {
throw new RejectedExecutionException("Executor is shutdown!");
}
return super.offer(o, timeout, unit);
}
}
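In short: unlike a standard ThreadPoolExecutor, which queues tasks once the core threads are busy and adds threads only when the queue is full, this pool adds threads first. TaskQueue.offer returns false while the pool is below maximumPoolSize, so ThreadPoolExecutor creates a new worker, and a task is queued only when an idle worker is available or the pool has reached its maximum. In addition, a task rejected by the parent's execute is offered to the work queue once more, which acts as a simple retry mechanism.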
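The eager strategy is easiest to see side by side with a standard ThreadPoolExecutor. EagerQueue and EagerExecutor below are hypothetical, trimmed-down versions of TaskQueue and EagerThreadPoolExecutor (no timeout on the retry, no null-executor check); fill is an illustrative helper. Both pools have 1 core thread, a maximum of 3 and a queue capacity of 10.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class EagerPoolDemo {
    public static void main(String[] args) {
        EagerQueue queue = new EagerQueue(10);
        EagerExecutor eager = new EagerExecutor(1, 3, queue);
        queue.setExecutor(eager);
        ThreadPoolExecutor standard = new ThreadPoolExecutor(1, 3, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(10));
        System.out.println("eager:    " + fill(eager, 4));    // threads=3, queued=1
        System.out.println("standard: " + fill(standard, 4)); // threads=1, queued=3
    }

    // Submits n tasks that block on a latch, snapshots pool size and queue length, then cleans up.
    static String fill(ThreadPoolExecutor pool, int n) {
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < n; i++) {
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        String result = "threads=" + pool.getPoolSize() + ", queued=" + pool.getQueue().size();
        release.countDown();
        pool.shutdown();
        return result;
    }
}

// Hypothetical trimmed-down TaskQueue: offer() reports "full" to force new workers until max.
class EagerQueue extends LinkedBlockingQueue<Runnable> {
    private EagerExecutor executor;
    EagerQueue(int capacity) {
        super(capacity);
    }
    void setExecutor(EagerExecutor executor) {
        this.executor = executor;
    }
    @Override
    public boolean offer(Runnable r) {
        int poolSize = executor.getPoolSize();
        if (executor.submitted.get() < poolSize) {
            return super.offer(r); // an idle worker exists and will take the task
        }
        if (poolSize < executor.getMaximumPoolSize()) {
            return false; // pretend the queue is full so ThreadPoolExecutor adds a worker
        }
        return super.offer(r); // already at max: queue as usual
    }
    // Like TaskQueue.retryOffer: skips the eager check, used only after a rejection.
    boolean retryOffer(Runnable r) {
        return !executor.isShutdown() && super.offer(r);
    }
}

// Hypothetical trimmed-down EagerThreadPoolExecutor: tracks submitted-but-unfinished tasks.
class EagerExecutor extends ThreadPoolExecutor {
    final AtomicInteger submitted = new AtomicInteger();
    EagerExecutor(int core, int max, EagerQueue queue) {
        super(core, max, 60, TimeUnit.SECONDS, queue);
    }
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        submitted.decrementAndGet();
    }
    @Override
    public void execute(Runnable command) {
        submitted.incrementAndGet();
        try {
            super.execute(command);
        } catch (RejectedExecutionException rx) {
            // the pool may have reached max between offer() and addWorker(): retry the queue once
            if (!((EagerQueue) getQueue()).retryOffer(command)) {
                submitted.decrementAndGet();
                throw new RejectedExecutionException("Queue capacity is full.", rx);
            }
        }
    }
}
```

With four blocking tasks, the eager pool should report threads=3, queued=1, while the standard pool reports threads=1, queued=3, because the standard one only adds threads once its queue is full.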
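Conclusion
After the analysis above, you should have a good picture of Dubbo's thread pools. If anything is still unclear, step through the code in a debugger. Many open-source frameworks ship their own thread pools, but most of them are ultimately built on ThreadPoolExecutor, so it is well worth mastering: it comes up constantly in day-to-day work as well as in interviews.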
Related reading: