1. 程式人生 > >多執行緒之ReentrantLock篇(五)

多執行緒之ReentrantLock篇(五)

昨天有說過後面講ReentrantLock,今天我們這篇幅就全域性的講解下,我們在Lock出來前,解決併發問題沒得選只能用Synchronized。

一.ReentrantLock PK synchronized

      (1)synchronized是獨佔鎖,加鎖和解鎖的過程自動進行,易於操作,但不夠靈活。ReentrantLock也是獨佔鎖,加鎖和解鎖的過程需要手動進行,不易操作,但非常靈活。

      (2)synchronized可重入,因為加鎖和解鎖自動進行,不必擔心最後是否釋放鎖;ReentrantLock也可重入,但加鎖和解鎖需要手動進行,且次數需一樣,否則其他執行緒無法獲得鎖。

      (3)synchronized不可響應中斷,一個執行緒獲取不到鎖就一直等著;ReentrantLock可以相應中斷。

ReentrantLock好像比synchronized關鍵字沒好太多,我們再去看看synchronized所沒有的,一個最主要的就是ReentrantLock還可以實現公平鎖機制。什麼叫公平鎖呢?也就是在鎖上等待時間最長的執行緒將獲得鎖的使用權。通俗的理解就是誰排隊時間最長誰先執行獲取鎖。

 Lock介面的一些方法:

 

 

  1. lock():是最常用的獲取鎖的方法,若鎖被其他執行緒獲取,則等待(阻塞)。 
  2.  lockInterruptibly():獲取鎖,如果鎖可用則執行緒繼續執行;如果鎖不可用則執行緒進入阻塞狀態,此時可以在其它執行緒執行時呼叫這個執行緒的interrupt方法打斷它的阻塞狀態。
  3. tryLock():嘗試非阻塞地獲取鎖,立即返回。獲取成功返回true;獲取失敗返回false,但不會阻塞。 (這個方法比synchronized好)
  4. tryLock(long time, TimeUnit unit):阻塞嘗試鎖。引數代表時長,在指定時長內嘗試鎖。
  5. unlock():如果沒有獲取鎖標記就放鎖,會丟擲異常。

  二.  Lock實現類介紹

        1.ReentrantLock(重入鎖)

public class ReentrantLockDemo {

    private static int count=0;

    //重入鎖(如何實現的?)
    static Lock lock=new ReentrantLock();

    public static void inc(){
        lock.lock(); //獲得鎖(互斥鎖) ThreadA 獲得了鎖
        try {
            Thread.sleep(1);
            count++;
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            lock.unlock();//釋放鎖 ThreadA釋放鎖  state=1-1=0
        }
    }

    public static void main(String[] args) throws InterruptedException {


        for (int i = 0; i < 1000; i++) {

            new Thread(()-> ReentrantLockDemo.inc()).start();
        }
        Thread.sleep(4000);
        System.out.println("result:"+count);
    }
}

  2.ReentrantReadWriteLock(重入讀寫鎖)

          讀多寫少的情況下,讀和讀不互斥,讀和寫互斥,寫和寫互斥

public class ReentrantReadWriteLockDemo {
    static Map<String,Object> cacheMap=new HashMap<>();
    static ReentrantReadWriteLock rwl=new ReentrantReadWriteLock();
    static Lock read=rwl.readLock();
    static Lock write=rwl.writeLock();

    public static Object get(String key){
        read.lock(); //讀鎖 ThreadA 阻塞
        try{
            return cacheMap.get(key);
        }finally {
            read.unlock(); //釋放讀鎖
        }
    }
    public static Object write(String key,Object value){
        write.lock(); //Other Thread 獲得了寫鎖
        try{
            return cacheMap.put(key,value);
        }finally {
            write.unlock();
        }
    }
}

 三.思考鎖的實現

      關於鎖我們講了很多,也寫了很多案例,下面我們就底層是怎麼實現鎖的機制來進行一個猜想設計然後帶著我們的猜想去看大佬們的原始碼是不是和我們的猜想一樣:       

        1.首先鎖的互斥的原理是多個執行緒訪問同一個共享資源只有一個能進去訪問,我們這裡要分析鎖的互斥特性是怎麼實現的:要實現互斥性首先我們要有一個共享變數,然後在設計時用一個狀態來標記共享資源的狀態(例如0,1)

        2.沒有搶佔到鎖的執行緒怎麼玩,沒有搶到鎖的執行緒就要阻塞等待,想到等待就很容易想起前面篇幅講的wait(等待、喚醒),但是這裡不是用wait因為wait/notify不能喚醒指定的執行緒,所以我們想到了另一個方案,LockSupport.park()

        3.等待中的執行緒是怎麼儲存的,這裡面想到的是雙向連結串列

        4.公平和非公平(能否插隊)

        5.鎖的重入的特性(識別是否是同一個執行緒)重入次數可以用數字累加

