Java線程池的理論與實踐
Java中的Thread與操作系統中的線程的關系
線程切換的各種開銷
ThreadGroup存在的意義
使用線程池減少線程開銷
Executor的概念
ThreadPoolExecutor中的一些具體實現
如何監控線程的健康
參考ThreadPoolExecutor來設計適合自己的線程模型
這個項目所在系統的軟件架構(從開發到運維)基本上采用的是微服務架構,微服務很好地解決了我們系統的復雜性問題,但是隨之也帶來了一些問題,比如在此架構中大部分的服務都擁有自己單獨的數據庫,而有些(很重要的)業務需要做跨庫查詢。相信這種「跨庫查詢」的問題很多實踐微服務的公司都碰到過,通常這類問題有以下幾種解決方案(當然,還有更多其他的方案,這裏就不一一敘述了):
嚴格通過服務提供的API查詢。
這樣做的好處是將服務完全當做黑盒,可以最大限度得減少服務間的依賴與耦合關系,其次還能根據實際需求服務之間使用不同的數據庫類型;缺點是則代價太大。
將關心的信息冗余到自己的庫中,並提供API讓其他服務來主動修改。
指令與查詢分離(CQRS)。將可能被其他服務關心的數據放入數據倉庫(或者做成類似於物化視圖、搜索引擎等),數據倉庫只提供讀的功能。
優點是對主庫不會有壓力,服務只要關心實現自己的業務就好,缺點是數據的實時性會受到了挑戰。
指令與查詢分離
結合實際情況,我們使用的是第3種方案。然而隨著越來越多的業務依賴讀庫,甚至依賴其中一些狀態的變化,所以讀庫的數據同步如果出現高延時,則會直接影響業務的進行。出了幾次這種事情後,於是下決心要改善這種情況。首先想到的就是使用線程池來進行消息的消費(寫入讀庫),JDK自從1.5開始提供了實用而強大的線程池工具——Executor框架。
二、Executor框架
Executor框架在Java1.5中引入,大部分的類都在包java.util.concurrent中,由大神Doug Lea寫成,其中常用到的有以下幾個類和接口:
java.util.concurrent.Executor
一個只包含一個方法的接口,它的抽象含義是:用來執行一個Runnable任務的執行器。
java.util.concurrent.ExecutorService
對Executor的一個擴展,增加了很多對於任務和執行器的生命周期進行管理的接口,也是通常進行多線程開發最常使用的接口。
java.util.concurrent.ThreadFactory
一個生成新線程的接口。用戶可以通過實現這個接口管理對線程池中生成線程的邏輯
java.util.concurrent.Executors
提供了很多不同的生成執行器的實用方法,比如基於線程池的執行器的實現。
三、為什麽要用線程池
Java從最開始就是基於線程的,線程在Java裏被封裝成一個類java.lang.Thread。在面試中很多面試官都會問一個很基礎的關於線程問題:
Java中有幾種方法新建一個線程?
所有人都知道,標準答案是兩種:繼承Thread或者實現Runnable,在JDK源代碼中Thread類的註釋中也是這麽寫的。
然而在我看來這兩種方法根本就是一種,所有想要開啟線程的操作,都必須生成了一個Thread類(或其子類)的實例,執行其中的native方法start0()。
Java中的線程
Java中將線程抽象為一個普通的類,這樣帶來了很多好處,譬如可以很簡單的使用面向對象的方法實現多線程的編程,然而這種程序寫多了容易會忘記,這個對象在底層是實實在在地對應了一個OS中的線程。
操作系統中的線程和進程
上圖中的進程(Process)可以看做一個JVM,可以看出,所有的進程有自己的私有內存,這塊內存會在主存中有一段映射,而所有的線程共享JVM中的內存。在現代的操作系統中,線程的調度通常都是集成在操作系統中的,操作系統能通過分析更多的信息來決定如何更高效地進行線程的調度,這也是為什麽Java中會一直強調,線程的執行順序是不會得到保證的,因為JVM自己管不了這個,所以只能認為它是完全無序的。
另外,類java.lang.Thread中的很多屬性也會直接映射為操作系統中線程的一些屬性。Java的Thread中提供的一些方法如sleep和yield其實依賴於操作系統中線程的調度算法。
關於線程的調度算法可以去讀操作系統相關的書籍,這裏就不做太多敘述了。
線程的開銷
通常來說,操作系統中線程之間的上下文切換大約要消耗1到10微秒
從上圖中可以看出線程中包含了一些上下文信息:
CPU棧指針(Stack)、
一組寄存器的值(Registers),
指令計數器的值(PC)等,
它們都保存在此線程所在的進程所映射的主存中,而對於Java來說,這個進程就是JVM所在的那個進程,JVM的運行時內存可以簡單的分為如下幾部分:
若幹個棧(Stack)。每個線程有自己的棧,JVM中的棧是不能存儲對象的,只能存儲基礎變量和對象引用。
堆(Heap)。一個JVM只有一個堆,所有的對象都在堆上分配。
方法區(Method Area)。一個JVM只有一個方法區,包含了所有載入的類的字節碼和靜態變量。
其中#1中的棧可以認為是這個線程的上下文,創建線程要申請相應的棧空間,而棧空間的大小是一定的,所以當棧空間不夠用時,會導致線程申請不成功。在Thread的源代碼中可以看到,啟動線程的最後一步是執行一個本地方法private native void start0(),代碼1是OpenJDK中start0最終調用的方法:
//代碼1
JVM_ENTRY(void, JVM_StartThread(JNIEnv env, jobject jthread))
JVMWrapper("JVM_StartThread");
JavaThread native_thread = NULL;
bool throw_illegal_thread_state = false;
// We must release the Threads_lock before we can post a jvmti event
// in Thread::start.
{
MutexLocker mu(Threads_lock);
//省略一些代碼
jlong size =
java_lang_Thread::stackSize(JNIHandles::resolve_non_null(jthread));
size_t sz = size > 0 ? (size_t) size : 0;
native_thread = new JavaThread(&thread_entry, sz);
}
if (native_thread->osthread() == NULL) {
THROW_MSG(vmSymbols::java_lang_OutOfMemoryError(),
"unable to create new native thread");
}
Thread::start(native_thread);
JVM_END
從代碼1中可以看到,線程的創建首先需要棧空間,所以過多的線程創建可能會導致OOM。
同時,線程的切換會有以下開銷:
CPU中執行上下文的切換,導致CPU中的「指令流水線(Instruction Pipeline)」的中斷和CPU緩存的失效。
如果線程太多,線程切換的時間會比線程執行的時間要長,嚴重浪費了CPU資源。
對於共享資源的競爭(鎖)會導致線程切換開銷急劇增加。
根據以上的描述,所以通常建議盡可能創建較少的線程,減少鎖的使用(尤其是synchronized),盡量使用JDK提供的同步工具。而為了減少線程上下文切換帶來的開銷,通常使用線程池是一個有效的方法。
具有1-5工作經驗的,面對目前流行的技術不知從何下手,需要突破技術瓶頸的可以加群。在公司待久了,過得很安逸,但跳槽時面試碰壁。需要在短時間內進修、跳槽拿高薪的可以加群。如果沒有工作經驗,但基礎非常紮實,對java工作機制,常用設計思想,常用java開發框架掌握熟練的可以加群。java架構群:582505643一起交流。
Java中的線程池
Executor框架中最常用的大概就是java.util.concurrent.ThreadPoolExecutor了,對於它的描述,簡單的說就是「它維護了一個線程池,對於提交到此Executor中的任務,它不是創建新的線程而是使用池內的線程進行執行」。對於「數量巨大但執行時間很小」的任務,可以顯著地減少對於任務執行的開銷。java.util.concurrent.ThreadPoolExecutor中包含了很多屬性,通過這些屬性開發者可以定制不同的線程池行為,大致如下:
- 線程池的大小:corePoolSize和maximumPoolSize
ThreadPoolExecutor中線程池的大小由這兩個屬性決定,前者指當線程池正常運行起來後的最小(核心)線程數,當一個任務到來時,若當前池中線程數小於corePoolSize,則會生成新的線程;後者指當等待隊列滿了之後可生成的最大的線程數。在例1中返回的對象中這兩個值相等,均等於用戶傳入的值。
-
用戶可以通過調用java.util.concurrent.ThreadPoolExecutor上的實例方法來啟動核心線程(core pool)
- 可定制化的線程生成方式:threadFactory
默認線程由方法Executors.defaultThreadFactory()返回的ThreadFactory進行創建,默認創建的線程都不是daemon,開發者可以傳入自定義的ThreadFactory進行對線程的定制化。
-
非核心線程的空閑等待時間:keepAliveTime
- 任務等待隊列:workQueue
這個隊列是java.util.concurrent.BlockingQueue<E>的一個實例。當池中當前沒有空閑的線程來執行任務,就會將此任務放入等待隊列,根據其具體實現類的不同,又可分為3種不同的隊列策略:
容量為0。如:java.util.concurrent.SynchronousQueue
等待隊列容量為0,所有需要阻塞的任務必須等待池內的某個線程有空閑,才能繼續執行,否則阻塞。調用Executors.newCachedThreadPool的兩個函數生成的線程池是這個策略。
不限容量。如:不指定容量的java.util.concurrent.LinkedBlockingQueue
等待隊列的長度無窮大,根據上文中的敘述,在這種策略下,不會有多於corePoolSize的線程被創建,所以maximumPoolSize也就沒有任何意義了。調用Executors.newFixedThreadPool生成的線程池是這個策略。
限制容量。如:指定容量的任何java.util.concurrent.BlockingQueue<E>
在某些場景下(本文中將描述這種場景),需要指定等待隊列的容量,以防止過多的資源消耗,比如如果使用不限容量的等待隊列,當有大量的任務到來而池內又無空閑線程執行任務時,會有大量的任務堆積,這些任務都是某個類的對象,是要消耗內存的,就可能導致OOM。如何去平衡等待隊列和線程池的大小要根據實際場景去斷定,如果配置不當,可能會導致資源耗盡、線程上下文切換消耗、或者線程調度消耗。這些都會直接影響系統的吞吐。
- 任務拒絕處理器:defaultHandler
如果任務被拒絕執行,則會調用這個對象上的RejectedExecutionHandler.rejectedExecution()方法,JDK定義了4種處理策略,用戶可以自定義自己的任務處理策略。
- 允許核心線程過期:allowCoreThreadTimeOut
上面說的所有情況都是基於這個變量為false(默認值)來說的,如果你的線程池已經不使用了(不被引用),但是其中還有活著的線程時,這個線程池是不會被回收的,這種情況就造成了內存泄漏——一塊永遠不會被訪問到的內存卻無法被GC回收。
用戶可以通過在拋棄線程池引用的時候顯式地調用shutdown()來釋放它,或者將allowCoreThreadTimeOut設置為true,則在過期時間後,核心線程會被釋放,則其會被GC回收。
四、如果線程死掉了怎麽辦
幾乎所有Executors中生成線程池的方法的註釋上,都有代表相同意思的一句話,表示如果線程池中的某個線程死掉了,線程池會生成一個新的線程代替它。下面是方法java.util.concurrent.Executors.newFixedThreadPool(int)上的註釋。
If any thread terminates due to a failure during execution prior to shutdown, a new one will take its place if needed to execute subsequent tasks.
線程死亡的原因
我們都知道守護線程(daemon)會在所有的非守護線程都死掉之後也死掉,除此之外導致一個非守護線程死掉有以下幾種可能:
自然死亡,Runnable.run()方法執行完後返回。
執行過程中有未捕獲異常,被拋到了Runnable.run()之外,導致線程死亡。
其宿主死亡,進程關閉或者機器死機。在Java中通常是System.exit()方法被調用
其他硬件問題。
線程池要保證其高可用性,就必須保證線程的可用。如一個固定容量的線程池,其中一個線程死掉了,它必須要能監控到線程的死亡並生成一個新的線程來代替它。ThreadPoolExecutor中與線程相關的有這樣幾個概念:
java.util.concurrent.ThreadFactory,在Executors中有兩種ThreadFactory,但其提供的線程池只使用了一種java.util.concurrent.Executors.DefaultThreadFactory,它是簡單的使用ThreadGroup來實現。
java.lang.ThreadGroup,從Java1開始就存在的類,用來建立一個線程的樹形結構,可以用它來組織線程間的關系,但其並沒有對其包含的子線程的監控。
java.util.concurrent.ThreadPoolExecutor.Worker,ThreadPoolExecutor對線程的封裝,其中還包含了一些統計功能。
ThreadPoolExecutor中如何保障線程的可用
在ThreadPoolExecutor中使用了一個很巧妙的方法實現了對線程池中線程健康狀況的監控,代碼2是從ThreadPoolExecutor類源碼中截取的一段代碼,它們在一起說明了其對線程的監控。
可以看到,在ThreadPoolExecutor中的線程被封裝成一個對象Worker,而將其中的run()代理到ThreadPoolExecutor中的runWorker(),在runWorker()方法中是一個獲取任務並執行的死循環。如果任務的運行出了什麽問題(如拋出未捕獲異常),processWorkerExit()方法會被執行,同時傳入的completedAbruptly參數為true,會重新添加一個初始任務為null的Worker,並隨之啟動一個新的線程。
//代碼2
//ThreadPoolExecutor的動態內部類
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
/** 對象中封裝的線程 */
final Thread thread;
/** 第一個要運行的任務,可能為null. */
Runnable firstTask;
/** 任務計數器 */
volatile long completedTasks;
//省略其他代碼
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
try {
beforeExecute(wt, task);
try {
task.run();
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
五、回到我的問題
由於各種各樣的原因,我們並沒有使用數據庫自帶的主從機制來做數據的復制,而是將主庫的所有DML語句作為消息發送到讀庫(DTS),同時自己實現了數據的重放。第一版的數據同步服務十分簡單,對於主庫的DML消息處理和消費(寫入讀庫)都是在一個線程內完成的.這麽實現的優點是簡單,但缺點是直接導致了表與表之間的數據同步會受到影響,如果有一個表A忽然來了很多的消息(往往是批量修改數據造成的),則會占住消息處理通道,影響其他業務數據的及時同步,同時單線程寫庫吞吐太小。
上文說到,首先想到的是使用線程池來做消息的消費,但是不能直接套用上邊說的Executor框架,由於以下幾個原因:
ThreadPoolExecutor中默認所有的任務之間是不互相影響的,然而對於數據庫的DML來說,消息的順序不能被打亂,至少單表的消息順序必須有序,不然會影響最終的數據一致。
ThreadPoolExecutor中所有的線程共享一個等待隊列,然而為了防止表與表之間的影響,每個線程應該有自己的任務等待隊列。
寫庫操作的吞吐直接受到提交事務數的影響,所以此多線程框架要可以支持任務的合並。
重復造輪子是沒有意義的,但是在我們這種場景下JDK中現有的Executor框架不符合要求,只能自己造輪子。
具有1-5工作經驗的,面對目前流行的技術不知從何下手,需要突破技術瓶頸的可以加群。在公司待久了,過得很安逸,但跳槽時面試碰壁。需要在短時間內進修、跳槽拿高薪的可以加群。如果沒有工作經驗,但基礎非常紮實,對java工作機制,常用設計思想,常用java開發框架掌握熟練的可以加群。java架構群:582505643一起交流。
我的實現
首先把線程抽象成「DML語句的執行器(Executor)」。其中包含了一個Thread的實例,維護了自己的等待隊列(限定容量的阻塞隊列),和對應的消息執行邏輯。
除此之外還包含了一些簡單的統計、線程健康監控、合並事務等處理。
Executor的對象實現了Thread.UncaughtExceptionHandler接口,並綁定到其工作線程上。同時ExecutorGroup也會再生成一個守護線程專門來守護池內所有線程,作為額外的保險措施。
把線程池的概念抽象成執行器組(ExecutorGroup),其中維護了執行器的數組,並維護了目標表到特定執行器的映射關系,並對外提供執行消息的接口,其主要代碼如下:
//代碼3
public class ExecutorGroup {
Executor[] group = new Executor[NUM];
Thread boss = null;
Map<String, Integer> registeredTables = new HashMap<>(32);
// AtomicInteger cursor = new AtomicInteger();
volatile int cursor = 0;
public ExecutorGroup(String name) {
//init group
for(int i = 0; i < NUM; i++) {
logger.debug("啟動線程{},{}", name, i);
group[i] = new Executor(this, String.format("sync-executor-%s-%d", name, i), i / NUM_OF_FIRST_CLASS);
}
startDaemonBoss(String.format("sync-executor-%s-boss", name));
}
//額外的保險
private void startDaemonBoss(String name) {
if (boss != null) {
boss.interrupt();
}
boss = new Thread(() -> {
while(true) {
//休息一分鐘。。。
if (this.group != null) {
for (int i = 0; i < group.length; i++) {
Executor executor = group[i];
if (executor != null) {
executor.checkThread();
}
}
}
}
});
boss.setName(name);
boss.setDaemon(true);
boss.start();
}
public void execute(Message message){
logger.debug("執行消息");
//省略消息合法性驗證
if (!registeredTables.containsKey(taskKey)) {
//已註冊
// registeredTables.put(taskKey, cursor.getAndIncrement());
registeredTables.put(taskKey, cursor++ % NUM);
}
int index = registeredTables.get(taskKey);
logger.debug("執行消息{},註冊索引{}", taskKey, index);
try {
group[index].schedule(message);
} catch (InterruptedException e) {
logger.error("準備消息出錯", e);
}
}
}
完成後整體的線程模型如下圖所示:
新的線程模型
Java1.7新加入的TransferQueue
Java1.7中提供了新的隊列類型TransferQueue,但只提供了一個它的實現java.util.concurrent.LinkedTransferQueue<E>,它有更好的性能表現,可它是一個無容量限制的隊列,而在我們的這個場景下必須要限制隊列的容量,所以要自己實現一個有容量限制的隊列。
Java線程池的理論與實踐