1. 程式人生 > 其它 >JUC併發包

JUC併發包

Java併發包(java.util.concurrent包,簡稱J.U.C)的構成:

J.U.C核心由5大塊組成:atomic包、locks包、collections包、tools包(AQS)、executor包(執行緒池)

一、Atomic包

Atomic包是java.util.concurrent下的另一個專門為執行緒安全設計的Java包,包含多個原子操作類。這個包裡面提供了一組原子變數類。其基本的特性就是在多執行緒環境下,當有多個執行緒同時執行這些類的例項包含的方法時,具有排他性,即當某個執行緒進入方法,執行其中的指令時,不會被其他執行緒打斷,而別的執行緒就像自旋鎖一樣,一直等到該方法執行完成,才由JVM從等待佇列中選擇一個另一個執行緒進入,這只是一種邏輯上的理解。實際上是藉助硬體的相關指令來實現的,不會阻塞執行緒(或者說只是在硬體級別上阻塞了)。可以對基本資料、陣列中的基本資料、對類中的基本資料進行操作。原子變數類相當於一種泛化的volatile

變數,能夠支援原子的和有條件的讀-改-寫操作。

Atomic中標量類AtomicBoolean,AtomicInteger,AtomicLong,AtomicReference這四種基本型別用來處理布林,整數,長整數,物件四種資料,其內部實現不是簡單的使用synchronized,而是一個更為高效的方式CAS (compare and swap) + volatile和native方法,從而避免了synchronized的高開銷,執行效率大為提升。
CAS(Compare and swap)比較和替換是設計併發演算法時用到的一種技術。簡單來說,比較和替換是使用一個期望值和一個變數的當前值進行比較,如果當前變數的值與我們期望的值相等,就使用一個新值替換當前變數的值

(一)AtomicBoolean原始碼分析

AtomicBoolean 內部的屬性

// 設定為使用Unsafe.compareAndSwapInt進行更新
private static final Unsafe unsafe = Unsafe.getUnsafe();
//儲存修改變數的實際記憶體地址,通過unsafe.objectFieldOffset讀取
private static final long valueOffset;

 // 初始化的時候,執行靜態程式碼塊,計算出儲存的value的記憶體地址便於直接進行記憶體操作
 //objectFieldOffset(Final f):返回給定的非靜態屬性在它的類的儲存分配中的位置(偏移地址)。
static {
    try { 
            valueOffset = unsafe.objectFieldOffset  
                (AtomicBoolean.class.getDeclaredField("value"));  
        } catch (Exception ex) {
            throw new Error(ex);
        }
}
private volatile int value;

  建構函式

/** 
* Creates a new {@code AtomicBoolean} with the given initial value.
* 通過給定的初始值,將boolean轉為int後初始化value
* @param initialValue the initial value 
*/
public AtomicBoolean(boolean initialValue) { 
    value = initialValue ? 1 : 0;
}


/**
* Creates a new {@code AtomicBoolean} with initial value {@code false}. 
* 初始化為預設值,預設為false,因為int的預設值是0
*/
public AtomicBoolean() {}

  

/** 
* Atomically sets to the given value and returns the previous value. 
* 
* @param newValue the new value 
* @return the previous value 
*/
// 通過原子的方式設定給定的值,並返回之前的值
public final boolean getAndSet(boolean newValue) {
    boolean prev;
    do {
        //先get()到原來的值,再進行原子更新,會一直迴圈直到更新成功
        prev = get(); 
    } while (!compareAndSet(prev, newValue));
    return prev;
}

  compareAndSet

public final boolean compareAndSet(boolean expect, boolean 
update) {
    int e = expect ? 1 : 0;
    int u = update ? 1 : 0;
    //unsafe.compareAndSwapInt:原子性地更新偏移地址為valueOffset的屬性值為u,當且僅當偏移地址為alueOffset的屬性的當前值為e才會更新成功,否則返回false。
    return unsafe.compareAndSwapInt(this, valueOffset, e, u);
}

AtomicBoolean的使用舉例

//如果想讓某種操作只執行一次,初始atomicBoolean為false
AtomicBoolean atomicBoolean = new AtomicBoolean(false);
//如果當前值為false,設定當前值為true,如果設定成功,返回true
if (atomicBoolean.compareAndSet(false,true)){
    //執行操作
}

(二)AtomicInteger原始碼分析

構造方法

public AtomicBoolean(boolean initialValue){
    value = initialValue ? 1 : 0;
}
public AtomicBoolean(){}

  一個有參,一個無參,無參時成員變數value值為0,也就是false.

set()和lazySet()

public final void set(int newValue) {
    value = newValue;
}

  

lazySet()方法實現了對value的非volatile賦值,通過呼叫unsafe.putOrderedInt()方法,直接向固定偏移量的記憶體上寫入資料,但不使其對其他執行緒立刻可見(putOrderedInt()是putIntVolatile()的延遲實現)。

public final void lazySet(int newValue) {
    unsafe.putOrderedInt(this, valueOffset, newValue);
}

  