下面我們就lock.lock(); 是怎麼實現的進行深入分析下:l

   我們在多執行緒訪問lock.lock()方法時如果獲取lock許可權的執行緒就可以向下執行,沒有獲取許可權的執行緒就會阻塞,這個方向是大方向

 

 

 下面我們就lock.lock()方法裡面做了什麼事情,首先看到他呼叫了sync.lock();

 

 我們看下類的關係圖,其中ReentantLock是Lock的一個實現我們從下面關係圖片中可以看出ReentrantLockK中定義了一個sync

 

 我們可以看到Sync是一個靜態的抽像內部類,他繼承了AbstractQueuedSynchronizer

我 們回退到sync.lock();方法,他實現了兩種鎖,一種是共平鎖一種是非公平鎖,類關係圖如下

 

 在sync.lock()中預設是非公平鎖,那麼我們在sync.lock()中進入NonfairSync方法中,首先他進來第一件事是搶佔資源,在這裡的判斷compareAndSetState保證了多執行緒下的原子性,這裡的compareAndSetState判斷是採用了樂觀鎖機制來進行加鎖,在很多原始碼中都有用到CAS操作,其中expect是預期值,update是更改值,這個操作是直接跟記憶體互動,這樣做的好處是保證只有一個執行緒能進入,進入後操作setExclusiveOwnerThread(Thread.currentThread());儲存當前執行緒

 

 我們進入他的判斷方法共享資源compareAndSetState中看下他是怎麼修改預期值的,stateOffset是當前state屬性成員在記憶體中的偏移量,他會通過記憶體中的偏移量去拿到記憶體中的值 和我們的預期值對比,如果相等就修改,這裡面設計的好處是直接跟記憶體互動,不讓我們java程式碼操作,可以在java層面解決多執行緒問題

 

 

 

 上面圖片是執行緒搶佔成功的邏輯,其它執行緒搶佔失敗就走下面acquire(1)的邏輯了,這個acquire邏輯是由AQS來實現的;

  • ! tryAcquire(arg)
  • addWaiter 將未獲得鎖的執行緒加入到佇列
  • acquireQueued(); 去搶佔鎖或者阻塞. 

 

我們先看下tryAcquire(arg)的實現,我們選擇它NonfairSync實現

 

這下面的邏輯是繼續去搶佔鎖的邏輯,

 final boolean nonfairTryAcquire(int acquires) {
//獲取當前執行緒 final Thread current = Thread.currentThread();
//判斷其狀態 int c = getState();
//條件成立表示無鎖, if (c == 0) {
//無鎖的操作一定要變成CAS操作,因為修改本身存在原子性問題 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } }
// 這裡面是判斷重入的,判斷當前執行緒和我們有鎖的執行緒是否相等 else if (current == getExclusiveOwnerThread()) {
//如果相等就加一個次數 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded");
// 因為當前是有鎖狀態,所以不用再用CAS操作 setState(nextc); return true; } return false; }

  條件! tryAcquire(arg)不成立就會進入acquireQueued(addWaiter(Node.EXCLUSIVE), arg))判斷中來,將未獲得鎖的執行緒加入到佇列;addWaiter是做一個連結串列然後加入acquireQueued中進行迴圈的判斷;Node.EXCLUSIVE表示節點互斥的一個特性;我們進入addWaiter方法

