Java synchronized那點事
前言
請看上篇:Java 物件頭那點事
文章中的原始碼都有不同程度縮減,來源於openjdk8的開原始碼(tag:jdk8-b120)。
鎖粗化過程
偏向鎖
①:markword中儲存的執行緒ID是自己且epoch等於class的epoch,則說明是偏向鎖重入。
②:偏向鎖若已禁用,進行撤銷偏向鎖。
③:偏向鎖開啟,都進行進行重偏向操作。
④:若進行了鎖撤銷操作或重偏向操作失敗,則需要升級為輕量級鎖或者進一步升級為重量級鎖。
匿名偏向
鎖物件在傳送鎖競爭後會升級為偏向鎖,不過當不發生鎖競爭時,鎖物件依然會升級為偏向鎖,這種情況叫匿名偏向。
當jvm啟動4s後,會預設給新建的物件加上偏向鎖。
上程式碼:
<dependency>
<groupId>org.openjdk.jol</groupId>
<artifactId>jol-core</artifactId>
<version>0.8</version>
</dependency>
這個包下的工具類的功能有:
// 檢視物件內部結構 System.out.println(ClassLayout.parseInstance(bingo).toPrintable()); // 檢視物件外部資訊 System.out.println(GraphLayout.parseInstance(bingo).toPrintable()); // 檢視物件總大小 System.out.println(GraphLayout.parseInstance(bingo).totalSize());
預設JVM是開啟指標壓縮,可以通過vm引數開啟關閉指標壓縮:-XX:-UseCompressedOops
。
當建立鎖物件前不進行休眠4s的操作:
@Test public void mark() throws InterruptedException { Bingo bingo = new Bingo(); bingo.setP(1); bingo.setB(false); // 檢視物件內部結構 System.out.println(ClassLayout.parseInstance(bingo).toPrintable()); System.out.println("\n++++++++++++++++++++++++++\n"); synchronized (bingo) { // 檢視物件內部結構 System.out.println(ClassLayout.parseInstance(bingo).toPrintable()); } }
看我標紅線的後三位的值,由於啟動過快,鎖直接從無鎖升級成了輕量級鎖。
當建立鎖物件前進行休眠4s的操作:
@Test
public void mark() throws InterruptedException {
TimeUnit.SECONDS.sleep(4);
Bingo bingo = new Bingo();
bingo.setP(1);
bingo.setB(false);
// 檢視物件內部結構
System.out.println(ClassLayout.parseInstance(bingo).toPrintable());
System.out.println("\n++++++++++++++++++++++++++\n");
synchronized (bingo) {
// 檢視物件內部結構
System.out.println(ClassLayout.parseInstance(bingo).toPrintable());
}
}
當在程式啟動4s後建立鎖物件,就會預設偏向。
重偏向
因為偏向鎖不會自動釋放,因此當鎖物件處於偏向鎖時,另一個執行緒進來只能依託VM判斷上一個獲取偏向鎖的執行緒是否存活、是否退出持有鎖來決定是鎖升級還是進行重偏向。
鎖撤銷
①:偏向鎖的撤銷必須等待VM全域性安全點(安全點指所有java執行緒都停在安全點,只有vm執行緒執行)。
②:撤銷偏向鎖恢復到無鎖(標誌位為 01)或輕量級鎖(標誌位為 00)的狀態。
③:只要發生鎖競爭,就會進行鎖撤銷。
備註:
當開啟偏向鎖時,若持有偏向鎖的執行緒仍然存活且未退出同步程式碼塊,鎖升級為輕量級鎖/重量級鎖之前會進行偏向鎖撤銷操作。
如果是升級為輕量級鎖,撤銷之後需要建立Lock Record 來儲存之前的markword資訊。
批量偏向/撤銷概念:
參考1:https://www.cnblogs.com/LemonFive/p/11248248.html
- 批量重偏向
當一個執行緒同時持有同一個類的多個物件的偏向鎖時(這些物件的鎖競爭不激烈),執行完同步程式碼塊後,如果另一個執行緒也要持有這些物件的鎖,當物件數量達到一定程度時,會觸發批量重偏向機制(進行過批量重偏向的物件不可再進行批量重偏向)。 - 批量鎖撤銷
當觸發批量重偏向後,會觸發批量撤銷機制。
閾值定義在globals.hpp中:
// 批量重偏向閾值
product(intx, BiasedLockingBulkRebiasThreshold, 20)
// 批量鎖撤銷閾值
product(intx, BiasedLockingBulkRevokeThreshold, 40)
可以在VM啟動引數中通過-XX:BiasedLockingBulkRebiasThreshold
和 -XX:BiasedLockingBulkRevokeThreshold
來手動設定閾值。
偏向鎖的撤銷和重偏向的程式碼(過於複雜)在biasedLocking.cpp中:
void BiasedLocking::revoke_at_safepoint(Handle h_obj) {
assert(SafepointSynchronize::is_at_safepoint(), "must only be called while at safepoint");
oop obj = h_obj();
HeuristicsResult heuristics = update_heuristics(obj, false);
if (heuristics == HR_SINGLE_REVOKE) {
// 重偏向
revoke_bias(obj, false, false, NULL);
} else if ((heuristics == HR_BULK_REBIAS) ||
(heuristics == HR_BULK_REVOKE)) {
// 批量撤銷或重偏向
bulk_revoke_or_rebias_at_safepoint(obj, (heuristics == HR_BULK_REBIAS), false, NULL);
}
clean_up_cached_monitor_info();
}
參考2:
對於存在明顯多執行緒競爭的場景下使用偏向鎖是不合適的,比如生產者-消費者佇列。生產者執行緒獲得了偏向鎖,消費者執行緒再去獲得鎖的時候,就涉及到這個偏向鎖的撤銷(revoke)操作,而這個撤銷是比較昂貴的。那麼怎麼判斷這些物件是否適合偏向鎖呢?jvm採用以類為單位的做法,其內部為每個類維護一個偏向鎖計數器,對其物件進行偏向鎖的撤銷操作進行計數。當這個值達到指定閾值的時候,jvm就認為這個類的偏向鎖有問題,需要進行重偏向(rebias)。對所有屬於這個類的物件進行重偏向的操作叫批量重偏向(bulk rebias),之前的做法是對heap進行遍歷,後來引入epoch。當需要bulk rebias時,對這個類的epoch值加1,以後分配這個類的物件的時候mark欄位裡就是這個epoch值了,同時還要對當前已經獲得偏向鎖的物件的epoch值加1,這些鎖資料記錄在方法棧裡。這樣判斷這個物件是否獲得偏向鎖的條件就是:mark欄位後3位是101,thread欄位跟當前執行緒相同,epoch欄位跟所屬類的epoch值相同。如果epoch值不一樣,即使thread欄位指向當前執行緒,也是無效的,相當於進行過了rebias,只是沒有對物件的mark欄位進行更新。如果這個類的revoke計數器繼續增加到一個閾值,那個jvm就認為這個類不適合偏向鎖了,就要進行bulk revoke。於是多了一個判斷條件,要檢視所屬類的欄位,看看是否允許對這個類使用偏向鎖。
輕量級鎖
輕量級體現線上程會嘗試在自己的堆疊中建立Lock Record儲存鎖物件的相關資訊,不需要在核心態和使用者態之間進行切換,不需要作業系統進行排程。
加鎖
拿到輕量級鎖執行緒堆疊:
Lock Record主要分為兩部分:
- obj
指向鎖物件本身。重入時也如此。 - displaced header(縮寫為hdr)
第一次拿到鎖時hdr存放的是encode加密後的markword,重入時存放null。
思考:為什麼鎖重入時hdr存放的是null,而不是用計數器來實現呢?
假設一個場景,當一個執行緒同時拿到A、B、C...N 多個鎖的時候,那麼執行緒的堆疊中,肯定有多個鎖物件的Lock Record,
如:
synchronized(a){
synchronized(b){
synchronized(c){
// do something
synchronized(a){
// do something
}
}
}
}
當鎖a重入時,如果用計數器,還得遍歷當前執行緒堆疊拿到第一次的Lock Record,解鎖時也要遍歷,效率必然低下。作為jdk底層程式碼必然講究效率。
以上純屬個人看法(歡迎交流)。
解鎖
①:使用遍歷方式將當前執行緒堆疊中屬於該鎖物件的Lock Record 指向Null。
②:CAS還原markword為無鎖狀態。
③:第②步失敗需要升級為重量級鎖。
優缺點
-
優點
線上程接替/交替執行的情況下,鎖競爭比較小,可以避免成為重量級鎖而引起的效能問題。 -
缺點
當鎖競爭比較激烈、多執行緒同事競爭鎖的時候,需要從輕量級升級為重量級,產生了額外的開銷。
原始碼分析
加鎖
加鎖、解鎖流程的程式碼在InterpreterRuntime.cpp中。
這是我從github拉下來的原始碼:
/**
* (輕量級鎖)加鎖流程
* */
CASE(_monitorenter): {
// (鎖物件本身)
oop lockee = STACK_OBJECT(-1);
// derefing's lockee ought to provoke implicit null check
CHECK_NULL(lockee);
// find a free monitor or one already allocated for this object
// if we find a matching object then we need a new monitor
// since this is recursive enter
BasicObjectLock* limit = istate->monitor_base();
BasicObjectLock* most_recent = (BasicObjectLock*) istate->stack_base();
// (這個entry就是大家常說的Lock Record吧)
BasicObjectLock* entry = NULL;
while (most_recent != limit ) {
if (most_recent->obj() == NULL) entry = most_recent;
else if (most_recent->obj() == lockee) break;
most_recent++;
}
if (entry != NULL) {
entry->set_obj(lockee);
// (構建一個無鎖狀態的mark word)
markOop displaced = lockee->mark()->set_unlocked();
// (放到lock record 中)
entry->lock()->set_displaced_header(displaced);
// 鎖物件的markword是否為這個無鎖的displaced markword
// (CAS替換失敗說明鎖物件的markword 不是無所狀態)
if (Atomic::cmpxchg_ptr(entry, lockee->mark_addr(), displaced) != displaced) {
// Is it simple recursive case?
// (判斷是否是鎖重入)
if (THREAD->is_lock_owned((address) displaced->clear_lock_bits())) {
// (如果是重入場景,那麼新的Lock Record 設定為Null)
entry->lock()->set_displaced_header(NULL);
} else {
// (不是鎖重入,且搶鎖失敗,說明鎖競爭激烈,升級為重量級。進入重量級鎖搶鎖流程)
CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
}
}
UPDATE_PC_AND_TOS_AND_CONTINUE(1, -1);
} else {
istate->set_msg(more_monitors);
UPDATE_PC_AND_RETURN(0); // Re-execute
}
}
可以看得出來,這部分程式碼並沒有體現出偏向鎖的邏輯,有大佬給出原因,可以參考這篇部落格:https://www.jianshu.com/p/4758852cbff4
其他大佬解析後的程式碼:
點選檢視程式碼
CASE(_monitorenter): {
// lockee 就是鎖物件
oop lockee = STACK_OBJECT(-1);
// derefing's lockee ought to provoke implicit null check
CHECK_NULL(lockee);
// code 1:找到一個空閒的Lock Record
BasicObjectLock* limit = istate->monitor_base();
BasicObjectLock* most_recent = (BasicObjectLock*) istate->stack_base();
BasicObjectLock* entry = NULL;
while (most_recent != limit ) {
if (most_recent->obj() == NULL) entry = most_recent;
else if (most_recent->obj() == lockee) break;
most_recent++;
}
//entry不為null,代表還有空閒的Lock Record
if (entry != NULL) {
// code 2:將Lock Record的obj指標指向鎖物件
entry->set_obj(lockee);
int success = false;
uintptr_t epoch_mask_in_place = (uintptr_t)markOopDesc::epoch_mask_in_place;
// markoop即物件頭的mark word
markOop mark = lockee->mark();
intptr_t hash = (intptr_t) markOopDesc::no_hash;
// code 3:如果鎖物件的mark word的狀態是偏向模式
if (mark->has_bias_pattern()) {
uintptr_t thread_ident;
uintptr_t anticipated_bias_locking_value;
thread_ident = (uintptr_t)istate->thread();
// code 4:這裡有幾步操作,下文分析
anticipated_bias_locking_value =
(((uintptr_t)lockee->klass()->prototype_header() | thread_ident) ^ (uintptr_t)mark) &
~((uintptr_t) markOopDesc::age_mask_in_place);
// code 5:如果偏向的執行緒是自己且epoch等於class的epoch
if (anticipated_bias_locking_value == 0) {
// already biased towards this thread, nothing to do
if (PrintBiasedLockingStatistics) {
(* BiasedLocking::biased_lock_entry_count_addr())++;
}
success = true;
}
// code 6:如果偏向模式關閉,則嘗試撤銷偏向鎖
else if ((anticipated_bias_locking_value & markOopDesc::biased_lock_mask_in_place) != 0) {
markOop header = lockee->klass()->prototype_header();
if (hash != markOopDesc::no_hash) {
header = header->copy_set_hash(hash);
}
// 利用CAS操作將mark word替換為class中的mark word
if (Atomic::cmpxchg_ptr(header, lockee->mark_addr(), mark) == mark) {
if (PrintBiasedLockingStatistics)
(*BiasedLocking::revoked_lock_entry_count_addr())++;
}
}
// code 7:如果epoch不等於class中的epoch,則嘗試重偏向
else if ((anticipated_bias_locking_value & epoch_mask_in_place) !=0) {
// 構造一個偏向當前執行緒的mark word
markOop new_header = (markOop) ( (intptr_t) lockee->klass()->prototype_header() | thread_ident);
if (hash != markOopDesc::no_hash) {
new_header = new_header->copy_set_hash(hash);
}
// CAS替換物件頭的mark word
if (Atomic::cmpxchg_ptr((void*)new_header, lockee->mark_addr(), mark) == mark) {
if (PrintBiasedLockingStatistics)
(* BiasedLocking::rebiased_lock_entry_count_addr())++;
}
else {
// 重偏向失敗,代表存在多執行緒競爭,則呼叫monitorenter方法進行鎖升級
CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
}
success = true;
}
else {
// 走到這裡說明當前要麼偏向別的執行緒,要麼是匿名偏向(即沒有偏向任何執行緒)
// code 8:下面構建一個匿名偏向的mark word,嘗試用CAS指令替換掉鎖物件的mark word
markOop header = (markOop) ((uintptr_t) mark & ((uintptr_t)markOopDesc::biased_lock_mask_in_place |(uintptr_t)markOopDesc::age_mask_in_place |epoch_mask_in_place));
if (hash != markOopDesc::no_hash) {
header = header->copy_set_hash(hash);
}
markOop new_header = (markOop) ((uintptr_t) header | thread_ident);
// debugging hint
DEBUG_ONLY(entry->lock()->set_displaced_header((markOop) (uintptr_t) 0xdeaddead);)
if (Atomic::cmpxchg_ptr((void*)new_header, lockee->mark_addr(), header) == header) {
// CAS修改成功
if (PrintBiasedLockingStatistics)
(* BiasedLocking::anonymously_biased_lock_entry_count_addr())++;
}
else {
// 如果修改失敗說明存在多執行緒競爭,所以進入monitorenter方法
CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
}
success = true;
}
}
// 如果偏向執行緒不是當前執行緒或沒有開啟偏向模式等原因都會導致success==false
if (!success) {
// 輕量級鎖的邏輯
//code 9: 構造一個無鎖狀態的Displaced Mark Word,並將Lock Record的lock指向它
markOop displaced = lockee->mark()->set_unlocked();
entry->lock()->set_displaced_header(displaced);
//如果指定了-XX:+UseHeavyMonitors,則call_vm=true,代表禁用偏向鎖和輕量級鎖
bool call_vm = UseHeavyMonitors;
// 利用CAS將物件頭的mark word替換為指向Lock Record的指標
if (call_vm || Atomic::cmpxchg_ptr(entry, lockee->mark_addr(), displaced) != displaced) {
// 判斷是不是鎖重入
if (!call_vm && THREAD->is_lock_owned((address) displaced->clear_lock_bits())) {
//code 10: 如果是鎖重入,則直接將Displaced Mark Word設定為null
entry->lock()->set_displaced_header(NULL);
} else {
CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
}
}
}
UPDATE_PC_AND_TOS_AND_CONTINUE(1, -1);
} else {
// lock record不夠,重新執行
istate->set_msg(more_monitors);
UPDATE_PC_AND_RETURN(0); // Re-execute
}
}
解鎖
/**
* (輕量級鎖)解鎖流程
* */
CASE(_monitorexit): {
oop lockee = STACK_OBJECT(-1);
CHECK_NULL(lockee);
BasicObjectLock* limit = istate->monitor_base();
BasicObjectLock* most_recent = (BasicObjectLock*) istate->stack_base();
// (挨個遍歷當前執行緒棧中的Lock Record)
while (most_recent != limit ) {
// (Lock Record 的obj是否是需解鎖的鎖物件)
if ((most_recent)->obj() == lockee) {
BasicLock* lock = most_recent->lock();
markOop header = lock->displaced_header();
// (將obj設定為null(作刪除處理))
most_recent->set_obj(NULL);
// If it isn't recursive we either must swap old header or call the runtime
if (header != NULL) {
// (非重入,CAS替換物件頭的markword 為Lock Rocord中的displaced markword)
if (Atomic::cmpxchg_ptr(header, lockee->mark_addr(), lock) != lock) {
// restore object for the slow case
// (替換失敗,表示鎖已膨脹為重量級鎖,此時markword指向ObjectMonitor的地址)
most_recent->set_obj(lockee);
// (走重量級鎖的鎖退出流程)
CALL_VM(InterpreterRuntime::monitorexit(THREAD, most_recent), handle_exception);
}
}
UPDATE_PC_AND_TOS_AND_CONTINUE(1, -1);
}
most_recent++;
}
// Need to throw illegal monitor state exception
CALL_VM(InterpreterRuntime::throw_illegal_monitor_state_exception(THREAD), handle_exception);
ShouldNotReachHere();
}
重量級鎖
重量級鎖是基於monitor模型進行實現的。
重量級鎖是如何體現重量級的?
①:需要建立monitor,包含阻塞佇列、競爭佇列、繼承者、鎖擁有者等大量資料,會佔用大量記憶體。
②:需要呼叫作業系統對執行緒進行park、unpark操作,會涉及到cpu在使用者態和核心態之間切換,開銷大。
③:monitor所執行的VM執行緒(核心執行緒)需要作業系統將那些排程,耗費時間。
monitor的初始化
①:monitor並不是一下子初始化完成的。
②:monitor在初始化的過程中,如果有執行緒進來獲取鎖,則會進行自旋。
③:執行緒進入monitor後會被封裝成一個ObjectWaiter(雙向連結串列結構),然後park住當前執行緒。當有執行緒退出鎖後會進行unpark操作(喚醒操作涉及到作業系統,會產生額外的開銷)。
ObjectWaiter的結構:
class ObjectWaiter : public StackObj {
// ...
ObjectWaiter * volatile _next;
ObjectWaiter * volatile _prev;
Thread* _thread;
// ...
};
monitor的組成
volatile markOop _header; // displaced object header word - mark
void* volatile _object; // backward object pointer - strong root
void * volatile _owner; // pointer to owning thread OR BasicLock
volatile jlong _previous_owner_tid; // thread id of the previous owner of the monitor
volatile intptr_t _recursions; // recursion count, 0 for first entry
int OwnerIsThread ; // _owner is (Thread *) vs SP/BasicLock
ObjectWaiter * volatile _cxq ; // LL of recently-arrived threads blocked on entry.
ObjectWaiter * volatile _EntryList ; // Threads blocked on entry or reentry.
Thread * volatile _succ ; // Heir presumptive thread - used for futile wakeup throttling
volatile intptr_t _count;
volatile intptr_t _waiters; // number of waiting threads
ObjectWaiter * volatile _WaitSet; // LL of threads wait()ing on the monitor
monitor的工作流程
阻塞佇列中的執行緒進入_cxq、_EntryList佇列的過程有著不同的策略:
- policy == 0,頭插_EntryList
- policy == 1,尾插_EntryList
- policy == 2,頭插_cxq
- policy == 3,尾插_cxq
原始碼分析
加鎖第一階段
這部分程式碼並沒有建立monitor。
大部分工作是對鎖狀態做判斷、安全點的檢查,考慮無鎖、輕量級鎖的重入情況,因為鎖升級為重量級鎖就直接進核心態了,消耗資源太多。
InterpreterRuntime.cpp#monitorenter原始碼:
IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorenter(JavaThread* thread, BasicObjectLock* elem))
#ifdef ASSERT
thread->last_frame().interpreter_frame_verify_monitor(elem);
#endif
if (PrintBiasedLockingStatistics) {
Atomic::inc(BiasedLocking::slow_path_entry_count_addr());
}
Handle h_obj(thread, elem->obj());
assert(Universe::heap()->is_in_reserved_or_null(h_obj()),
"must be NULL or an object");
// 開啟偏向鎖
if (UseBiasedLocking) {
// Retry fast entry if bias is revoked to avoid unnecessary inflation
ObjectSynchronizer::fast_enter(h_obj, elem->lock(), true, CHECK);
} else {
ObjectSynchronizer::slow_enter(h_obj, elem->lock(), CHECK);
}
assert(Universe::heap()->is_in_reserved_or_null(elem->obj()),
"must be NULL or an object");
#ifdef ASSERT
thread->last_frame().interpreter_frame_verify_monitor(elem);
#endif
IRT_END
主要還是看ObjectSynchronizer::fast_enter、ObjectSynchronizer::slow_enter,這部分原始碼在synchronizer.cpp中。
void ObjectSynchronizer::fast_enter(Handle obj, BasicLock* lock, bool attempt_rebias, TRAPS) {
// 開啟偏向鎖
if (UseBiasedLocking) {
if (!SafepointSynchronize::is_at_safepoint()) {
// 不在安全點(安全點指所有java執行緒都停在安全點,只有vm執行緒執行),需要撤銷並重偏向
BiasedLocking::Condition cond = BiasedLocking::revoke_and_rebias(obj, attempt_rebias, THREAD);
if (cond == BiasedLocking::BIAS_REVOKED_AND_REBIASED) {
return;
}
} else {
assert(!attempt_rebias, "can not rebias toward VM thread");
// 在安全點進行偏向鎖的撤銷
BiasedLocking::revoke_at_safepoint(obj);
}
assert(!obj->mark()->has_bias_pattern(), "biases should be revoked by now");
}
// 上述操作是要保證在進入重量級鎖之前鎖狀態應該處於輕量級鎖
slow_enter (obj, lock, THREAD) ;
}
/**
* slow enter
* 主要對鎖狀態做判斷,考慮無鎖、輕量級鎖的重入情況,因為鎖升級為重量級鎖就直接進核心態了,消耗資源太多。
* */
void ObjectSynchronizer::slow_enter(Handle obj, BasicLock* lock, TRAPS) {
markOop mark = obj->mark();
assert(!mark->has_bias_pattern(), "should not see bias pattern here");
// (mark word是無鎖狀態)
if (mark->is_neutral()) {
lock->set_displaced_header(mark);
if (mark == (markOop) Atomic::cmpxchg_ptr(lock, obj()->mark_addr(), mark)) {
TEVENT (slow_enter: release stacklock) ;
return ;
}
} else
// (如果是鎖重入)
if (mark->has_locker() && THREAD->is_lock_owned((address)mark->locker())) {
assert(lock != mark->locker(), "must not re-lock the same lock");
assert(lock != (BasicLock*)obj->mark(), "don't relock with same BasicLock");
lock->set_displaced_header(NULL);
return;
}
// markword的值設定為值為marked_value的markword(不能看起來無鎖,也不能看起來像持有偏向鎖、輕量級鎖的情況)
lock->set_displaced_header(markOopDesc::unused_mark());
// 膨脹為重量級鎖,enter方法後面進入重量級鎖的搶佔流程
ObjectSynchronizer::inflate(THREAD, obj())->enter(THREAD);
}
如果是進入fast_enter(),那麼就會再進行一次偏向鎖開啟的判斷,再進入slow_enter()的邏輯中去,那麼為什麼不開始就直接進行slow_enter呢?就為了判斷下鎖偏向和撤銷嗎?這部分邏輯也完全可以寫到slow_enter中去。這麼寫的原因未知。
加鎖第二階段
形成monitor,用來排程競爭鎖的執行緒。
先看鎖的膨脹過程:
ObjectMonitor * ATTR ObjectSynchronizer::inflate (Thread * Self, oop object) {
// 自旋
for (;;) {
const markOop mark = object->mark() ;
assert (!mark->has_bias_pattern(), "invariant") ;
// The mark can be in one of the following states:
// * Inflated - just return(膨脹完成,直接返回)
// * Stack-locked - coerce it to inflated(輕量級加鎖狀態)
// * INFLATING - busy wait for conversion to complete(膨脹中)
// * Neutral - aggressively inflate the object.(無鎖狀態)
// * BIASED - Illegal. We should never see this()(偏向鎖,非法,這裡不能出現)
// CASE: inflated
if (mark->has_monitor()) {
ObjectMonitor * inf = mark->monitor() ;
assert (inf->header()->is_neutral(), "invariant");
assert (inf->object() == object, "invariant") ;
assert (ObjectSynchronizer::verify_objmon_isinpool(inf), "monitor is invalid");
return inf ;
}
// 膨脹中,進行下一輪自旋
if (mark == markOopDesc::INFLATING()) {
TEVENT (Inflate: spin while INFLATING) ;
ReadStableMark(object) ;
continue ;
}
// 輕量級鎖狀態
if (mark->has_locker()) {
// 為當前執行緒分配一個monitor
ObjectMonitor * m = omAlloc (Self) ;
m->Recycle();
m->_Responsible = NULL ;
m->OwnerIsThread = 0 ;
m->_recursions = 0 ;
m->_SpinDuration = ObjectMonitor::Knob_SpinLimit ; // Consider: maintain by type/class
// CAS操作:嘗試將markword設定為INFLATING狀態,失敗進行下一輪自旋
markOop cmp = (markOop) Atomic::cmpxchg_ptr (markOopDesc::INFLATING(), object->mark_addr(), mark) ;
if (cmp != mark) {
omRelease (Self, m, true) ;
continue ; // Interference -- just retry
}
markOop dmw = mark->displaced_mark_helper() ;
assert (dmw->is_neutral(), "invariant") ;
m->set_header(dmw) ;
m->set_owner(mark->locker());
m->set_object(object);
guarantee (object->mark() == markOopDesc::INFLATING(), "invariant") ;
object->release_set_mark(markOopDesc::encode(m));
if (ObjectMonitor::_sync_Inflations != NULL) ObjectMonitor::_sync_Inflations->inc() ;
TEVENT(Inflate: overwrite stacklock) ;
if (TraceMonitorInflation) {
if (object->is_instance()) {
ResourceMark rm;
tty->print_cr("Inflating object " INTPTR_FORMAT " , mark " INTPTR_FORMAT " , type %s",
(void *) object, (intptr_t) object->mark(),
object->klass()->external_name());
}
}
return m ;
}
/**
* 走到這裡說明1:monitor 未膨脹完成 2:monitor不在膨脹過程中 3:鎖狀態也不是輕量級狀態
* 能走到這裡說明鎖狀態已經變為無鎖狀態了
*/
assert (mark->is_neutral(), "invariant");
ObjectMonitor * m = omAlloc (Self) ;
m->Recycle();
m->set_header(mark);
m->set_owner(NULL);
m->set_object(object);
m->OwnerIsThread = 1 ;
m->_recursions = 0 ;
m->_Responsible = NULL ;
m->_SpinDuration = ObjectMonitor::Knob_SpinLimit ; // consider: keep metastats by type/class
// (省略部分程式碼)
return m ;
}
}
ObjectSynchronizer::omAlloc的作用:
嘗試從執行緒的本地omFreeList 分配。執行緒將首先嚐試從其本地列表中分配,然後從全域性列表中,只有在那些嘗試失敗後,執行緒才會嘗試例項化新的監視器。執行緒本地空閒列表佔用 加熱 ListLock 並改善分配延遲,並減少共享全域性列表上的一致性流量。
總之我也沒看懂,大概就是分配一個monitor給該執行緒用...
加鎖第三階段
當monitor形成之後,執行緒是阻塞還是拿到鎖執行同步塊程式碼,就看執行緒自己的運氣了。
執行緒進入monitor:
void ATTR ObjectMonitor::EnterI (TRAPS) {
// 省略部分程式碼...
// 嘗試獲取鎖
if (TryLock (Self) > 0) {
return ;
}
DeferredInitialize () ;
// 不死心,再來一次
if (TrySpin (Self) > 0) {
return ;
}
ObjectWaiter node(Self) ;
Self->_ParkEvent->reset() ;
node._prev = (ObjectWaiter *) 0xBAD ;
node.TState = ObjectWaiter::TS_CXQ ;
ObjectWaiter * nxt ;
for (;;) {
// 頭插_cxq
node._next = nxt = _cxq ;
if (Atomic::cmpxchg_ptr (&node, &_cxq, nxt) == nxt) break ;
// 還來?
if (TryLock (Self) > 0) {
return ;
}
}
// 省略部分程式碼...
for (;;) {
if (TryLock (Self) > 0) break ;
assert (_owner != Self, "invariant") ;
if ((SyncFlags & 2) && _Responsible == NULL) {
Atomic::cmpxchg_ptr (Self, &_Responsible, NULL) ;
}
// park self
if (_Responsible == Self || (SyncFlags & 1)) {
TEVENT (Inflated enter - park TIMED) ;
Self->_ParkEvent->park ((jlong) RecheckInterval) ;
// Increase the RecheckInterval, but clamp the value.
RecheckInterval *= 8 ;
if (RecheckInterval > 1000) RecheckInterval = 1000 ;
} else {
TEVENT (Inflated enter - park UNTIMED) ;
Self->_ParkEvent->park() ;
}
// 喚醒後又可以進行搶鎖啦~
if (TryLock(Self) > 0) break ;
// 省略部分程式碼...
}
return ;
}
果然synchronized不是公平鎖,不過這也太不公平了。
解鎖第一階段
owner在退出持有鎖的時候,會根據monitor的QMode策略,決定繼承者的選取方式,選定繼承者之前owner仍然會持有鎖,以保證並行性。
void ATTR ObjectMonitor::exit(bool not_suspended, TRAPS) {
// 省略部分程式碼...
// 重入次數遞減至0
if (_recursions != 0) {
_recursions--; // this is simple recursive enter
TEVENT (Inflated exit - recursive) ;
return ;
}
if ((SyncFlags & 4) == 0) {
_Responsible = NULL ;
}
// 自旋
for (;;) {
// (...) 省略部分程式碼
ObjectWaiter * w = NULL ;
int QMode = Knob_QMode ;
// 繞過EntryList,直接從_cxq中喚醒執行緒作為下一個繼承者用於競爭鎖
if (QMode == 2 && _cxq != NULL) {
w = _cxq ;
assert (w != NULL, "invariant") ;
assert (w->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
ExitEpilog (Self, w) ;
return ;
}
// 將_cxq佇列中的執行緒移到_EntryList尾部
if (QMode == 3 && _cxq != NULL) {
w = _cxq ;
for (;;) {
assert (w != NULL, "Invariant") ;
ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ;
if (u == w) break ;
w = u ;
}
assert (w != NULL , "invariant") ;
ObjectWaiter * q = NULL ;
ObjectWaiter * p ;
for (p = w ; p != NULL ; p = p->_next) {
guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
p->TState = ObjectWaiter::TS_ENTER ;
p->_prev = q ;
q = p ;
}
ObjectWaiter * Tail ;
for (Tail = _EntryList ; Tail != NULL && Tail->_next != NULL ; Tail = Tail->_next) ;
if (Tail == NULL) {
_EntryList = w ;
} else {
// _EntryList 的tail的next執行_cxq的頭部
Tail->_next = w ;
w->_prev = Tail ;
}
}
// 將_cxq佇列中的執行緒移到_EntryList頭部
if (QMode == 4 && _cxq != NULL) {
// 如此可以保證最近競爭鎖執行緒處於_EntryList的頭部
w = _cxq ;
for (;;) {
assert (w != NULL, "Invariant") ;
ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ;
if (u == w) break ;
w = u ;
}
assert (w != NULL , "invariant") ;
ObjectWaiter * q = NULL ;
ObjectWaiter * p ;
for (p = w ; p != NULL ; p = p->_next) {
guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
p->TState = ObjectWaiter::TS_ENTER ;
p->_prev = q ;
q = p ;
}
// 此時q為_cxq對了的tail執行緒
if (_EntryList != NULL) {
q->_next = _EntryList ;
_EntryList->_prev = q ;
}
_EntryList = w ;
}
// 若_EntryList不為空,QMode = 3 || QMode = 4 會喚醒_EntryList頭部執行緒作為下一位繼承者,並進行unpark操作
w = _EntryList ;
if (w != NULL) {
assert (w->TState == ObjectWaiter::TS_ENTER, "invariant") ;
ExitEpilog (Self, w) ;
return ;
}
w = _cxq ;
if (w == NULL) continue ;
/*
* 能走到這裡說明在這步採用執行緒進入_cxq佇列,前面的操作中_cxq和_EntryList都是空佇列
*/
for (;;) {
assert (w != NULL, "Invariant") ;
ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ;
if (u == w) break ;
w = u ;
}
TEVENT (Inflated exit - drain cxq into EntryList) ;
assert (w != NULL , "invariant") ;
assert (_EntryList == NULL , "invariant") ;
if (QMode == 1) {
ObjectWaiter * s = NULL ;
ObjectWaiter * t = w ;
ObjectWaiter * u = NULL ;
// 將_cxq佇列反轉,s為反轉之後的_cxq
while (t != NULL) {
guarantee (t->TState == ObjectWaiter::TS_CXQ, "invariant") ;
t->TState = ObjectWaiter::TS_ENTER ;
u = t->_next ;
t->_prev = u ;
t->_next = s ;
s = t;
t = u ;
}
// 將反轉倒序之後的_cxq放進_EntryList中
_EntryList = s ;
assert (s != NULL, "invariant") ;
} else {
// QMode == 0 or QMode == 2
_EntryList = w ;
ObjectWaiter * q = NULL ;
ObjectWaiter * p ;
// 將_cxq由單向連結串列轉為雙向連結串列
for (p = w ; p != NULL ; p = p->_next) {
guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
p->TState = ObjectWaiter::TS_ENTER ;
p->_prev = q ;
q = p ;
}
}
if (_succ != NULL) continue;
w = _EntryList ;
if (w != NULL) {
guarantee (w->TState == ObjectWaiter::TS_ENTER, "invariant") ;
ExitEpilog (Self, w) ;
return ;
}
}
}
解鎖第二階段
喚醒繼承者,讓它去嘗試獲取鎖。
// 選取繼承者、喚醒繼承者佇列的頭部執行緒(程式碼就不看了):
void ObjectMonitor::ExitEpilog (Thread * Self, ObjectWaiter * Wakee) {
// Exit protocol:
// 1. ST _succ = wakee
// 2. membar #loadstore|#storestore;
// 2. ST _owner = NULL
// 3. unpark(wakee)
}
總結
1:無論偏向鎖、輕量級鎖、重量級鎖,都是可重入的。所以熟悉JAVA併發包的ReentrantLock重入鎖機制是有必要的。
2:只有重量級鎖需要作業系統去進行排程競爭鎖的執行緒。
3:偏向鎖的撤銷不是為了使鎖降級為無鎖狀態,而是需要先降級再轉變為輕量級鎖狀態。
4:偏向鎖的撤銷需要等待全域性安全點,且鎖撤銷有一定的開銷。所以在多執行緒競爭激烈的情況下,可以實現關閉偏向鎖來進行效能調優。
想看原始碼的看這些檔案。
其他優化
JDK1.6 對鎖的實現引入了大量的優化,如偏向鎖、輕量級鎖、自旋鎖、適應性自旋鎖、鎖消除、鎖粗化等技術來減少鎖操作的開銷。
①:適應性自旋
升級為重量級鎖之前,會嘗試自旋一定次數(預設10次,可通過引數-XX : PreBlockSpin
來更改)來延緩進入重量級鎖的過程。
優點:若真的成功則可以避免鎖升級,減少執行緒進入monitor從而帶來的一系列開銷。同時當前執行緒不會經歷掛起-喚醒的過程,可以更快響應。
缺點:會一直佔用cpu,若自旋失敗則是額外的浪費。
②:鎖粗化
將連在一起的加鎖、解鎖操作擴大範圍,只進行一次性加鎖、解鎖操作。
如:
Object lock = new Object();
List<String> list = new ArrayList();
synchronized(lock){
list.add("a");
}
synchronized(lock){
list.add("b");
}
synchronized(lock){
list.add("c");
}
優化為:
Object lock = new Object();
List<String> list = new ArrayList();
synchronized(lock){
list.add("a");
list.add("b");
list.add("c");
}
③:鎖消除
若當前執行緒建立的物件分配在堆,但不會被其他執行緒使用,那麼這段程式碼就可以不加鎖。
或者根據逃逸分析,當前執行緒new的物件不會被其他執行緒使用,那麼也不需要加鎖。
其他問題
①:當所狀態為偏向鎖時,如何儲存hashcode資訊?
若hashCode方法的呼叫是在物件已經處於偏向鎖狀態時呼叫,它的偏向狀態會被立即撤銷,並且鎖會升級為重量級鎖。
②:什麼執行緒複用?
兩個執行緒間隔5s啟動,markword中thread資訊一摸一樣這個現象實際上就是JVM執行緒複用。
本文參考文章:
①: 小米資訊部技術團隊-synchronized 實現原理
②:synchronized的jvm原始碼加鎖流程分析聊鎖的意義
③:Java物件的記憶體佈局
④:盤一盤 synchronized (二)—— 偏向鎖批量重偏向與批量撤銷
⑤:https://www.bbsmax.com/A/xl56qY9rJr/
⑥:Java併發程式設計:Synchronized底層優化(偏向鎖、輕量級鎖)
感觸:上網搜很難看到自己想要的內容,甚至有的文章還會起誤導性作用。果然還是要好好學習,厲害的大佬比比皆是。在效能調優上哪有什麼最優解,只有合適與不合適,重在選擇與取捨。