想要獲取value的值就需要使用get()方法,AtomicInteger除了提供基本的get()方法之外,還提供了getAndSet(),getAndIncrement(),getAndDecrement(),getAndAdd(),getAndUpdate()和getAndAccumulate()等方法。

getAndSet()方法實際上呼叫getAndSetInt()方法,它的底層實現邏輯是利用getIntVolatile()方法獲取value後進行的自旋CAS操作。

getAndIncrement(),getAndDecrement()和getAndAdd()

這兩個方法實現的原理和getAndSet()方法基本是一樣的,只是將get出來的value加1或減1了而已。

getAndUpdate()和getAndAccumulate()分別以函數語言程式設計介面IntUnaryOperator和IntBinaryOperator為入參,實現AtomicInteger的自定義變換。使用者可以自定義一個函式,讓value按函式計算結果遞增,也可以定義兩個可以互相翻轉的int,使value交替變換。

(三)AtomicStampedReference &AtomicMarkableReference

防止ABA問題,分別靠時間戳和Mark標記位來方式ABA問題的發生。

AtomicMarkableReference和 AtomicStampedReference原始碼幾乎相同,唯一區別就在於一個是int型的時間戳,而這個類則是布林型的標記值。
兩者區別在於AtomicStampedReference可以知道修改了多少次,而AtomicMarkableReference則只知道有沒有被修改過

cas的ABA問題就是 假設初始值為A,執行緒3和執行緒1都獲取到了初始值A,然後執行緒1將A改為了B,執行緒2將B又改回了A,這時候執行緒3做修改時,是感知不到這個值從A改為了B又改回了A的過程:

AtomicStampedReference 本質是有一個int 值作為版本號,每次更改前先取到這個int值的版本號,等到修改的時候,比較當前版本號與當前執行緒持有的版本號是否一致,如果一致,則進行修改,並將版本號+1(當然加多少或減多少都是可以自己定義的),在zookeeper中保持資料的一致性也是用的這種方式;

