1. 程式人生 > >java 併發(執行緒&鎖)

java 併發(執行緒&鎖)

java 併發(執行緒&鎖)

##執行緒
###執行緒概念

作業系統排程的最小單元是執行緒,也叫輕量級程序(LightWeight Process),在一個程序裡可以建立多個執行緒,這些執行緒都擁有各自的計數器、堆疊和區域性變數等屬性,並且能夠訪問共享的記憶體變數。處理器在這些執行緒上高速切換,讓使用者感覺到這些執行緒在同時執行

###執行緒優先順序

setPriority(int)方法來修改優先順序,預設優先順序是5,優先順序高的執行緒分
配時間片的數量要多於優先順序低的執行緒。設定執行緒優先順序時,針對頻繁阻塞(休眠或者I/O操作)的執行緒需要設定較高優先順序,而偏重計算(需要較多CPU時間或者偏運算)的執行緒則設定較低的優先順序,確保處理器不會被獨佔

###執行緒的狀態

  • new
  • runnable(就緒,允許狀態)
  • blocked
  • waitting(進入等待狀態,需要觸發條件)
  • timewaiting(進入等待狀態,但是在指定時間內會放回)
  • terminated

    在這裡插入圖片描述
    ###守護執行緒

Daemon執行緒是一種支援型執行緒,因為它主要被用作程式中後臺排程以及支援性工作。這意味著,當一個Java虛擬機器中不存在非Daemon執行緒的時候,Java虛擬機器將會退出。可以通過呼叫Thread.setDaemon(true)將執行緒設定為Daemon執行緒
Daemon執行緒被用作完成支援性工作,但是在Java虛擬機器退出時Daemon執行緒中的finally塊(優雅停機,執行緒池安全退出)

###已過期方法
stop
resume
suspend

###執行緒間的通訊

在這裡插入圖片描述

  • 等待通知機制(notity/wait/wati(time)/notyfyAll)
  • Thread.join 線上程中呼叫另外執行緒的join方法,該執行緒會等到另外的那個執行緒返回之後再繼續執行
  • ThreadLocal

###ThreadLocal

  • 為每個使用該變數的執行緒提供的一個副本,可以做到資料隔離
  • 資料不一致問題:儲存了同一個物件的引用
  • 記憶體洩漏問題:
    1.記憶體洩漏是導致記憶體溢位的原因之一,但兩者並不等價,記憶體洩漏更多的是程式中不再持有某個物件的引用,但是該物件仍然無法被垃圾回收器回收,根本原因是該物件到引用根Root的鏈路是可達的
    2.舉例–java 高併發程式設計詳解270頁

-w428

在這裡插入圖片描述
引申問題:

  1. JVM 引用與垃圾回收

    • 強引用 strongRefrence:
      建立一個物件並把這個物件賦給一個引用變數。強引用有引用變數指向時永遠不會被垃圾回收。即使記憶體不足的時候
    • 弱引用 weakRefrence: weakReference 在JVM觸發任意的gc 都會導 致Entry的回收
      如果一個物件只具有弱引用,那就類似於可有可無的生活用品。弱引用與軟引用的區別在於:只具有弱引用的物件擁有更短暫的生命週期。在垃圾回收器執行緒掃描它所管轄的記憶體區域的過程中,一旦發現了只具有弱引用的物件,不管當前記憶體空間足夠與否,都會回收它的記憶體
    • 軟引用 softRefrence:
      如果一個物件只具有軟引用。如果記憶體空間足夠,垃圾回收器就不會回收它,如果記憶體空間不足了,就會回收這些物件的記憶體
    • PhantomReference(幻影引用)
      可以很好的獲取到哪個物件即將被垃圾回收
      參考:《java高併發設計模式》 304頁
  2. ThreadLocal與InheritableThreadLocal區別及應用場景

  3. ThreadLocal的擴容過程

