1. 程式人生 > 其它 >Java併發學習-1-ReentrantLock

Java併發學習-1-ReentrantLock

背景

Doug Lea’s Home Page

如果IT的歷史,是以人為主體串接起來的話,那麼肯定少不了Doug Lea。這個鼻樑掛著眼鏡,留著德王威廉二世的鬍子,臉上永遠掛著謙遜靦腆笑容,服務於紐約州立大學Oswego分校計算機科學系的老大爺。

說他是這個世界上對Java影響力最大的個人,一點也不為過。因為兩次Java歷史上的大變革,他都間接或直接的扮演了舉足輕重的角色。一次是由JDK 1.1到JDK 1.2,JDK1.2很重要的一項新創舉就是Collections,其Collections的概念可以說承襲自Doug Lea於1995年釋出的第一個被廣泛應用的collections;一次是2004年所推出的Tiger。Tiger廣納了15項JSRs(Java Specification Requests)的語法及標準,其中一項便是JSR-166。JSR-166是來自於Doug編寫的util.concurrent包。(摘自百度)

用了很多年的jdk1.8,對java的執行緒還是不清不楚,準備用一點時間,看一下JUC的原始碼看一下 ,然後來理解一下AQS的設計。

AQS是什麼,Java中比較重要的是鎖的機制,鎖機制有兩種,一種是synchronized 一種是lock兩種,前面是java物件鎖機制實現的,另一種則是AQS的基礎上實現出來的。通過不使用鎖,而獲得更多鎖的功能,自然好好理解一下。

接下來我會通過不同場景的程式碼閱讀來理解一下AQS的不同側面。

1、Lock來了解一下AQS獨佔鎖模式

2、通過CountDownLatch來了解一下AQS的共享鎖模式

Lock來了解一下AQS獨佔鎖模式

閱讀原始碼是通過使用場景來進行驅動,從而更好的理解作者的思路,更快的理解程式碼,理解AQS目前主要從兩個角度

1、通過看原始碼的註釋,先做基本的瞭解

2、通過場景深入瞭解每行程式碼的意思

3、通過整體來理解程式碼的設計。

1、看註釋

ReentrantLock類結構

AbstractQueuedSynchronizer類結構

private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;

/**
  * Head of the wait queue, lazily initialized.  Except for
  * initialization, it is modified only via method setHead.  Note:
  * If head exists, its waitStatus is guaranteed not to be
  * CANCELLED.
  */
private transient volatile Node head;
/**
  * Tail of the wait queue, lazily initialized.  Modified only via
  * method enq to add new wait node.
  */
private transient volatile Node tail;
/**
  * The synchronization state.
  */
private volatile int state;
private transient Thread exclusiveOwnerThread;

最重要的是 Unsafe 和 Node 和 state,以及 Thread exclusiveOwnerThread,理解這幾個,理解unsafe可以參考Java魔法類:Unsafe應用解析

其中說的很詳細。

AQS中其中最重要的就是

是用Node實現的,等待佇列是“CLH”(Craig、Landin 和Hagersten) 鎖佇列。CLH 鎖通常用於自旋鎖。這裡的使用的是雙向連結串列

state來表示獲取執行緒的狀態

exclusiveOwnerThread ,儲存當前持有鎖的執行緒。

通過這三個屬性來實現了獨佔鎖,和公平鎖。

2、看程式碼

1、程式碼入口

這裡需要說一點的是,多執行緒競爭同一個鎖的時候,才能更好的理解這個模式,如果只有一個執行緒,一個鎖是不能很好理解AQS,為此了寫了下面的程式碼,方便帶入原始碼,同時可以debug幫助理解。主要的流程在test方法中。其中邏輯大體是先獲取鎖,然後休息60s,模擬執行邏輯,最後釋放鎖,使用10個執行緒來進行執行。

public class LockTest {
    private static Lock lock = new ReentrantLock();
    /**
     * LGTODO water 2021/7/7 下午7:23
     *  1、同一個鎖,才會競爭資源,所以每個方法new Lock 其實並不會衝突
     *  2、嘗試通過執行緒,執行一個test方法,外部new Lock 發現idea 不能正常執行測試,這種嘗試場景,不能實現,原因目前還不清楚?
     *  3、在一個類中有一個屬性是鎖,並且方法公用這個鎖,這種可以正常觸發鎖的搶佔
     *
     */
    public static void main(String[] args) {
        ExecutorService service = Executors.newFixedThreadPool(10);
//        Lock lock = new ReentrantLock();
        for (int i = 0;i<10;i++) {
            service.submit(new Runnable() {
                @Override
                public void run() {
                    test();
                }
            });
        }
        service.shutdown();
    }

    public static void test(){
        lock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + " 111 do something");
            System.out.println(lock.toString() + " 111 do something");
            TimeUnit.SECONDS.sleep(60);
            System.out.println(Thread.currentThread().getName() + " do something end");
        }catch (Exception e) {
            System.out.println("error ");
        }finally {
            lock.unlock();
        }
    }
}

2、嘗試獲得鎖