AtomicMarkableReference則是將一個boolean值作是否有更改的標記,本質就是它的版本號只有兩個,true和false,修改的時候在這兩個版本號之間來回切換,這樣做並不能解決ABA的問題,只是會降低ABA問題發生的機率而已;

    它裡面只有一個成員變數,要做原子更新的物件會被封裝為Pair物件,並賦值給pair;  
    private volatile Pair<V> pair;  
      
    先看它的一個內部類Pair ,要進行原子操作的物件會被封裝為Pair物件  
    private static class Pair<T> {  
        final T reference;     //要進行原子操作的物件  
        final int stamp;       //當前的版本號  
        private Pair(T reference, int stamp) {  
            this.reference = reference;  
            this.stamp = stamp;  
        }  
        static <T> Pair<T> of(T reference, int stamp) { //該靜態方法會在AtomicStampedReference的構造方法中被呼叫,返回一個Pair物件;  
            return new Pair<T>(reference, stamp);  
        }  
    }  
    現在再看構造方法就明白了,就是將原子操作的物件封裝為pair物件  
    public AtomicStampedReference(V initialRef, int initialStamp) {  
        pair = Pair.of(initialRef, initialStamp);  
    }  
             
           獲取版本號      
           就是返回成員變數pair的stamp的值         
    public int getStamp() {  
        return pair.stamp;  
    }  
      
            原子修改操作,四個引數分別是舊的物件,將要修改的新的物件,原始的版本號,新的版本號  
            這個操作如果成功就會將expectedReference修改為newReference,將版本號expectedStamp修改為newStamp;  
    public boolean compareAndSet(V   expectedReference,  
                                 V   newReference,  
                                 int expectedStamp,  
                                 int newStamp) {  
        Pair<V> current = pair;  

  二、LinkedBlockingQueue

java.util.concurrent.LinkedBlockingQueue是一個基於單向連結串列的、範圍任意的(其實是有界的)、FIFO 阻塞佇列。訪問與移除操作是在隊頭進行,新增操作是在隊尾進行,並分別使用不同的鎖進行保護,只有在可能涉及多個節點的操作才同時對兩個鎖進行加鎖。

佇列是否為空、是否已滿仍然是通過元素數量的計數器(count)進行判斷的,由於可以同時在隊頭、隊尾併發地進行訪問、新增操作,所以這個計數器必須是執行緒安全的,這裡使用了一個原子類AtomicInteger,這就決定了它的容量範圍是: 1 – Integer.MAX_VALUE。

由於同時使用了兩把鎖,在需要同時使用兩把鎖時,加鎖順序與釋放順序是非常重要的:必須以固定的順序進行加鎖,再以與加鎖順序的相反的順序釋放鎖。

1.offer操作

向佇列尾部插入一個元素,如果佇列有空閒容量則插入成功後返回true,如果佇列已滿則丟棄當前元素然後返回false,如果 e元素為null,則丟擲空指標異常(NullPointerException),還有一點就是,該方法是非阻塞的

put 操作

向佇列尾部插入一個元素,如果佇列有空閒則插入後直接返回true,如果佇列已經滿則阻塞當前執行緒知道佇列有空閒插入成功後返回true,如果在阻塞的時候被其他執行緒設定了中斷標誌

take 操作

獲取當前佇列頭部元素並從佇列裡面移除,如果佇列為空則阻塞呼叫執行緒。如果佇列為空則阻塞當前執行緒知道佇列不為空,然後返回元素,如果在阻塞的時候被其他執行緒設定了中斷標誌,則被阻塞執行緒會丟擲InterruptedException 異常而返回。

peek 操作

獲取佇列頭部元素但是不從佇列裡面移除,如果佇列為空則返回 null,該方法是不阻塞的

poll操作

從佇列頭部獲取並移除一個元素,如果佇列為空則返回 null,該方法是不阻塞的。

remove 操作

 移除指定元素。由於移除元素涉及該結點前後兩個結點的訪問與修改,
對兩把鎖加鎖簡化了同步管理。

三、LinkedBlockingDueue

LinkedBlockingDeque是一個由連結串列結構組成的雙向阻塞佇列,即可以從佇列的兩端插入和移除元素。雙向佇列因為多了一個操作佇列的入口,在多執行緒同時入隊時,也就減少了一半的競爭。

相比於其他阻塞佇列,LinkedBlockingDeque多了addFirst、addLast、peekFirst、peekLast等方法,以first結尾的方法,表示插入、獲取獲移除雙端佇列的第一個元素。以last結尾的方法,表示插入、獲取獲移除雙端佇列的最後一個元素。

LinkedBlockingDeque是可選容量的,在初始化時可以設定容量防止其過度膨脹,如果不設定,預設容量大小為Integer.MAX_VALUE。
該類繼承自AbstractQueue抽象類,又實現了BlockingDeque介面

LinkedBlockingDeque類對元素的操作方法比較多,我們下面以putFirst、putLast、pollFirst、pollLast方法來對元素的入隊、出隊操作進行分析。

入隊

putFirst(E e)方法是將指定的元素插入雙端佇列的開頭

入隊操作是通過linkFirst(E e)方法來完成的

若入隊成功,則linkFirst(E e)方法返回true,否則,返回false。若該方法返回false,則當前執行緒會阻塞在notFull條件上。

putLast(E e)方法是將指定的元素插入到雙端佇列的末尾

該方法和putFirst(E e)方法幾乎一樣,不同點在於,putLast(E e)方法通過呼叫linkLast(E e)方法來插入節點

若入隊成功,則linkLast(E e)方法返回true,否則,返回false。若該方法返回false,則當前執行緒會阻塞在notFull條件上。

出隊

pollFirst()方法是獲取並移除此雙端佇列的首節點,若不存在,則返回null,移除首節點的操作是通過unlinkFirst()方法來完成的

pollLast()方法是獲取並移除此雙端佇列的尾節點,若不存在,則返回null

移除尾節點的操作是通過unlinkLast()方法來完成的

其實LinkedBlockingDeque類的入隊、出隊操作都是通過linkFirst、linkLast、unlinkFirst、unlinkLast這幾個方法來實現的

四、DelayQueue

DelayQueue是一個無界的BlockingQueue,用於放置實現了Delayed介面的物件,其中的物件只能在其到期時才能從佇列中取走。這種佇列是有序的,即隊頭物件的延遲到期時間最長。注意:不能將null元素放置到這種佇列中。

主要構造方法
public DelayQueue() {}

public DelayQueue(Collection<? extends E> c) {
    this.addAll(c);
}

入隊

因為DelayQueue是阻塞佇列,且優先順序佇列是無界的,所以入隊不會阻塞不會超時,因此它的四個入隊方法是一樣的。

public boolean add(E e) {
    return offer(e);
}

public void put(E e) {
    offer(e);
}

public boolean offer(E e, long timeout, TimeUnit unit) {
    return offer(e);
}

public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        q.offer(e);
        if (q.peek() == e) {
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}

  

入隊方法比較簡單:

(1)加鎖;

(2)新增元素到優先順序佇列中;

(3)如果新增的元素是堆頂元素,就把leader置為空,並喚醒等待在條件available上的執行緒;

(4)解鎖;

出隊

因為DelayQueue是阻塞佇列,所以它的出隊有四個不同的方法,有丟擲異常的,有阻塞的,有不阻塞的,有超時的。

我們這裡主要分析兩個,poll()和take()方法。

poll()方法比較簡單:

(1)加鎖;

(2)檢查第一個元素,如果為空或者還沒到期,就返回null;

(3)如果第一個元素到期了就呼叫優先順序佇列的poll()彈出第一個元素;

(4)解鎖。

take()方法稍微要複雜一些:

(1)加鎖;

(2)判斷堆頂元素是否為空,為空的話直接阻塞等待;

(3)判斷堆頂元素是否到期,到期了直接呼叫優先順序佇列的poll()彈出元素;

(4)沒到期,再判斷前面是否有其它執行緒在等待,有則直接等待;

(5)前面沒有其它執行緒在等待,則把自己當作第一個執行緒等待delay時間後喚醒,再嘗試獲取元素;

(6)獲取到元素之後再喚醒下一個等待的執行緒;

(7)解鎖;