private void resize() {
13             Entry[] oldTab = table;
14             int oldLen = oldTab.length;
15             int newLen = oldLen * 2;
16             Entry[] newTab = new Entry[newLen];
17             int count = 0;
18             // 遍歷Entry[]陣列
19             for (int j = 0; j < oldLen; ++j) {
20                 Entry e = oldTab[j];
21                 if (e != null) {
22                     ThreadLocal<?> k = e.get();
23                     if (k == null) {// 如果key=null
24                         e.value = null; // 把value也置null,有助於GC回收物件
25                     } else {// 如果key!=null
26                         int h = k.threadLocalHashCode & (newLen - 1);// 計算hash值 
27                         while (newTab[h] != null)// 如果這個位置已使用
28                             h = nextIndex(h, newLen);// 線性往後查詢,直到找到一個沒有使用的位置,h遞增
29                         newTab[h] = e;//在第一個空節點上塞入Entry e
30                         count++;// 計數++
31                     }
32                 }
33             }
34 
35             setThreshold(newLen);// 設定新的閾值(實際set方法用了2/3的newLen作為閾值)
36             size = count;// 設定ThreadLocalMap的元素個數
37             table = newTab;// 把新table賦值給ThreadLocalMap的Entry[] table
38         }

參考:https://www.cnblogs.com/dennyzhangdd/p/7978455.html 原始碼分析
###synchronized與lock的區別

  • synchronized 在成功完成功能或者丟擲異常時,虛擬機器會自動釋放執行緒佔有的鎖;而Lock物件在發生異常時,如果沒有主動呼叫unLock()方法去釋放鎖,則鎖物件會一直持有,因此使用Lock時需要在finally塊中釋放鎖
  • lock介面鎖可以通過多種方法來嘗試獲取鎖包括立即返回是否成功的tryLock(),以及一直嘗試獲取的lock()方法和嘗試等待指定時間長度獲取的方法,相對靈活了許多比synchronized;
  • 通過在讀多,寫少的高併發情況下,我們用ReentrantReadWriteLock分別獲取讀鎖和寫鎖來提高系統的效能,因為讀鎖是共享鎖,即可以同時有多個執行緒讀取共享資源,而寫鎖則保證了對共享資源的修改只能是單執行緒的
  • lock可以非阻塞方式獲取鎖
    使用推薦:少量同步使用synchronized,簡單.
    併發高,爭搶激烈使用lock

###synchronized實現原理
synchronized是基於Monitor來實現同步的。

在這裡插入圖片描述
在這裡插入圖片描述
Monitor 的工作機理

  1. 執行緒進入同步方法中。
  2. 為了繼續執行臨界區程式碼,執行緒必須獲取 Monitor 鎖。如果獲取鎖成功,將成為該監視者物件的擁有者。任一時刻內,監視者物件只屬於一個活動執行緒(The Owner)
  3. 擁有監視者物件的執行緒可以呼叫 wait() 進入等待集合(Wait Set),同時釋放監視鎖,進入等待狀態。
  4. 其他執行緒呼叫 notify() / notifyAll() 介面喚醒等待集合中的執行緒,這些等待的執行緒需要重新獲取監視鎖後才能執行 wait() 之後的程式碼。
  5. 同步方法執行完畢了,執行緒退出臨界區,並釋放監視鎖
    參考:
    https://www.cnblogs.com/nsw2018/p/5821738.html
    https://www.toutiao.com/i6538662208836993544/

###LockSupport

  • 概念: LockSupport是JDK中比較底層的類,用來建立鎖和其他同步工具類的基本執行緒阻塞原語。LockSupport很類似於二元訊號量(只有1個許可證可供使用),如果這個許可還沒有被佔用,當前執行緒獲取許可並繼續執行;如果許可已經被佔用,當前執行緒阻塞,等待獲取許可

  • LockSupport阻塞和解除阻塞執行緒直接操作的是Thread,而Object的wait/notify它並不是直接對執行緒操作,它是被動的方法,它需要一個object來進行執行緒的掛起或喚醒.Thead在呼叫wait之前, 當前執行緒必須先獲得該物件的監視器(synchronized),被喚醒之後需要重新獲取到監視器才能繼續執行.而LockSupport可以隨意進行park或者unpark

  • 基本操作
    1. 先執行park,然後在執行unpark,進行同步,並且在unpark的前後都呼叫了getBlocker,可以看到兩次的結果不一樣,第二次呼叫的結果為null,這是因為在呼叫unpark之後,執行了Lock.park(Object blocker) 方法中的setBlocker(t, null) 方法,所以第二次呼叫getBlocker時為null
    2. 先執行unpark,在呼叫park,直接就沒被阻塞