private Node addWaiter(Node mode) {
//進來第一件事是先構造一個節點,這個節點會先把當前執行緒和mode(表示獨佔)傳進來,如果有多個執行緒沒有搶到鎖那就有多個執行緒進入這個方法,也就代表了有多個Node節點 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure
//這裡會拿到一個tail節點,tail表示尾部節點,一般連結串列都會有一個頭節點Head和尾節點Tail,這一步的頭尾節點還沒有初始化,還是空指向 Node pred = tail;
if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } }
//第一次進來尾節點一定是空的,所以第一次進來是走enq方法 enq(node); return node; }

  我們進入enq(node)方法

private Node enq(final Node node) {
//通過自旋的方式進行FOR迴圈 for (;;) {
//得到一個尾節點,此時尾節點還是空 Node t = tail; if (t == null) { // Must initialize
//初始化一個空的Node節點,這個compareAndSetHead只有在空的情況下才會替換,CAS保證只有一個執行緒能替換成功 if (compareAndSetHead(new Node()))
//將頭和尾都指向這個剛剛初始化的空節點,到這一步的時序圖如圖一;這一步完成後初始化就完成了,然後進入下一次迴圈t就不為空了走else邏輯 tail = head; } else {
//node表示當前進來的執行緒,我們假設是B執行緒進來了,此時因為t不為空了,所以當前執行緒的prev指向空的Node節點 node.prev = t; if (compareAndSetTail(t, node)) {
//操作尾部節點t.next表示上一個節點的指向指向當前節點,這樣一個雙向連結串列就形成了,在多個for迴圈後的時序圖就如圖二 t.next = node; return t; } } } }

  

圖一

 圖二

 

addWaiter(Node.EXCLUSIVE), arg)程式碼執行完成後,他會把引數返回新增到acquireQueued裡面去,我們進入acquireQueued,這裡面一定會做的一件事就是阻塞列表中的執行緒

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
//又是自旋 for (;;) {
// 假設這裡面的node是我們執行緒B的話,他的predecessor()方法可以點進去看下,會發現是當前執行緒的prev,由上面時序圖會發現其實就是Head節點 final Node p = node.predecessor();
//如果頭節點是head節點就會去搶佔一次鎖,成功就獲得鎖,失敗走下面 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; }
//是否要掛起一個執行緒,我們進入shouldParkAfterFailedAcquire方法 if (shouldParkAfterFailedAcquire(p, node) &&
//parkAndCheckInterrupt是掛起(阻塞) parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }

  阻塞狀態是沒有必要去搶佔鎖的,下面就是通過判斷是不是偏鎖狀態來決定要不要去釋放鎖,如果是偏鎖就釋放鎖

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//等待狀態,如果執行緒出現異常會出來偏鎖狀態 int ws = pred.waitStatus;
//SIGNAL是喚醒狀態成立就可以放心掛起(-1) if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true;
//偏鎖狀態ws會大於o if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do {
//將取消狀態的移除節點 node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */
//替換節點狀態改成SIGNAL compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }

  

 

 

 前面的掛起完成後代表lock.lock()方法執行完成了,接下來我們就講下lock.unlock()釋放鎖的過程,這時候釋放鎖是執行緒A來釋放鎖,我們來看lock.unlock()的ReentrantLock實現

 

public final boolean release(int arg) {
//進入tryRelease方法 if (tryRelease(arg)) {
Node h = head; if (h != null && h.waitStatus != 0)
//重置資訊完成後會通過下面方法進行喚醒阻塞執行緒 unparkSuccessor(h); return true; } return false; }

  

protected final boolean tryRelease(int releases) {
//將state恢復原有值 int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true;
//如果剛好c==0就釋放執行緒並把執行緒清空,如圖三 setExclusiveOwnerThread(null); } setState(c); return free; }

  

 

 圖三

我們進入unparkSuccessor方法中

private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
//如果成立 if (ws < 0)
//先恢復成初始狀態 compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */
//獲取下一個節點 Node s = node.next;
//如果下個節點為空,則除去無效節點 if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; }
if (s != null)
// 喚醒下一個節點,喚醒後的執行緒又要搶佔鎖又會進入前面的acquireQueued方法進行自旋,搶佔失敗的執行緒又要掛起
//喚醒完成後喚醒的線會去執行程式碼程式
LockSupport.unpark(s.thread);
}