1. 程式人生 > >JAVA 併發類(六) ReentrantLock 原始碼分析

JAVA 併發類(六) ReentrantLock 原始碼分析

ReentrantLock實現了Lock介面,是一種遞迴無阻塞的同步機制。

首先ReentrantLock有公平鎖和非公平鎖機制,使用構造方法ReentrantLock(boolean)指定,預設是非公平鎖。

公平鎖的含義是執行緒按照發出請求的先後順序獲取鎖,而非公平鎖的機制下,執行緒可以插隊,後到的執行緒可以直接跳到佇列頭部獲得鎖

================================================

ps:前天看到有同學提問為什麼非公平鎖的效率比公平鎖的效率高?
原因是在恢復一個被掛起的執行緒到該執行緒真正執行之間存在著延遲

舉個栗子,假設執行緒A持有鎖,執行緒B請求該鎖,由於鎖已經被佔用,B被阻塞掛起,知道A釋放鎖時才會喚醒B,然後B才會重新嘗試去獲取該鎖。但是此時,如果執行緒C也請求這個鎖,那麼如果是非公平鎖的話,C會在B被完全喚醒之前或者使用以及釋放該鎖,這樣B獲得鎖的時間沒有推遲,而C也更早的獲得了鎖,並且吞吐量提高了,當然這是在C持有鎖時間較短的情況下,如果執行緒持有鎖的情況較長應該使用公平鎖,OVER

=================================================

繼續:
ReentrantLock內有三個內部類,分別是Sync,FairSync和NoFairSync,其中FairSync和NoFairSync是Sync的子類。

/**
 * 該鎖同步控制的一個基類.下邊有兩個子類:非公平機制和公平機制.使用了AbstractQueuedSynchronizer類的
 */
static abstract class Sync extends AbstractQueuedSynchronizer

/**
 * 非公平鎖同步器
 */
final static class
NonfairSync extends Sync
/** * 公平鎖同步器 */ final static class FairSync extends Sync

因此使用ReentrantLock的加鎖lock操作時,又分為了兩種情況,分別是公平鎖的lock和非公平鎖的lock;

非公平鎖的核心思想

1.基於CAS嘗試把鎖數量從0設定為1
2.如果設定成功,當前執行緒為獨佔鎖的執行緒
3.設定失敗,會再次嘗試獲取鎖數量
4.如果鎖數量為0,再基於CAS嘗試把鎖數量從0設定為1,如果設定成功,當前執行緒為獨佔鎖的執行緒
5.如果鎖數量不為0,檢視當前執行緒是不是已經是獨佔鎖的執行緒了,如果是,則將當前的鎖數量+1;如果不是,則將該執行緒封裝在一個Node內,並加入到等待佇列中去。等待被其前一個執行緒節點喚醒。

非公平鎖原始碼分析