在這裡插入圖片描述
參考:
https://blog.csdn.net/secsf/article/details/78560013 synchornized與locksupport區別
https://www.imooc.com/article/48378 含專案程式碼
https://www.jianshu.com/p/ceb8870ef2c5
https://www.jianshu.com/p/1add173ea703
https://yq.aliyun.com/articles/493552

##AQS(佇列同步器AbstractQueuedSynchronizer)

  • 概念:抽象的佇列式的同步器,AQS定義了一套多執行緒訪問共享資源的同步器框架,許多同步類實現都依賴於它,如常用的ReentrantLock/Semaphore/CountDownLatch
  • aqs維護著一個state屬性,代表共享的資源,和一個first-in-first-out (FIFO)等待佇列。基於此實現加鎖、同步的基礎框架。
    state資源可以表示鎖或同步的狀態,如在ReentrantLock中加鎖或者重入則state+=1,釋放鎖state-=1。當state>0時,其他執行緒會加入到FIFO佇列等待釋放鎖。在Semaphore中則是獲取資源state-=1,釋放資源state+=1。state為0代表資源已消耗完,阻塞等待

    在這裡插入圖片描述
  1. 資料結構
public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    	...
    	//等待佇列頭
        private transient volatile Node head;
        //等待佇列尾
        private transient volatile Node tail;
        //同步狀態
        private volatile int state;
    	...
    }
  1. Node結構

變數waitStatus則表示當前被封裝成Node結點的等待狀態,共有4種取值CANCELLED、SIGNAL、CONDITION、PROPAGATE。

  • CANCELLED:值為1,在同步佇列中等待的執行緒等待超時或被中斷,需要從同步佇列中取消該Node的結點,其結點的waitStatus為CANCELLED,即結束狀態,進入該狀態後的結點將不會再變化。
  • SIGNAL:值為-1,被標識為該等待喚醒狀態的後繼結點,當其前繼結點的執行緒釋放了同步鎖或被取消,將會通知該後繼結點的執行緒執行。說白了,就是處於喚醒狀態,只要前繼結點釋放鎖,就會通知標識為SIGNAL狀態的後繼結點的執行緒執行。
  • CONDITION:值為-2,與Condition相關,該標識的結點處於等待佇列中,結點的執行緒等待在Condition上,當其他執行緒呼叫了Condition的signal()方法後,CONDITION狀態的結點將從等待佇列轉移到同步佇列中,等待獲取同步鎖。
  • PROPAGATE:值為-3,與共享模式相關,在共享模式中,該狀態標識結點的執行緒處於可執行狀態。
  • 0狀態:值為0,代表初始化狀態。AQS在判斷狀態時,通過用waitStatus>0表示取消狀態,而waitStatus<0表示有效狀態。
static final class Node {
 	static final Node SHARED = new Node();	
 	static final Node EXCLUSIVE = null;
 	volatile int waitStatus;
 	volatile Node prev;
 	volatile Node next;
 	volatile Thread thread;
 	Node nextWaiter;
}
  1. 狀態相關方法
protected final int getState() {
        return state;
    }
    protected final void setState(int newState) {
        state = newState;
    }
    protected final boolean compareAndSetState(int expect, int update) {
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

####獨佔式獲取與釋放
AQS模板方法之acquire(int arg)獨佔式獲取同步狀態

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
  1. 新增到等待佇列且自旋等待獲取
    private Node addWaiter(Node mode) {
        //新建一個Node,mode表示被獨佔或共享
        Node node = new Node(Thread.currentThread(), mode);
        //嘗試快速設定尾節點
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            //CAS設定tail
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t; 
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
  1. 新建一個node,先嚐試插入到尾節點後面。如果失敗就自旋,直到新增成功為止
final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

  1. 自旋嘗試獲取同步狀態,成功則返回。
    shouldParkAfterFailedAcquire()檢測是否需要阻塞當前執行緒(根據前一個節點的waitStatus),刪除取消狀態的執行緒
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        //前一個節點是SINGNAL,則當前節點需要等待
        if (ws == Node.SIGNAL)
            return true;
        //ws>0 表示執行緒被中斷或超時,刪除等待執行緒
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            //設定等待狀態
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted(); 
    }

####AQS模板方法

AQS模板方法之acquire(int arg)獨佔式獲取同步狀態
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

先嚐試獲取鎖,成功則返回。不成功新增到等待佇列,然後迴圈等待直到獲取成功。

AQS模板方法之acquireInterruptibly(int arg),獨佔式獲取鎖,可響應中斷
    public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }

    private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        //新增到等待佇列中
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                //只有當前節點的pre是head,才可以嘗試獲取鎖。
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    //如果執行緒呼叫了中斷,丟擲異常,實現中斷
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                //中斷後,取消等待
                cancelAcquire(node);
        }
    }