final void lock() {
    // LGTODO cas 判斷當前狀態是否是 0 ,是則獲取鎖,同時更新為1
    //  如果是多個執行緒來,compareAndSetState 能原子的處理,所以只有一個執行緒可以獲得鎖
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        // LGTODO 如果鎖被佔用,則走下面,嘗試獲取
        acquire(1);
}

第一個執行緒獲得成功,並且60s不釋放,則第二個執行緒只能嘗試獲取鎖acquire(1);

public final void acquire(int arg) {
    // LGTODO tryAcquire 有執行緒佔用 返回 false
    // LGTODO acquireQueued
    // LGTODO addWaiter() ,新增獨佔節點
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
// 1、執行 addWaiter(Node.EXCLUSIVE), arg)
/**
  * LGTODO water 2021/7/8 下午3:12
  *  第一次執行緒進入,tail = null  直接進入enq
  *  第二次執行緒進入,tail != null ,第二節點的prev 就是tail 是空節點,空節點的下一個節點是新新增節點
  */
private Node addWaiter(Node mode) {
  Node node = new Node(Thread.currentThread(), mode);
  // Try the fast path of enq; backup to full enq on failure
  Node pred = tail;
  // todo 第一次進入,tail = null, 直接enq
  // todo 第二次進入 tail 有數值是第一執行緒節點Node ,將第二次進入的tail新增到tail ,第一次執行緒節點為其頭節點
  if (pred != null) {
    node.prev = pred;
    if (compareAndSetTail(pred, node)) {
      pred.next = node;
      return node;
    }
  }
  enq(node);
  return node;
}

// enq 是用來初始化連結串列的,空節點是頭節點,第二個執行緒是尾節點,形成雙向連結串列
/**
     * LGTODO water 2021/7/8 下午3:46
     *  第一次進入的節點,
     *  for 迴圈一次 tail == null 空節點設定給tail 和 head
     *  for 迴圈二次 tail !=null 這次傳入節點的,前節點 是空的new node,並設定空節點的next 是傳入節點,然後迴圈停止
     *  頭節點是空節點,next是新傳入的節點,
     *  注意:compareAndSetHead 設定頭節點 compareAndSetTail 設定了尾節點
     *  head 和 tail 都是 transient的 設定的時候使用unsafe 保證不用加鎖
     *
     */
private Node enq(final Node node) {
  for (;;) {
    Node t = tail;
    if (t == null) { // Must initialize
      /**
                 * LGTODO water 2021/7/8 下午3:43
                 *  猜測 compareAndSetHead 將new NOde 設定給head
                 *  head 設定給 tail
                 */
      if (compareAndSetHead(new Node()))
        tail = head;
    	} else {
        node.prev = t;
        // todo compareAndSetTail  t 和node 設定尾部節點
        if (compareAndSetTail(t, node)) {
          t.next = node;
          return t;
        }
    }
  }
}


// 2、執行 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 假如第一個執行緒仍然持有鎖,那麼第二個執行緒,進入到這裡,node
final boolean acquireQueued(final Node node, int arg) {
  boolean failed = true;
  try {
    boolean interrupted = false;
    for (;;) {
      // todo 第一次進入 node 的前節點是空節點,但是node節點是第二個執行緒節點
      // 所以 p == head ,但是 tryAcquire = false ,所以執行 shouldParkAfterFailedAcquire 將waitstus = -1
      // 然後 進入 parkAndCheckInterrupt 進行等待
      final Node p = node.predecessor();
      if (p == head && tryAcquire(arg)) {
        setHead(node);
        p.next = null; // help GC
        failed = false;
        return interrupted;
      }
      // todo shouldParkAfterFailedAcquire = true && LockSupport.park(this); park
      if (shouldParkAfterFailedAcquire(p, node) &&
          parkAndCheckInterrupt())
        interrupted = true;
    }
  } finally {
    if (failed)
      cancelAcquire(node);
  }
}

3、釋放鎖

執行緒一釋放鎖,重發執行緒二獲得鎖

/**
 * LGTODO water 2021/7/17 下午3:15
 *  1、釋放鎖 tryRelease 修改狀態為 0
 *  2、釋放成功,則 unparkSuccessor(h) 喚起等待的執行緒;
 */
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

喚醒等待佇列中的執行緒:LockSupport.unpark(s.thread);

private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

以上就是全部流程,總結如下

3、整體看

1、使用state不僅用於獲取鎖,同時可以鎖的重入,獲取一層鎖加1

2、雙向連結串列的使用,如果有多個等待執行緒都會按照順序排隊,並且按照順序獲取鎖,當然這並不是說能保證鎖的順序,先到先得,只是排入佇列的先得到。

3、clh的自選鎖,並沒有一直的自旋,沒有一直佔用資源。

4、使用volatile的保證了節點的可見性,同時使用unsafe中的compareAndSetWaitStatus等各種方法,來保證了原子性,從而實現執行緒安全

留一個問題?

1、為什麼使用實現java.io.Serializable 同時 使用 transient ?

參考

1、Java AQS 核心資料結構 -CLH 鎖

https://www.infoq.cn/article/BVPvyVxjKM8ZSTSpTi0L

2、Java魔法類:Unsafe應用解析