final void lock() {
   if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

首先通過CAS操作嘗試把鎖數量從0設定為1,如果成功,當前執行緒為獨佔鎖的執行緒,否則執行一個acquire(1)方法

我們下面看acquire(1)的具體實現

 public final void acquire(int arg) {
     if (!tryAcquire(arg) &&
         acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
         selfInterrupt();
 }

先呼叫了tryAcquire方法 看非公平鎖中的具體實現

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

再次獲取當前的鎖數量,如果鎖數量為0,表示沒有執行緒持有鎖,那麼再嘗試把當前state設定為1,如果設定成功,那麼當前執行緒就持有該鎖,返回true;
鎖數量不為0話,意味著有執行緒持有該鎖,判斷是否是當前執行緒持有[可重入],是的話把當前鎖數量+1,否則的話返回false.
此時tryAcquire(1)方法返回false,會繼續執行 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法,而又先呼叫了addWaiter方法

private Node addWaiter(Node mode) {
 Node node = new Node(Thread.currentThread(), mode);
 // Try the fast path of enq; backup to full enq on failure
 Node pred = tail;
 if (pred != null) {
     node.prev = pred;
     if (compareAndSetTail(pred, node)) {
         pred.next = node;
         return node;
     }
 }
 enq(node);
 return node;
}

這個方法把當前執行緒封裝成一個Node物件,然後把Node加入等待佇列。可以看到程式碼上的註釋寫著:先嚐試快速入隊,快速入隊失敗就嘗試普通入隊方法
根據程式碼看到,快速入隊就是使用CAS操作嘗試把尾節點tail設定成node,並把之前的尾節點插入到node之前
而普通入隊方式是

private Node enq(final Node node) {
for (;;) {
      Node t = tail;
      if (t == null) { // Must initialize
          if (compareAndSetHead(new Node()))
              tail = head;
      } else {
          node.prev = t;
          if (compareAndSetTail(t, node)) {
              t.next = node;
              return t;
          }
      }
  }
}

如果尾節點為空,進行初始化,如果非空,就一直CAS迴圈到node插入到隊尾為止

佇列入隊成功後返回node節點,我們再回到acquireQueued方法

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

獲取node的前驅節點p,如果p是頭節點,就繼續使用tryAcquire(1)方法去嘗試請求成功,如果第一次請求就成功,不用中斷自己的執行緒,如果是之後的迴圈中將執行緒掛起之後又請求成功了,使用selfInterrupt()中斷自己

如果p不是頭節點,或者tryAcquire(1)請求不成功,就去執行shouldParkAfterFailedAcquire(Node pred, Node node)來檢測當前節點是不是可以安全的被掛起,如果node的前驅節點pred的等待狀態是SIGNAL(即可以喚醒下一個節點的執行緒),則node節點的執行緒可以安全掛起。如果node的前驅節點pred的等待狀態是CANCELLED,則pred的執行緒被取消了,我們會將pred之前的連續幾個被取消的前驅節點從佇列中剔除,返回false。

如果node的前驅節點pred的等待狀態是除了上述兩種的其他狀態,則使用CAS嘗試將前驅節點的等待狀態設為SIGNAL,並返回false(因為CAS可能會失敗,這裡不管失敗與否,都返回false,下一次執行該方法的之後,pred的等待狀態就是SIGNAL了)

如果可以安全掛起,就執行parkAndCheckInterrupt()掛起當前執行緒

最後,直到該節點的前驅節點p之前的所有節點都執行完畢為止,我們的p成為了頭節點,並且tryAcquire(1)請求成功,跳出迴圈,去執行

看下Node節點的具體結構

/**
* 同步等待佇列(雙向連結串列)中的節點
 */
static final class Node {
    /** 執行緒被取消了 */
    static final int CANCELLED = 1;
    /** 
     * 如果前驅節點的等待狀態是SIGNAL,表示當前節點將來可以被喚醒,那麼當前節點就可以安全的掛起了 
     * 否則,當前節點不能掛起 
     */
    static final int SIGNAL = -1;
    /**執行緒正在等待條件*/
    static final int CONDITION = -2;
    /**
     * waitStatus value to indicate the next acquireShared should
     * unconditionally propagate
     */
    static final int PROPAGATE = -3;
    /** Marker to indicate a node is waiting in shared mode */
    static final Node SHARED = new Node();
    /** 一個標記:用於表明該節點正在獨佔鎖模式下進行等待 */
    static final Node EXCLUSIVE = null;
    //值就是前四個int(CANCELLED/SIGNAL/CONDITION/PROPAGATE),再加一個0

    volatile int waitStatus;
    /**前驅節點*/
    volatile Node prev;

    /**後繼節點*/
    volatile Node next;

    /**節點中的執行緒*/
    volatile Thread thread;

    /**
     * Link to next node waiting on condition, or the special value SHARED.
     * Because condition queues are accessed only when holding in exclusive
     * mode, we just need a simple linked queue to hold nodes while they are
     * waiting on conditions. They are then transferred to the queue to
     * re-acquire. And because conditions can only be exclusive, we save a
     * field by using special value to indicate shared mode.
     */
    Node nextWaiter;

    /**
     * Returns true if node is waiting in shared mode
     */
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    /**
     * 返回該節點前一個節點
     */
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() { // Used to establish initial head or SHARED marker
    }

    Node(Thread thread, Node mode) { // 用於addWaiter中
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

公平鎖的核心

獲取一次鎖數量

1.如果鎖數量為0,如果當前執行緒是等待佇列中的頭節點,基於CAS嘗試將state(鎖數量)從0設定為1一次,如果設定成功,設定當前執行緒為獨佔鎖的執行緒;

2.如果鎖數量不為0或者當前執行緒不是等待佇列中的頭節點或者上邊的嘗試又失敗了,檢視當前執行緒是不是已經是獨佔鎖的執行緒了,如果是,則將當前的鎖數量+1;如果不是,則將該執行緒封裝在一個Node內,並加入到等待佇列中去。等待被其前一個執行緒節點喚醒。


        /**
         * 獲取公平鎖的方法
         * 1)獲取鎖數量c
         * 1.1)如果c==0,如果當前執行緒是等待佇列中的頭節點,使用CAS將state(鎖數量)從0設定為1,如果設定成功,當前執行緒獨佔鎖-->請求成功
         * 1.2)如果c!=0,判斷當前的執行緒是不是就是當下獨佔鎖的執行緒,如果是,就將當前的鎖數量狀態值+1(這也就是可重入鎖的名稱的來源)-->請求成功
         * 最後,請求失敗後,將當前執行緒鏈入隊尾並掛起,之後等待被喚醒。
         */
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (isFirst(current) && compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

如果請求失敗也是把執行緒放入隊尾掛起等待喚醒,和非公平鎖一樣

總結二者的區別

公平鎖的lock加鎖少了插隊部分(CAS嘗試把state從0設定為1從而獲得鎖的過程)
公平鎖的tryAcquire有判斷當前執行緒是否在等待佇列隊首的邏輯
ReentrantLock是基於AbstractQueuedSynchronizer實現的,AbstractQueuedSynchronizer可以實現獨佔鎖也可以實現共享鎖,ReentrantLock只是使用了其中的獨佔鎖模式

前面介紹了ReentrantLock的公平鎖和非公鎖的加鎖機制,下面再簡單說下解鎖機制。
首先解鎖的流程:
1.獲取當前的鎖數量,然後用這個鎖數量減去解鎖的數量(1),最後得出結果c
2.判斷當前執行緒是不是獨佔鎖的執行緒,如果不是,丟擲異常
3.如果c為0,表示鎖被成功釋放,把當前獨佔的執行緒設定為null,鎖數量設定為0,返回true
4.如果c不為0,說明鎖釋放失敗,鎖數量設定為c,返回false
5.如果鎖被釋放成功的話,喚醒距離頭結點最近的一個非取消的節點

 public void unlock() {
   sync.release(1);
 }

呼叫的release方法,目的是將鎖數量減1

public final boolean release(int arg) {
    if (tryRelease(arg)) {//如果成功釋放鎖
        Node h = head;//獲取頭節點:(注意:這裡的頭節點就是當前正在釋放鎖的節點)
        if (h != null && h.waitStatus != 0)//頭結點存在且等待狀態不是取消
            unparkSuccessor(h);//喚醒距離頭節點最近的一個非取消的節點
        return true;
    }
    return false;
}

呼叫了tryRelease嘗試釋放鎖

protected final boolean tryRelease(int releases) {
 int c = getState() - releases;//獲取現在的鎖數量-傳入的解鎖數量(這裡為1)
    if (Thread.currentThread() != getExclusiveOwnerThread())//當前執行緒不持有鎖
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {//鎖被釋放
        free = true;
        setExclusiveOwnerThread(null);
    }//如果不為0,怎麼辦,不釋放了嗎?
    setState(c);
    return free;
}

嘗試釋放鎖成功就喚醒等待佇列中最近的一個節點,釋放失敗就把鎖數量重置

private void unparkSuccessor(Node node) {

        int ws = node.waitStatus;
        if (ws < 0)//將ws設為0狀態(即什麼狀態都不是)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * 獲取頭節點的下一個等待狀態不是cancel的節點
         */
        Node s = node.next;//頭節點的下一個節點
        if (s == null || s.waitStatus > 0) {
            s = null;
            /*
             * 注意:從後往前遍歷找到離頭節點最近的一個非取消的節點,從後往前遍歷據說是在入隊(enq())的時候,可能nodeX.next==null,但是在讀原始碼的時候沒看出來
             */
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);//喚醒離頭節點最近的一個非取消的節點
    }

相關推薦

JAVA 併發 ReentrantLock 原始碼分析

ReentrantLock實現了Lock介面,是一種遞迴無阻塞的同步機制。 首先ReentrantLock有公平鎖和非公平鎖機制,使用構造方法ReentrantLock(boolean)指定,預設是非公平鎖。 公平鎖的含義是執行緒按照發出請求的先後順序獲取鎖

Java併發系列4AbstractQueuedSynchronizer原始碼分析之條件佇列

通過前面三篇的分析,我們深入瞭解了AbstractQueuedSynchronizer的內部結構和一些設計理念,知道了AbstractQueuedSynchronizer內部維護了一個同步狀態和兩個排隊區,這兩個排隊區分別是同步佇列和條件佇列。我們還是拿公共廁所做比喻,同步佇

Java併發程式設計ReentrantLock

一、ReentrantLock簡介 ReentrantLock可重入鎖,全名java.util.concurrent.locks.ReentrantLock,相當於是個最基礎版本的Lock的實現,針對公平鎖和非公平鎖,ReentrantLock都有實現。 二、ReentrantLock特性

Java併發程式設計Lock介面

一、Lock介面的引入 由於synchronized關鍵字有些缺陷,如無法響應中斷等,出現了Lock介面。相對於synchronized,Lock有如下補充: Lock可以響應中斷; Lock可以得知執行緒是否已經獲得鎖; Lock可以提供更為複雜的讀寫鎖,以應對讀寫同時存

定製併發自定義在計劃的執行緒池內執行的任務

宣告:本文是《 Java 7 Concurrency Cookbook 》的第七章, 作者: Javier Fernández González 譯者:鄭玉婷 自定義在計劃的執行緒池內執行的任務 計劃的執行緒池是 Executor 框架的基本執行緒池的擴充套件,允許你定製一個計劃來執行一段時

Java併發程式設計阻塞佇列

前言 在 Android多執行緒(一)執行緒池這篇文章時,當我們要建立ThreadPoolExecutor的時候需要傳進來一個型別為BlockingQueue的引數,它就是阻塞佇列,在這一篇文章裡我們會介紹阻塞佇列的定義、種類、實現原理以及應用。 1

Java容器2List原始碼解析

定義 在Java API中,官方給出的前兩段話如下: An ordered collection (also known as a sequence). The user of this interface has precise control over where i

java併發程式設計之讀寫鎖

一、讀寫鎖我們知道在多個執行緒訪問同一個資料的時候是存線上程安全問題的,而在僅僅是讀取資料的時候,是沒有安全問題的,那麼多個執行緒同時讀取資料我們就可以讓其不互斥;而多個執行緒都在修改(寫)資料或有的在讀取有的在寫入的時候再讓其互斥,這樣不但保證執行緒安全而且提高效能。Rea

Java併發程式設計-Phaser

phaser         英文意思移相器,相位器,表示“階段器”,用來解決控制多個執行緒分階段共同完成任務的情景問題,其作用相比CountDownLatch和CyclicBarrier更加靈活。如100個人參加高考需要考四場考試,早上考語文,需要等100人都

併發程式設計—— ThreadLocal原始碼分析及記憶體洩露預防

今天我們一起探討下ThreadLocal的實現原理和原始碼分析。首先,本文先談一下對ThreadLocal的理解,然後根據ThreadLocal類的原始碼分析了其實現原理和使用需要注意的地方,最後給出了兩個應用場景。相信本文一定能讓大家完全瞭解ThreadLocal。 ThreadL

HLS學習HLSDownloader原始碼分析5解析Media PlayList

解析Media PlayList     PlayList就是m3u8檔案或者索引檔案,Media PlayList也叫媒體播放列表或者媒體索引檔案解析Media PlayList的流程如下:1、如果hls_media_playlist結構體中媒體資訊存在,那麼先刪除2

RPC框架dubbo原始碼分析--dubbo服務提供者初始化

一、概述 dubbo服務提供者由dubbo:service來定義,從前面可以看到,Spring把dubbo:service解析成一個ServiceBean,ServiceBean實現了ApplicationListener和InitializingB

聊聊高併發四十解析java.util.concurrent各個元件 ThreadPoolExecutor原始碼分析

ThreadPoolExecutor是Executor執行框架最重要的一個實現類,提供了執行緒池管理和任務管理是兩個最基本的能力。這篇通過分析ThreadPoolExecutor的原始碼來看看如何設計和實現一個基於生產者消費者模型的執行器。 生產者消費者模型 生產者消費者

java學習筆記:變量

animal 單獨使用 div 位置 fin strong pub 局部變量 變量聲明 java一共三種變量: 局部變量(本地變量):方法調用時創建,方法結束時銷毀 實例變量(全局變量):類創建時創建,類銷毀時銷毀 類變量(靜態變量):程序啟動是創建,程序銷毀時銷毀

併發程式設計—— ReentrantLock實現原理及原始碼分析

  ReentrantLock是Java併發包中提供的一個可重入的互斥鎖。ReentrantLock和synchronized在基本用法,行為語義上都是類似的,同樣都具有可重入性。只不過相比原生的Synchronized,ReentrantLock增加了一些高階的擴充套件功能,比如它可以實現公平鎖,同時也可以

Java基礎鞏固-

繼承 關鍵字extent,implement java 不支援 多繼承 但支援多重繼承 特性 子類擁有父類非private的屬性,方法。 子類可以擁有自己的屬性和方法,即子類可以對父類進行擴充套件。 子類可以用自己的

Java載入過程 ——Thinking in Java學習筆記

java中一個類從被載入開始,一直到被銷燬為止,類的整個生命週期包括:載入、驗證、準備、解析、初始化、使用和解除安裝七個階段。 其中,類載入過程包括載入、驗證、準備、解析、初始化,其中,驗證、準備、解析又被合稱為連線過程。 1、載入階段 載入過程的主要工作有:

併發程式設計—— Java 併發佇列 BlockingQueue 實現之 ArrayBlockingQueue 原始碼分析

開篇先介紹下 BlockingQueue 這個介面的規則,後面再看其實現。 阻塞佇列概要 阻塞佇列與我們平常接觸的普通佇列(LinkedList或ArrayList等)的最大不同點,在於阻塞佇列的阻塞新增和阻塞刪除方法。 阻塞新增 所謂的阻塞新增是指當阻塞佇列元素已滿時,佇列會阻塞加入元素的執行緒,直佇

Java併發原始碼學習之執行緒池ThreadPoolExecutor原始碼分析

Java中使用執行緒池技術一般都是使用Executors這個工廠類,它提供了非常簡單方法來建立各種型別的執行緒池: public static ExecutorService newFixedThreadPool(int nThreads) public static ExecutorService

Java併發原始碼學習之AQS框架AbstractQueuedSynchronizer原始碼分析

經過前面幾篇文章的鋪墊,今天我們終於要看看AQS的廬山真面目了,建議第一次看AbstractQueuedSynchronizer 類原始碼的朋友可以先看下我前面幾篇文章: 分析原始碼是非常枯燥乏味的一件事,其實程式碼本身其實就是最好的說明了,因此基本都是貼出一些程式碼加上一些註釋, 因為Abstract