先判斷執行緒是否被中斷,如果中斷丟擲InterruptedException異常
嘗試獲取鎖,成功直接返回。失敗執行doAcquireInterruptibly。
doAcquireInterruptiblu先新增到等待佇列中,在自旋中,只有當前節點的pre是head才嘗試獲取鎖(因為頭節點是獲取同步狀態成功的節點,頭節點釋放鎖後喚醒下一個節點),

#####AQS模板方法之tryAcquireNanos(int arg, long nanosTimeout),超時等待

    public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquire(arg) ||
            doAcquireNanos(arg, nanosTimeout);
    }

     private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        final long deadline = System.nanoTime() + nanosTimeout;
        //新增一個獨佔節點到等待佇列
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            //自旋嘗試獲取鎖,檢測超時
            for (;;) {
                //嘗試獲取鎖,獲取成功返回true
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                nanosTimeout = deadline - System.nanoTime();
                if (nanosTimeout <= 0L)
                    return false;
                //大於spinForTimeoutThreshold時間間隔就park一段時間
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                //執行緒中斷,丟擲異常
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

先嚐試獲取鎖,獲取失敗執行doAcquireNanos()方法
doAcquireNanos()除了嘗試獲取鎖同時記錄超時,超過時間沒有成功,返回false

AQS模板方法之release(int arg)釋放鎖
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            //從後往前早,狀態小於0 的節點,相當於找到離頭節點最近的有效未取消或中斷的節點
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            //喚醒head之後的節點
            LockSupport.unpark(s.thread);
    }

####共享獲取

AQS模板方法之acquireShared(int arg)獲取共享鎖
    public final void acquireShared(int arg) {
        //先嚐試獲取共享鎖
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

    private void doAcquireShared(int arg) {
        //新增一個共享節點到佇列
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; 
    setHead(node);//head指向自己
     //如果還有剩餘量,繼續喚醒下一個鄰居執行緒
    if (propagate > 0 || h == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}


在這裡插入圖片描述

參考:https://www.cnblogs.com/waterystone/p/4920797.html

###ReentrantLock 原始碼分析

###ReentrantReadWriteLock 原始碼分析

預設初始化readlock ,writelock,其中sync 同步器繼承AQS


public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
    private final ReentrantReadWriteLock.ReadLock readerLock;
    /** Inner class providing writelock */
    private final ReentrantReadWriteLock.WriteLock writerLock;
    /** Performs all synchronization mechanics */
    final Sync sync;

    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }

    public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
    public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }


    abstract static class Sync extends AbstractQueuedSynchronizer {}

    static final class NonfairSync extends Sync {}

    static final class FairSync extends Sync {}

    public static class ReadLock implemen·ts Lock, java.io.Serializable {}

    public static class WriteLock implements Lock, java.io.Serializable {}
}
  • Sync類中變數
    因為state 值是32位int型,前16位表示讀鎖個數,後16位表示寫鎖個數
 static final int SHARED_SHIFT   = 16;
        //因為寫鎖上前16位,每次要讓共享鎖+1,就應該讓state加 1<<16,       
        static final int SHARED_UNIT    = (1 << SHARED_SHIFT); / static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;  //每種鎖的最大重入數量
        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
        /** Returns the number of shared holds represented in count  */
        //獲取讀鎖個數。向右移16得到的資料為讀鎖個數
        static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
        /** Returns the number of exclusive holds represented in count  */
        //獲取寫鎖個數
        static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
