Java併發學習-1-ReentrantLock
背景
如果IT的歷史,是以人為主體串接起來的話,那麼肯定少不了Doug Lea。這個鼻樑掛著眼鏡,留著德王威廉二世的鬍子,臉上永遠掛著謙遜靦腆笑容,服務於紐約州立大學Oswego分校計算機科學系的老大爺。
說他是這個世界上對Java影響力最大的個人,一點也不為過。因為兩次Java歷史上的大變革,他都間接或直接的扮演了舉足輕重的角色。一次是由JDK 1.1到JDK 1.2,JDK1.2很重要的一項新創舉就是Collections,其Collections的概念可以說承襲自Doug Lea於1995年釋出的第一個被廣泛應用的collections;一次是2004年所推出的Tiger。Tiger廣納了15項JSRs(Java Specification Requests)的語法及標準,其中一項便是JSR-166。JSR-166是來自於Doug編寫的util.concurrent包。(摘自百度)
用了很多年的jdk1.8,對java的執行緒還是不清不楚,準備用一點時間,看一下JUC的原始碼看一下 ,然後來理解一下AQS的設計。
AQS是什麼,Java中比較重要的是鎖的機制,鎖機制有兩種,一種是synchronized 一種是lock兩種,前面是java物件鎖機制實現的,另一種則是AQS的基礎上實現出來的。通過不使用鎖,而獲得更多鎖的功能,自然好好理解一下。
接下來我會通過不同場景的程式碼閱讀來理解一下AQS的不同側面。
1、Lock來了解一下AQS獨佔鎖模式
2、通過CountDownLatch來了解一下AQS的共享鎖模式
Lock來了解一下AQS獨佔鎖模式
閱讀原始碼是通過使用場景來進行驅動,從而更好的理解作者的思路,更快的理解程式碼,理解AQS目前主要從兩個角度
1、通過看原始碼的註釋,先做基本的瞭解
2、通過場景深入瞭解每行程式碼的意思
3、通過整體來理解程式碼的設計。
1、看註釋
ReentrantLock類結構
AbstractQueuedSynchronizer類結構
private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long stateOffset; private static final long headOffset; private static final long tailOffset; private static final long waitStatusOffset; private static final long nextOffset; /** * Head of the wait queue, lazily initialized. Except for * initialization, it is modified only via method setHead. Note: * If head exists, its waitStatus is guaranteed not to be * CANCELLED. */ private transient volatile Node head; /** * Tail of the wait queue, lazily initialized. Modified only via * method enq to add new wait node. */ private transient volatile Node tail; /** * The synchronization state. */ private volatile int state; private transient Thread exclusiveOwnerThread;
最重要的是 Unsafe 和 Node 和 state,以及 Thread exclusiveOwnerThread,理解這幾個,理解unsafe可以參考Java魔法類:Unsafe應用解析
其中說的很詳細。
AQS中其中最重要的就是
是用Node實現的,等待佇列是“CLH”(Craig、Landin 和Hagersten) 鎖佇列。CLH 鎖通常用於自旋鎖。這裡的使用的是雙向連結串列
state來表示獲取執行緒的狀態
exclusiveOwnerThread ,儲存當前持有鎖的執行緒。
通過這三個屬性來實現了獨佔鎖,和公平鎖。
2、看程式碼
1、程式碼入口
這裡需要說一點的是,多執行緒競爭同一個鎖的時候,才能更好的理解這個模式,如果只有一個執行緒,一個鎖是不能很好理解AQS,為此了寫了下面的程式碼,方便帶入原始碼,同時可以debug幫助理解。主要的流程在test方法中。其中邏輯大體是先獲取鎖,然後休息60s,模擬執行邏輯,最後釋放鎖,使用10個執行緒來進行執行。
public class LockTest {
private static Lock lock = new ReentrantLock();
/**
* LGTODO water 2021/7/7 下午7:23
* 1、同一個鎖,才會競爭資源,所以每個方法new Lock 其實並不會衝突
* 2、嘗試通過執行緒,執行一個test方法,外部new Lock 發現idea 不能正常執行測試,這種嘗試場景,不能實現,原因目前還不清楚?
* 3、在一個類中有一個屬性是鎖,並且方法公用這個鎖,這種可以正常觸發鎖的搶佔
*
*/
public static void main(String[] args) {
ExecutorService service = Executors.newFixedThreadPool(10);
// Lock lock = new ReentrantLock();
for (int i = 0;i<10;i++) {
service.submit(new Runnable() {
@Override
public void run() {
test();
}
});
}
service.shutdown();
}
public static void test(){
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + " 111 do something");
System.out.println(lock.toString() + " 111 do something");
TimeUnit.SECONDS.sleep(60);
System.out.println(Thread.currentThread().getName() + " do something end");
}catch (Exception e) {
System.out.println("error ");
}finally {
lock.unlock();
}
}
}
2、嘗試獲得鎖
final void lock() {
// LGTODO cas 判斷當前狀態是否是 0 ,是則獲取鎖,同時更新為1
// 如果是多個執行緒來,compareAndSetState 能原子的處理,所以只有一個執行緒可以獲得鎖
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
// LGTODO 如果鎖被佔用,則走下面,嘗試獲取
acquire(1);
}
第一個執行緒獲得成功,並且60s不釋放,則第二個執行緒只能嘗試獲取鎖acquire(1);
public final void acquire(int arg) {
// LGTODO tryAcquire 有執行緒佔用 返回 false
// LGTODO acquireQueued
// LGTODO addWaiter() ,新增獨佔節點
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// 1、執行 addWaiter(Node.EXCLUSIVE), arg)
/**
* LGTODO water 2021/7/8 下午3:12
* 第一次執行緒進入,tail = null 直接進入enq
* 第二次執行緒進入,tail != null ,第二節點的prev 就是tail 是空節點,空節點的下一個節點是新新增節點
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
// todo 第一次進入,tail = null, 直接enq
// todo 第二次進入 tail 有數值是第一執行緒節點Node ,將第二次進入的tail新增到tail ,第一次執行緒節點為其頭節點
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
// enq 是用來初始化連結串列的,空節點是頭節點,第二個執行緒是尾節點,形成雙向連結串列
/**
* LGTODO water 2021/7/8 下午3:46
* 第一次進入的節點,
* for 迴圈一次 tail == null 空節點設定給tail 和 head
* for 迴圈二次 tail !=null 這次傳入節點的,前節點 是空的new node,並設定空節點的next 是傳入節點,然後迴圈停止
* 頭節點是空節點,next是新傳入的節點,
* 注意:compareAndSetHead 設定頭節點 compareAndSetTail 設定了尾節點
* head 和 tail 都是 transient的 設定的時候使用unsafe 保證不用加鎖
*
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
/**
* LGTODO water 2021/7/8 下午3:43
* 猜測 compareAndSetHead 將new NOde 設定給head
* head 設定給 tail
*/
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
// todo compareAndSetTail t 和node 設定尾部節點
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
// 2、執行 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 假如第一個執行緒仍然持有鎖,那麼第二個執行緒,進入到這裡,node
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// todo 第一次進入 node 的前節點是空節點,但是node節點是第二個執行緒節點
// 所以 p == head ,但是 tryAcquire = false ,所以執行 shouldParkAfterFailedAcquire 將waitstus = -1
// 然後 進入 parkAndCheckInterrupt 進行等待
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// todo shouldParkAfterFailedAcquire = true && LockSupport.park(this); park
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
3、釋放鎖
執行緒一釋放鎖,重發執行緒二獲得鎖
/**
* LGTODO water 2021/7/17 下午3:15
* 1、釋放鎖 tryRelease 修改狀態為 0
* 2、釋放成功,則 unparkSuccessor(h) 喚起等待的執行緒;
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
喚醒等待佇列中的執行緒:LockSupport.unpark(s.thread);
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
以上就是全部流程,總結如下
3、整體看
1、使用state不僅用於獲取鎖,同時可以鎖的重入,獲取一層鎖加1
2、雙向連結串列的使用,如果有多個等待執行緒都會按照順序排隊,並且按照順序獲取鎖,當然這並不是說能保證鎖的順序,先到先得,只是排入佇列的先得到。
3、clh的自選鎖,並沒有一直的自旋,沒有一直佔用資源。
4、使用volatile的保證了節點的可見性,同時使用unsafe中的compareAndSetWaitStatus等各種方法,來保證了原子性,從而實現執行緒安全
留一個問題?
1、為什麼使用實現java.io.Serializable 同時 使用 transient ?
參考
1、Java AQS 核心資料結構 -CLH 鎖