Java同步器之LockSupport原始碼解析
一、簡介
LockSupport
是用來建立鎖和其他同步類的基本執行緒阻塞原語。
LockSupport
中的park()
和unpark()
的作用分別是阻塞執行緒和解除阻塞執行緒,而且park()
和unpark()
不會遇到“Thread.suspend
和Thread.resume
所可能引發的死鎖”問題。因為park()
和unpark()
有許可的存在;呼叫park()
的執行緒和另一個試圖將其unpark()
的執行緒之間的競爭將保持活性。
二、原始碼
2.1 構造方法
LockSupport
類只提供了一個被private
修飾的構造方法,意味著LockSupport
不能在任何地方被例項化,但所有方法都是靜態方法,可以在任意地方被呼叫。
/**
* 私有構造
*/
private LockSupport() {}
2.2 屬性
/** * 這個方法是由於多執行緒隨機數生成器ThreadLocalRandom的package訪問許可權限制不能被這個包下的類使用, * 複製了一份實現出來,在StampedLock中被使用 */ static final int nextSecondarySeed() { int r; Thread t = Thread.currentThread(); if ((r = UNSAFE.getInt(t, SECONDARY)) != 0) { r ^= r << 13; // xorshift r ^= r >>> 17; r ^= r << 5; } else if ((r = java.util.concurrent.ThreadLocalRandom.current().nextInt()) == 0) r = 1; // avoid zero UNSAFE.putInt(t, SECONDARY, r); return r; } /** * /** * * 三種情況停止阻塞: * * 1. 呼叫unpark * * 2. 執行緒被中斷 * * 3. 設定時間到了 * * @Param isAbsolute 是否絕對時間 * * @Param time 時間 為0時代表無線等待 * * * Unsafe.park(boolean isAbsolute,long time); * * /** * * 釋放某執行緒,需要保證釋放時執行緒存活 * * * Unsafe.unpark(Thread thread); */ private static final sun.misc.Unsafe UNSAFE; /** * 獲取每個欄位的偏移地址 */ private static final long parkBlockerOffset; private static final long SEED; private static final long PROBE; private static final long SECONDARY; static { try { /** 獲取一個Unsafe例項 後續利用該例項獲取偏移量 */ UNSAFE = sun.misc.Unsafe.getUnsafe(); /** 建立一個Thread的class物件 後續利用反射獲取欄位 */ Class<?> tk = Thread.class; parkBlockerOffset = UNSAFE.objectFieldOffset (tk.getDeclaredField("parkBlocker")); SEED = UNSAFE.objectFieldOffset (tk.getDeclaredField("threadLocalRandomSeed")); PROBE = UNSAFE.objectFieldOffset (tk.getDeclaredField("threadLocalRandomProbe")); SECONDARY = UNSAFE.objectFieldOffset (tk.getDeclaredField("threadLocalRandomSecondarySeed")); } catch (Exception ex) { throw new Error(ex); } }
2.3 blocker
2.3.1 blocker的作用
在分析LockSupport
原始碼前,什麼是blocker
,blocker
的作用是什麼?
在Thread
類的原始碼中,可以找到這樣一個被volatile
修飾的變數,LockSupport
類中所有blocker
的相關變數以及方法都是為這個parkBlocker
變數服務的。
volatile Object parkBlocker;
當執行緒被阻塞時,如果該執行緒的parkBlocker
變數不為空,則在列印堆疊異常時,控制檯會列印輸出具體阻塞物件的資訊,方便錯誤排查,後文會對此進行演示
2.3.2 blocker相關方法
首先關注blocker
/**
* 設定blocker
* 將執行緒t的parkBlocker欄位的值更新為arg
*/
private static void setBlocker(Thread t, Object arg) {
UNSAFE.putObject(t, parkBlockerOffset, arg);
}
/**
* 獲取blocker
*/
public static Object getBlocker(Thread t) {
if (t == null)
throw new NullPointerException();
return UNSAFE.getObjectVolatile(t, parkBlockerOffset);
}
原始碼中可以清晰的看到方法中使用了parkBlockerOffset
,即在類初始化時獲取的parkBlocker
在記憶體中的偏移量,putObject
和getObjectVolatile
方法採用地址加偏移量的方式從記憶體直接設定或獲取parkBlocker
(Unsafe
包下的方法可以直接操作記憶體,因此該類被命名為Unsafe
),這樣做的原因是因為執行緒被阻塞時無法被賦值或取值。
2.4 park
LockSupport
為阻塞操作提供了兩組三類方法,一組是不設定blocker
的方法,另一組是設定blocker
方法的,JDK
推薦為Thread
設定blocker
方便除錯。
/**
* 基礎阻塞方法,無時限
*/
public static void park(Object blocker) {
/** 獲取當前執行緒 */
Thread t = Thread.currentThread();
setBlocker(t, blocker);
/** 阻塞核心方法 直到被喚醒前不會執行下一語句 內部邏輯後文展開討論 */
UNSAFE.park(false, 0L);
/** 執行緒被喚醒後parkBlocker重新置null */
setBlocker(t, null);
}
/**
* 邏輯與park基本相同 阻塞nanos納秒 超時後執行緒被自動喚醒
*/
public static void parkNanos(Object blocker, long nanos) {
if (nanos > 0) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(false, nanos);
setBlocker(t, null);
}
}
/**
* 邏輯與park基本相同 阻塞到日期deadline為止 超過deadline日期後自動被喚醒
*/
public static void parkUntil(Object blocker, long deadline) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(true, deadline);
setBlocker(t, null);
}
/**
* 無限等待其它執行緒將該執行緒unpark 或 中斷該執行緒
*/
public static void park() {
UNSAFE.park(false, 0L);
}
/**
* 指定等待時間
*/
public static void parkNanos(long nanos) {
if (nanos > 0)
UNSAFE.park(false, nanos);
}
/**
* 阻塞到指定時刻
*/
public static void parkUntil(long deadline) {
UNSAFE.park(true, deadline);
}
2.5 unpark
unpark
方法如下,UNSAFE.unpark
為喚醒執行緒的核心方法
/**
* 釋放許可,使執行緒繼續執行。
* 如果執行緒沒被阻塞,則下次park不會阻塞該執行緒?
*/
public static void unpark(Thread thread) {
if (thread != null)
UNSAFE.unpark(thread);
}
說明:LockSupport
是通過呼叫Unsafe
函式中的介面實現阻塞和解除阻塞的。
2.6 底層實現
在Unsafe
的原始碼中可見,park
與unpark
都是native
方法(意味著由C++底層實現)
void Parker::park(bool isAbsolute, jlong time) {
// 先原子的將_counter的值設為0,並返回_counter的原值,如果原值>0說明有通行證,直接返回
// 首先嚐試能否獲取許可
// 利用原子操作設_counter(許可)為0,同時返回_counter原值
// 原值為1時獲取許可成功 直接return;
if (Atomic::xchg(0, &_counter) > 0) return;
Thread* thread = Thread::current();
assert(thread->is_Java_thread(), "Must be JavaThread");
JavaThread *jt = (JavaThread *)thread;
//如果執行緒被中斷 直接返回
if (Thread::is_interrupted(thread, false)) {
return;
}
timespec absTime;
// 如果出現time小於0 或
// 呼叫了parkUntil方法(只用呼叫parkUntil時isAbsolute為true)且time為0的情況
// 意味著不需要嘗試獲取許可 直接返回
if (time < 0 || (isAbsolute && time == 0) ) {
return;
}
// 只有在java種呼叫了parkNanos或parkUntil方法才會進入此分支
if (time > 0) {
// 定時喚醒
unpackTime(&absTime, isAbsolute, time);
}
ThreadBlockInVM tbivm(jt);
// 如果執行緒被中斷,直接返回
// 如果沒有被中斷,且獲取互斥鎖失敗,直接返回
if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) {
return;
}
int status ;
// 如果_counter > 0, 不需要等尤拉尤拉待,這裡再次檢查_counter的值
if (_counter > 0) {
_counter = 0;
status = pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant") ;
// 插入寫屏障
OrderAccess::fence();
return;
}
OSThreadWaitState osts(thread->osthread(), false);
// 暫停Java執行緒
jt->set_suspend_equivalent();
assert(_cur_index == -1, "invariant");
if (time == 0) {
_cur_index = REL_INDEX; // arbitrary choice when not timed
// 執行緒進入阻塞狀態 並等待_cond[_cur_index]訊號
status = pthread_cond_wait (&_cond[_cur_index], _mutex) ;
} else {
_cur_index = isAbsolute ? ABS_INDEX : REL_INDEX;
// 執行緒進入限時阻塞狀態
status = os::Linux::safe_cond_timedwait (&_cond[_cur_index], _mutex, &absTime) ;
if (status != 0 && WorkAroundNPTLTimedWaitHang) {
pthread_cond_destroy (&_cond[_cur_index]) ;
pthread_cond_init (&_cond[_cur_index], isAbsolute ? NULL : os::Linux::condAttr());
}
}
_cur_index = -1;
_counter = 0 ;
// 互斥鎖釋放
status = pthread_mutex_unlock(_mutex) ;
assert_status(status == 0, status, "invariant") ;
OrderAccess::fence();
}
void Parker::unpark() {
int s, status ;
status = pthread_mutex_lock(_mutex);
assert (status == 0, "invariant") ;
//儲存原始許可值 用於後續判斷
s = _counter;
//許可置1
_counter = 1;
if (s < 1) {
if (WorkAroundNPTLTimedWaitHang) {
//喚醒等待執行緒
status = pthread_cond_signal (_cond) ;
assert (status == 0, "invariant") ;
//釋放鎖操作
status = pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant") ;
} else {
status = pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant") ;
status = pthread_cond_signal (_cond) ;
assert (status == 0, "invariant") ;
}
} else {
//釋放鎖
pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant") ;
}
}
三、示例
對比下面的“示例1”和“示例2”可以更清晰的瞭解LockSupport
的用法。
示例1
public class WaitTest1 {
public static void main(String[] args) {
ThreadA ta = new ThreadA("ta");
synchronized(ta) { // 通過synchronized(ta)獲取“物件ta的同步鎖”
try {
System.out.println(Thread.currentThread().getName()+" start ta");
ta.start();
System.out.println(Thread.currentThread().getName()+" block");
// 主執行緒等待
ta.wait();
System.out.println(Thread.currentThread().getName()+" continue");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
static class ThreadA extends Thread{
public ThreadA(String name) {
super(name);
}
public void run() {
synchronized (this) { // 通過synchronized(this)獲取“當前物件的同步鎖”
System.out.println(Thread.currentThread().getName()+" wakup others");
notify(); // 喚醒“當前物件上的等待執行緒”
}
}
}
}
示例2
import java.util.concurrent.locks.LockSupport;
public class LockSupportTest1 {
private static Thread mainThread;
public static void main(String[] args) {
ThreadA ta = new ThreadA("ta");
// 獲取主執行緒
mainThread = Thread.currentThread();
System.out.println(Thread.currentThread().getName()+" start ta");
ta.start();
System.out.println(Thread.currentThread().getName()+" block");
// 主執行緒阻塞
LockSupport.park(mainThread);
System.out.println(Thread.currentThread().getName()+" continue");
}
static class ThreadA extends Thread{
public ThreadA(String name) {
super(name);
}
public void run() {
System.out.println(Thread.currentThread().getName()+" wakup others");
// 喚醒“主執行緒”
LockSupport.unpark(mainThread);
}
}
}
執行結果 :
main start ta
main block
ta wakup others
main continue
四、總結
LocalSupport
也是實現執行緒間通訊的一種有效的方式。unpark
和park
之間不需要有嚴格的順序。可以先執行unpark
,之後再執行park
。這樣再編碼的過程中就比使用wait/notify
方法要簡單很多。此外,LocalSUpport
全部是採用UnSafe
類來實現的。這個類通過使用park/unpark
以及相關cas
操作,就實現了java
中JUC
的各種複雜的資料結構和容器。而且效率非常高。
五、拓展
5.1 park/unpark與wait/notify的區別
- 先喚醒再阻塞操作,由
LockSupport
實現,即先呼叫unpark
再呼叫park
,則該執行緒不會被阻塞;由Object
實現,即先呼叫notify
再呼叫wait
,則該執行緒會被阻塞。 -
LockSupport
允許在任意地方阻塞喚醒執行緒,Object
的wait/notify
必須在synchronized
同步程式碼塊內呼叫。因為park/unpark
依賴許可量,wait/notify
依賴鎖。 -
LockSupport
允許喚醒指定執行緒,notify
只能喚醒隨機執行緒,notifyAll
喚醒全部阻塞執行緒。