protected final int tryAcquireShared(int unused) {
            Thread current = Thread.currentThread();
            int c = getState();
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1; //如果獨佔模式被佔且不是當前執行緒持有,則獲取失敗
            int r = sharedCount(c);
            //如果公平策略沒有要求阻塞且重入數沒有到達最大值,則直接嘗試CAS更新state
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) {
                //更新成功後會在firstReaderHoldCount中或readHolds(ThreadLocal型別的)的本執行緒副本中記錄當前執行緒重入數(淺藍色程式碼),這是為了實現jdk1.6中加入的getReadHoldCount()方法的,這個方法能獲取當前執行緒重入共享鎖的次數(state中記錄的是多個執行緒的總重入次數),加入了這個方法讓程式碼複雜了不少,但是其原理還是很簡單的:如果當前只有一個執行緒的話,還不需要動用ThreadLocal,直接往firstReaderHoldCount這個成員變數裡存重入數,當有第二個執行緒來的時候,就要動用ThreadLocal變數readHolds了,每個執行緒擁有自己的副本,用來儲存自己的重入數。
                if (r == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }
            return fullTryAcquireShared(current); //用來處理CAS沒成功的情況,邏輯和上面的邏輯是類似的,就是加了無限迴圈
        }

獲取讀鎖失敗的話,加入到等待佇列

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
        //獲取失敗之後,加入到等待佇列,一直自旋獲取鎖,頭節點獲取到鎖後需要喚醒它下一個節點,第二個節點需要不斷的自旋詢問是否可以拿到鎖
            doReleaseShared();
            return true;
        }
        return false;
    }
protected final boolean tryReleaseShared(int unused) {
            Thread current = Thread.currentThread();
            //淺藍色程式碼也是為了實現jdk1.6中加入的getReadHoldCount()方法,在更新當前執行緒的重入數。
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
                if (firstReaderHoldCount == 1)
                    firstReader = null;
                else
                    firstReaderHoldCount--;
            } else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                int count = rh.count;
                if (count <= 1) {
                    readHolds.remove();
                    if (count <= 0)
                        throw unmatchedUnlockException();
                }
                --rh.count;
            }
            //這裡是真正的釋放同步狀態的邏輯,就是直接同步狀態-SHARED_UNIT,然後CAS更新,沒啥好說的
            for (;;) {
                int c = getState();
                int nextc = c - SHARED_UNIT;
                if (compareAndSetState(c, nextc))
                    // Releasing the read lock has no effect on readers,
                    // but it may allow waiting writers to proceed if
                    // both read and write locks are now free.
                    return nextc == 0;
            }
        }

參考:https://www.cnblogs.com/sheeva/p/6480116.html
參考:https://www.cnblogs.com/leesf456/p/5419132.html
###Condition

-w600
在這裡插入圖片描述

Condition配合ReentrantLock可以實現等待/通知的機制。await()在await()前需要先獲取鎖。進入await()執行緒釋放鎖,進入等待狀態直到被中斷或者被signle()/singleAll()/single()/single()喚醒在Condition等待的執行緒,執行之前需要獲取鎖

####Condition 原始碼解析

Condtion只是一個介面。主要分析在AQS中Condtion的實現,ConditionObject。分析過程中主要解決。await()如何釋放鎖?如何等待?如何被喚醒?喚醒後如何繼續執行等問題

AQS等待佇列與Condition佇列是兩個相互獨立的佇列
await()就是在當前執行緒持有鎖的基礎上釋放鎖資源,並新建Condition節點加入到Condition的佇列尾部,阻塞當前執行緒
signal()就是將Condition的頭節點移動到AQS等待節點尾部,讓其等待再次獲取鎖

在這裡插入圖片描述
II.節點1執行Condition.await()
1.將head後移
2.釋放節點1的鎖並從AQS等待佇列中移除
3.將節點1加入到Condition的等待佇列中
4.更新lastWaiter為節點1


在這裡插入圖片描述

III.節點2執行signal()操作
5.將firstWaiter後移
6.將節點4移出Condition佇列
7.將節點4加入到AQS的等待佇列中去
8.更新AQS的等待佇列的tail

在這裡插入圖片描述

