JUC併發包
Java併發包(java.util.concurrent包,簡稱J.U.C)的構成:
J.U.C核心由5大塊組成:atomic包、locks包、collections包、tools包(AQS)、executor包(執行緒池)
一、Atomic包
Atomic包是java.util.concurrent下的另一個專門為執行緒安全設計的Java包,包含多個原子操作類。這個包裡面提供了一組原子變數類。其基本的特性就是在多執行緒環境下,當有多個執行緒同時執行這些類的例項包含的方法時,具有排他性,即當某個執行緒進入方法,執行其中的指令時,不會被其他執行緒打斷,而別的執行緒就像自旋鎖一樣,一直等到該方法執行完成,才由JVM從等待佇列中選擇一個另一個執行緒進入,這只是一種邏輯上的理解。實際上是藉助硬體的相關指令來實現的,不會阻塞執行緒(或者說只是在硬體級別上阻塞了)。可以對基本資料、陣列中的基本資料、對類中的基本資料進行操作。原子變數類相當於一種泛化的volatile
Atomic中標量類AtomicBoolean,AtomicInteger,AtomicLong,AtomicReference這四種基本型別用來處理布林,整數,長整數,物件四種資料,其內部實現不是簡單的使用synchronized,而是一個更為高效的方式CAS (compare and swap) + volatile和native方法,從而避免了synchronized的高開銷,執行效率大為提升。
CAS(Compare and swap)比較和替換是設計併發演算法時用到的一種技術。簡單來說,比較和替換是使用一個期望值和一個變數的當前值進行比較,如果當前變數的值與我們期望的值相等,就使用一個新值替換當前變數的值
(一)AtomicBoolean原始碼分析
AtomicBoolean 內部的屬性
// 設定為使用Unsafe.compareAndSwapInt進行更新 private static final Unsafe unsafe = Unsafe.getUnsafe(); //儲存修改變數的實際記憶體地址,通過unsafe.objectFieldOffset讀取 private static final long valueOffset; // 初始化的時候,執行靜態程式碼塊,計算出儲存的value的記憶體地址便於直接進行記憶體操作 //objectFieldOffset(Final f):返回給定的非靜態屬性在它的類的儲存分配中的位置(偏移地址)。 static { try { valueOffset = unsafe.objectFieldOffset (AtomicBoolean.class.getDeclaredField("value")); } catch (Exception ex) { throw new Error(ex); } } private volatile int value;
建構函式
/** * Creates a new {@code AtomicBoolean} with the given initial value. * 通過給定的初始值,將boolean轉為int後初始化value * @param initialValue the initial value */ public AtomicBoolean(boolean initialValue) { value = initialValue ? 1 : 0; } /** * Creates a new {@code AtomicBoolean} with initial value {@code false}. * 初始化為預設值,預設為false,因為int的預設值是0 */ public AtomicBoolean() {}
/** * Atomically sets to the given value and returns the previous value. * * @param newValue the new value * @return the previous value */ // 通過原子的方式設定給定的值,並返回之前的值 public final boolean getAndSet(boolean newValue) { boolean prev; do { //先get()到原來的值,再進行原子更新,會一直迴圈直到更新成功 prev = get(); } while (!compareAndSet(prev, newValue)); return prev; }
compareAndSet
public final boolean compareAndSet(boolean expect, boolean update) { int e = expect ? 1 : 0; int u = update ? 1 : 0; //unsafe.compareAndSwapInt:原子性地更新偏移地址為valueOffset的屬性值為u,當且僅當偏移地址為alueOffset的屬性的當前值為e才會更新成功,否則返回false。 return unsafe.compareAndSwapInt(this, valueOffset, e, u); }
AtomicBoolean的使用舉例
//如果想讓某種操作只執行一次,初始atomicBoolean為false AtomicBoolean atomicBoolean = new AtomicBoolean(false); //如果當前值為false,設定當前值為true,如果設定成功,返回true if (atomicBoolean.compareAndSet(false,true)){ //執行操作 }
(二)AtomicInteger原始碼分析
構造方法
public AtomicBoolean(boolean initialValue){ value = initialValue ? 1 : 0; } public AtomicBoolean(){}
一個有參,一個無參,無參時成員變數value值為0,也就是false.
set()和lazySet()
public final void set(int newValue) { value = newValue; }
lazySet()方法實現了對value的非volatile賦值,通過呼叫unsafe.putOrderedInt()方法,直接向固定偏移量的記憶體上寫入資料,但不使其對其他執行緒立刻可見(putOrderedInt()是putIntVolatile()的延遲實現)。
public final void lazySet(int newValue) { unsafe.putOrderedInt(this, valueOffset, newValue); }
想要獲取value的值就需要使用get()方法,AtomicInteger除了提供基本的get()方法之外,還提供了getAndSet(),getAndIncrement(),getAndDecrement(),getAndAdd(),getAndUpdate()和getAndAccumulate()等方法。
getAndSet()方法實際上呼叫getAndSetInt()方法,它的底層實現邏輯是利用getIntVolatile()方法獲取value後進行的自旋CAS操作。
getAndIncrement(),getAndDecrement()和getAndAdd()
這兩個方法實現的原理和getAndSet()方法基本是一樣的,只是將get出來的value加1或減1了而已。
getAndUpdate()和getAndAccumulate()分別以函數語言程式設計介面IntUnaryOperator和IntBinaryOperator為入參,實現AtomicInteger的自定義變換。使用者可以自定義一個函式,讓value按函式計算結果遞增,也可以定義兩個可以互相翻轉的int,使value交替變換。(三)AtomicStampedReference &AtomicMarkableReference
防止ABA問題,分別靠時間戳和Mark標記位來方式ABA問題的發生。
AtomicMarkableReference和 AtomicStampedReference原始碼幾乎相同,唯一區別就在於一個是int型的時間戳,而這個類則是布林型的標記值。
兩者區別在於AtomicStampedReference可以知道修改了多少次,而AtomicMarkableReference則只知道有沒有被修改過
cas的ABA問題就是 假設初始值為A,執行緒3和執行緒1都獲取到了初始值A,然後執行緒1將A改為了B,執行緒2將B又改回了A,這時候執行緒3做修改時,是感知不到這個值從A改為了B又改回了A的過程:
AtomicStampedReference 本質是有一個int 值作為版本號,每次更改前先取到這個int值的版本號,等到修改的時候,比較當前版本號與當前執行緒持有的版本號是否一致,如果一致,則進行修改,並將版本號+1(當然加多少或減多少都是可以自己定義的),在zookeeper中保持資料的一致性也是用的這種方式;
AtomicMarkableReference則是將一個boolean值作是否有更改的標記,本質就是它的版本號只有兩個,true和false,修改的時候在這兩個版本號之間來回切換,這樣做並不能解決ABA的問題,只是會降低ABA問題發生的機率而已;
它裡面只有一個成員變數,要做原子更新的物件會被封裝為Pair物件,並賦值給pair; private volatile Pair<V> pair; 先看它的一個內部類Pair ,要進行原子操作的物件會被封裝為Pair物件 private static class Pair<T> { final T reference; //要進行原子操作的物件 final int stamp; //當前的版本號 private Pair(T reference, int stamp) { this.reference = reference; this.stamp = stamp; } static <T> Pair<T> of(T reference, int stamp) { //該靜態方法會在AtomicStampedReference的構造方法中被呼叫,返回一個Pair物件; return new Pair<T>(reference, stamp); } } 現在再看構造方法就明白了,就是將原子操作的物件封裝為pair物件 public AtomicStampedReference(V initialRef, int initialStamp) { pair = Pair.of(initialRef, initialStamp); } 獲取版本號 就是返回成員變數pair的stamp的值 public int getStamp() { return pair.stamp; } 原子修改操作,四個引數分別是舊的物件,將要修改的新的物件,原始的版本號,新的版本號 這個操作如果成功就會將expectedReference修改為newReference,將版本號expectedStamp修改為newStamp; public boolean compareAndSet(V expectedReference, V newReference, int expectedStamp, int newStamp) { Pair<V> current = pair;
二、LinkedBlockingQueue
java.util.concurrent.LinkedBlockingQueue
是一個基於單向連結串列的、範圍任意的(其實是有界的)、FIFO 阻塞佇列。訪問與移除操作是在隊頭進行,新增操作是在隊尾進行,並分別使用不同的鎖進行保護,只有在可能涉及多個節點的操作才同時對兩個鎖進行加鎖。
佇列是否為空、是否已滿仍然是通過元素數量的計數器(count)進行判斷的,由於可以同時在隊頭、隊尾併發地進行訪問、新增操作,所以這個計數器必須是執行緒安全的,這裡使用了一個原子類AtomicInteger
,這就決定了它的容量範圍是: 1 – Integer.MAX_VALUE。
由於同時使用了兩把鎖,在需要同時使用兩把鎖時,加鎖順序與釋放順序是非常重要的:必須以固定的順序進行加鎖,再以與加鎖順序的相反的順序釋放鎖。
1.offer操作
向佇列尾部插入一個元素,如果佇列有空閒容量則插入成功後返回true,如果佇列已滿則丟棄當前元素然後返回false,如果 e元素為null,則丟擲空指標異常(NullPointerException),還有一點就是,該方法是非阻塞的put 操作
向佇列尾部插入一個元素,如果佇列有空閒則插入後直接返回true,如果佇列已經滿則阻塞當前執行緒知道佇列有空閒插入成功後返回true,如果在阻塞的時候被其他執行緒設定了中斷標誌
take 操作
獲取當前佇列頭部元素並從佇列裡面移除,如果佇列為空則阻塞呼叫執行緒。如果佇列為空則阻塞當前執行緒知道佇列不為空,然後返回元素,如果在阻塞的時候被其他執行緒設定了中斷標誌,則被阻塞執行緒會丟擲InterruptedException 異常而返回。
peek 操作
獲取佇列頭部元素但是不從佇列裡面移除,如果佇列為空則返回 null,該方法是不阻塞的
poll操作
從佇列頭部獲取並移除一個元素,如果佇列為空則返回 null,該方法是不阻塞的。
remove 操作
移除指定元素。由於移除元素涉及該結點前後兩個結點的訪問與修改,
對兩把鎖加鎖簡化了同步管理。
三、LinkedBlockingDueue
LinkedBlockingDeque是一個由連結串列結構組成的雙向阻塞佇列,即可以從佇列的兩端插入和移除元素。雙向佇列因為多了一個操作佇列的入口,在多執行緒同時入隊時,也就減少了一半的競爭。
相比於其他阻塞佇列,LinkedBlockingDeque多了addFirst、addLast、peekFirst、peekLast等方法,以first結尾的方法,表示插入、獲取獲移除雙端佇列的第一個元素。以last結尾的方法,表示插入、獲取獲移除雙端佇列的最後一個元素。
LinkedBlockingDeque是可選容量的,在初始化時可以設定容量防止其過度膨脹,如果不設定,預設容量大小為Integer.MAX_VALUE。
該類繼承自AbstractQueue抽象類,又實現了BlockingDeque介面
LinkedBlockingDeque類對元素的操作方法比較多,我們下面以putFirst、putLast、pollFirst、pollLast方法來對元素的入隊、出隊操作進行分析。
入隊
putFirst(E e)方法是將指定的元素插入雙端佇列的開頭
入隊操作是通過linkFirst(E e)方法來完成的
若入隊成功,則linkFirst(E e)方法返回true,否則,返回false。若該方法返回false,則當前執行緒會阻塞在notFull條件上。
putLast(E e)方法是將指定的元素插入到雙端佇列的末尾
該方法和putFirst(E e)方法幾乎一樣,不同點在於,putLast(E e)方法通過呼叫linkLast(E e)方法來插入節點
若入隊成功,則linkLast(E e)方法返回true,否則,返回false。若該方法返回false,則當前執行緒會阻塞在notFull條件上。
出隊
pollFirst()方法是獲取並移除此雙端佇列的首節點,若不存在,則返回null,移除首節點的操作是通過unlinkFirst()方法來完成的
pollLast()方法是獲取並移除此雙端佇列的尾節點,若不存在,則返回null
移除尾節點的操作是通過unlinkLast()方法來完成的
其實LinkedBlockingDeque類的入隊、出隊操作都是通過linkFirst、linkLast、unlinkFirst、unlinkLast這幾個方法來實現的
四、DelayQueue
DelayQueue是一個無界的BlockingQueue,用於放置實現了Delayed介面的物件,其中的物件只能在其到期時才能從佇列中取走。這種佇列是有序的,即隊頭物件的延遲到期時間最長。注意:不能將null元素放置到這種佇列中。
主要構造方法 public DelayQueue() {} public DelayQueue(Collection<? extends E> c) { this.addAll(c); }
入隊
因為DelayQueue是阻塞佇列,且優先順序佇列是無界的,所以入隊不會阻塞不會超時,因此它的四個入隊方法是一樣的。
public boolean add(E e) { return offer(e); } public void put(E e) { offer(e); } public boolean offer(E e, long timeout, TimeUnit unit) { return offer(e); } public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { q.offer(e); if (q.peek() == e) { leader = null; available.signal(); } return true; } finally { lock.unlock(); } }
入隊方法比較簡單:
(1)加鎖;
(2)新增元素到優先順序佇列中;
(3)如果新增的元素是堆頂元素,就把leader置為空,並喚醒等待在條件available上的執行緒;
(4)解鎖;
出隊
因為DelayQueue是阻塞佇列,所以它的出隊有四個不同的方法,有丟擲異常的,有阻塞的,有不阻塞的,有超時的。
我們這裡主要分析兩個,poll()和take()方法。
poll()方法比較簡單:
(1)加鎖;
(2)檢查第一個元素,如果為空或者還沒到期,就返回null;
(3)如果第一個元素到期了就呼叫優先順序佇列的poll()彈出第一個元素;
(4)解鎖。
take()方法稍微要複雜一些:
(1)加鎖;
(2)判斷堆頂元素是否為空,為空的話直接阻塞等待;
(3)判斷堆頂元素是否到期,到期了直接呼叫優先順序佇列的poll()彈出元素;
(4)沒到期,再判斷前面是否有其它執行緒在等待,有則直接等待;
(5)前面沒有其它執行緒在等待,則把自己當作第一個執行緒等待delay時間後喚醒,再嘗試獲取元素;
(6)獲取到元素之後再喚醒下一個等待的執行緒;
(7)解鎖;