public final void await() throws InterruptedException {
    // 1.如果當前執行緒被中斷,則丟擲中斷異常
    if (Thread.interrupted())
        throw new InterruptedException();
    // 2.將節點加入到Condition佇列中去,這裡如果lastWaiter是cancel狀態,那麼會把它踢出Condition佇列。
    Node node = addConditionWaiter();
    // 3.呼叫tryRelease,釋放當前執行緒的鎖
    long savedState = fullyRelease(node);
    int interruptMode = 0;
    // 4.為什麼會有在AQS的等待佇列的判斷?
    // 解答:signal操作會將Node從Condition佇列中拿出並且放入到等待佇列中去,在不在AQS等待佇列就看signal是否執行了
    // 如果不在AQS等待佇列中,就park當前執行緒,如果在,就退出迴圈,這個時候如果被中斷,那麼就退出迴圈
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 5.這個時候執行緒已經被signal()或者signalAll()操作給喚醒了,退出了4中的while迴圈
    // 自旋等待嘗試再次獲取鎖,呼叫acquireQueued方法
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

將condition佇列的node,放到AQSd佇列中

final boolean transferForSignal(Node node) {
    /*
     * 設定node的waitStatus:Condition->0
     */
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    /*
     * 加入到AQS的等待佇列,讓節點繼續獲取鎖
     * 設定前置節點狀態為SIGNAL
     */
    Node p = enq(node);
    int c = p.waitStatus;
    if (c > 0 || !compareAndSetWaitStatus(p, c, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

參考:https://blog.csdn.net/coslay/article/details/45217069
參考:https://www.jianshu.com/p/be2dc7c878dc
###valotile

  • 概念:volatile是輕量級的synchronized,它在多處理器開發中保證了共享變數的“可見性”。可見性的意思是當一個執行緒修改一個共享變數時,另外一個執行緒能讀到這個修改的值。如果volatile變數修飾符使用恰當的話,它比synchronized的使用和執行成本更低,因為它不會引起執行緒上下文的切換和排程
    兩個原則:
  • volateile 的變數在寫入的時候,回直接寫到記憶體上
  • 其他執行緒通過嗅探匯流排,檢測自己快取對的記憶體塊上的資料是否過期,過期就會設定為無效,當需要操作該資料時候,會從記憶體中從新載入進來

    在這裡插入圖片描述
    ##CAS(Compare And Swap)
  • 當多個執行緒同時使用CAS操作一個變數時,只有一個會勝出,併成功更新,其餘均會失敗,但失敗的執行緒並不會被掛起,僅是被告知失敗,並且允許再次嘗試,當然也允許失敗的執行緒放棄操作,這點從圖中也可以看出來。基於這樣的原理,CAS操作即使沒有鎖,同樣知道其他執行緒對共享資源操作影響,並執行相應的處理措施。同時從這點也可以看出,由於無鎖操作中沒有鎖的存在,因此不可能出現死鎖的情況,也就是說無鎖操作天生免疫死鎖


在這裡插入圖片描述
##鎖
###公平鎖與非公平鎖
公平鎖即儘量以請求鎖的順序來獲取鎖。比如同是有多個執行緒在等待一個鎖,當這個鎖被釋放時,等待時間最久的執行緒(最先請求的執行緒)會獲得該所,這種就是公平鎖。
  非公平鎖即無法保證鎖的獲取是按照請求鎖的順序進行的。這樣就可能導致某個或者一些執行緒永遠獲取不到鎖。
  在Java中,synchronized就是非公平鎖,它無法保證等待的執行緒獲取鎖的順序。
而對於ReentrantLock和ReentrantReadWriteLock,它預設情況下是非公平鎖,但是可以設定為公平鎖。設定方法如下:ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
參考:
https://blog.csdn.net/yanyan19880509/article/details/52345422/ (生動形象)

###可重入鎖
synchronized/ReentrantReadWriteLock

參考:
https://blog.csdn.net/yanyan19880509/article/details/52345422/ (生動形象)
https://blog.csdn.net/u012545728/article/details/80843595
###獨享鎖/共享鎖

  • 獨享鎖:該鎖每一次只能被一個執行緒所持有
  • 共享鎖:該鎖可被多個執行緒共有,典型的就是ReentrantReadWriteLock裡的讀鎖,它的讀鎖是可以被共享的,但是它的寫鎖確每次只能被獨佔

###互斥鎖/讀寫鎖/自旋鎖

  •  互斥鎖:共享資源的使用是互斥的,即一個執行緒獲得資源的使用權後就會將該資源加鎖,使用完後會將其解鎖,如果在使用過程中有其他執行緒想要獲取該資源的鎖,那麼它就會被阻塞陷入睡眠狀態,直到該資源被解鎖才會被喚醒,如果被阻塞的資源不止一個,那麼它們都會被喚醒,但是獲得資源使用權的是第一個被喚醒的執行緒,其它執行緒又陷入沉睡.
    
  • 讀寫鎖:它擁有讀狀態加鎖、寫狀態加鎖、不加鎖這三種狀態。只有一個執行緒可以佔有寫狀態的鎖,但可以有多個執行緒同時佔有讀狀態鎖,這也是它可以實現高併發的原因。當其處於寫狀態鎖下,任何想要嘗試獲得鎖的執行緒都會被阻塞,直到寫狀態鎖被釋放;如果是處於讀狀態鎖下,允許其它執行緒獲得它的讀狀態鎖,但是不允許獲得它的寫狀態鎖,直到所有執行緒的讀狀態鎖被釋放;為了避免想要嘗試寫操作的執行緒一直得不到寫狀態鎖,當讀寫鎖感知到有執行緒想要獲得寫狀態鎖時,便會阻塞其後所有想要獲得讀狀態鎖的執行緒。所以讀寫鎖非常適合資源的讀操作遠多於寫操作的情況
    
  • 自旋鎖
    在Java中,自旋鎖是指嘗試獲取鎖的執行緒不會立即阻塞,而是採用迴圈的方式去嘗試獲取鎖,這樣的好處是減少執行緒上下文切換的消耗,缺點是迴圈會消耗CPU

###樂觀鎖/悲觀鎖
樂觀鎖在Java中的使用,是無鎖程式設計,常常採用的是CAS演算法,典型的例子就是原子類,通過CAS自旋實現原子操作的更新,
CAS 利用作業系統底層,compare and sweep 的機制

###分段鎖
是一種鎖設計,例如ConcurrentHashMap
ConcurrentHashMap來說一下分段鎖的含義以及設計思想,
ConcurrentHashMap中的分段鎖稱為Segment,它即類似於HashMap(JDK7與JDK8中HashMap的實現)的結構,即內部擁有一個Entry陣列,陣列中的每個元素又是一個連結串列;同時又是一個ReentrantLock(Segment繼承了ReentrantLock)。
當需要put元素的時候,並不是對整個hashmap進行加鎖,而是先通過hashcode來知道他要放在那一個分段中,然後對這個分段進行加鎖,所以當多執行緒put的時候,只要不是放在一個分段中,就實現了真正的並行的插入。
但是,在統計size的時候,可就是獲取hashmap全域性資訊的時候,就需要獲取所有的分段鎖才能統計。
分段鎖的設計目的是細化鎖的粒度,當操作不需要更新整個陣列的時候,就僅僅針對陣列中的一項進行加鎖操作
###偏向鎖/輕量級鎖/重量級鎖
這三種鎖是指鎖的狀態,並且是針對Synchronized。在Java 5通過引入鎖升級的機制來實現高效Synchronized。這三種鎖的狀態是通過物件監視器在物件頭中的欄位來表明的。

  • 偏向鎖是指一段同步程式碼一直被一個執行緒所訪問,那麼該執行緒會自動獲取鎖。降低獲取鎖的代價
  • 輕量級鎖是指當鎖是偏向鎖的時候,被另一個執行緒所訪問,偏向鎖就會升級為輕量級鎖,其他執行緒會通過自旋的形式嘗試獲取鎖,不會阻塞,提高效能
  • 重量級鎖是指當鎖為輕量級鎖的時候,另一個執行緒雖然是自旋,但自旋不會一直持續下去,當自旋一定次數的時候,還沒有獲取到鎖,就會進入阻塞,該鎖膨脹為重量級鎖。重量級鎖會讓其他申請的執行緒進入阻塞,效能降低

參考:https://www.toutiao.com/i6538662208836993544/