常用Java併發容器
阿新 • • 發佈:2019-08-31
併發容器介紹
- ConcurrentHashMap:併發版HashMap
- CopyOnWriteArrayList:併發版ArrayList
- CopyOnWriteArraySet:併發Set
- ConcurrentLinkedQueue:併發佇列(基於連結串列)
- ConcurrentLinkedDeque:併發佇列(基於雙向連結串列)
- ConcurrentSkipListMap:基於跳錶的併發Map
- ConcurrentSkipListSet:基於跳錶的併發Set
- ArrayBlockingQueue:阻塞佇列(基於陣列)
- LinkedBlockingQueue:阻塞佇列(基於連結串列)
- LinkedBlockingDeque:阻塞佇列(基於雙向連結串列)
- PriorityBlockingQueue:執行緒安全的優先佇列
- SynchronousQueue:讀寫成對的佇列
- LinkedTransferQueue:基於連結串列的資料交換佇列
- DelayQueue:延時佇列
1.ConcurrentHashMap 併發版HashMap
最常見的併發容器之一,可以用作併發場景下的快取。底層依然是雜湊表,但在JAVA 8中有了不小的改變,而JAVA 7和JAVA 8都是用的比較多的版本,因此經常會將這兩個版本的實現方式做一些比較(比如面試中)。 一個比較大的差異就是,JAVA 7中採用分段鎖來減少鎖的競爭,JAVA 8中放棄了分段鎖,採用CAS(一種樂觀鎖),同時為了防止雜湊衝突嚴重時退化成連結串列(衝突時會在該位置生成一個連結串列,雜湊值相同的物件就鏈在一起),會在連結串列長度達到閾值(8)後轉換成紅黑樹(比起連結串列,樹的查詢效率更穩定)。2.CopyOnWriteArrayList 併發版ArrayList
併發版ArrayList,底層結構也是陣列,和ArrayList不同之處在於:當新增和刪除元素時會建立一個新的陣列,在新的陣列中增加或者排除指定物件,最後用新增陣列替換原來的陣列。 適用場景:由於讀操作不加鎖,寫(增、刪、改)操作加鎖,因此適用於讀多寫少的場景。 侷限:由於讀的時候不會加鎖(讀的效率高,就和普通ArrayList一樣),讀取的當前副本,因此可能讀取到髒資料。如果介意,建議不用。 看看原始碼感受下:public class CopyOnWriteArrayList<E>
implements List<E>, RandomAccess, Cloneable, java.io.Serializable {
final transient ReentrantLock lock = new ReentrantLock();
private transient volatile Object[] array;
// 新增元素,有鎖
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock(); // 修改時加鎖,保證併發安全
try {
Object[] elements = getArray(); // 當前陣列
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1); // 建立一個新陣列,比老的大一個空間
newElements[len] = e; // 要新增的元素放進新陣列
setArray(newElements); // 用新陣列替換原來的陣列
return true;
} finally {
lock.unlock(); // 解鎖
}
}
// 讀元素,不加鎖,因此可能讀取到舊資料
public E get(int index) {
return get(getArray(), index);
}
}
3.CopyOnWriteArraySet 併發Set
基於CopyOnWriteArrayList實現(內含一個CopyOnWriteArrayList成員變數),也就是說底層是一個數組,意味著每次add都要遍歷整個集合才能知道是否存在,不存在時需要插入(加鎖)。 適用場景:在CopyOnWriteArrayList適用場景下加一個,集合別太大(全部遍歷傷不起)。4.ConcurrentLinkedQueue 併發佇列(基於連結串列)
基於連結串列實現的併發佇列,使用樂觀鎖(CAS)保證執行緒安全。因為資料結構是連結串列,所以理論上是沒有佇列大小限制的,也就是說新增資料一定能成功。5.ConcurrentLinkedDeque 併發佇列(基於雙向連結串列)
基於雙向連結串列實現的併發佇列,可以分別對頭尾進行操作,因此除了先進先出(FIFO),也可以先進後出(FILO),當然先進後出的話應該叫它棧了。6.ConcurrentSkipListMap 基於跳錶的併發Map
SkipList即跳錶,跳錶是一種空間換時間的資料結構,通過冗餘資料,將連結串列一層一層索引,達到類似二分查詢的效果7.ConcurrentSkipListSet 基於跳錶的併發Set
類似HashSet和HashMap的關係,ConcurrentSkipListSet裡面就是一個ConcurrentSkipListMap,就不細說了。8.ArrayBlockingQueue 阻塞佇列(基於陣列)
基於陣列實現的可阻塞佇列,構造時必須制定陣列大小,往裡面放東西時如果陣列滿了便會阻塞直到有位置(也支援直接返回和超時等待),通過一個鎖ReentrantLock保證執行緒安全。 用offer操作舉個例子:public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/**
* 讀寫共用此鎖,執行緒間通過下面兩個Condition通訊
* 這兩個Condition和lock有緊密聯絡(就是lock的方法生成的)
* 類似Object的wait/notify
*/
final ReentrantLock lock;
/** 佇列不為空的訊號,取資料的執行緒需要關注 */
private final Condition notEmpty;
/** 佇列沒滿的訊號,寫資料的執行緒需要關注 */
private final Condition notFull;
// 一直阻塞直到有東西可以拿出來
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
// 在尾部插入一個元素,佇列已滿時等待指定時間,如果還是不能插入則返回
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); // 鎖住
try {
// 迴圈等待直到佇列有空閒
while (count == items.length) {
if (nanos <= 0)
return false;// 等待超時,返回
// 暫時放出鎖,等待一段時間(可能被提前喚醒並搶到鎖,所以需要迴圈判斷條件)
// 這段時間可能其他執行緒取走了元素,這樣就有機會插入了
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);//插入一個元素
return true;
} finally {
lock.unlock(); //解鎖
}
}
乍一看會有點疑惑,讀和寫都是同一個鎖,那要是空的時候正好一個讀執行緒來了不會一直阻塞嗎?
答案就在notEmpty、notFull裡,這兩個出自lock的小東西讓鎖有了類似synchronized + wait + notify的功能。
9.LinkedBlockingQueue 阻塞佇列(基於連結串列)
基於連結串列實現的阻塞佇列,想比與不阻塞的ConcurrentLinkedQueue,它多了一個容量限制,如果不設定預設為int最大值。10.LinkedBlockingDeque 阻塞佇列(基於雙向連結串列)
類似LinkedBlockingQueue,但提供了雙向連結串列特有的操作。11.PriorityBlockingQueue 執行緒安全的優先佇列
構造時可以傳入一個比較器,可以看做放進去的元素會被排序,然後讀取的時候按順序消費。某些低優先順序的元素可能長期無法被消費,因為不斷有更高優先順序的元素進來。12.SynchronousQueue 資料同步交換的佇列
一個虛假的佇列,因為它實際上沒有真正用於儲存元素的空間,每個插入操作都必須有對應的取出操作,沒取出時無法繼續放入。 一個簡單的例子感受一下:import java.util.concurrent.*;
public class Main {
public static void main(String[] args) {
SynchronousQueue<Integer> queue = new SynchronousQueue<>();
new Thread(() -> {
try {
// 沒有休息,瘋狂寫入
for (int i = 0; ; i++) {
System.out.println("放入: " + i);
queue.put(i);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
// 鹹魚模式取資料
while (true) {
System.out.println("取出: " + queue.take());
Thread.sleep((long) (Math.random() * 2000));
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
/* 輸出:
放入: 0
取出: 0
放入: 1
取出: 1
放入: 2
取出: 2
放入: 3
取出: 3
*/
可以看到,寫入的執行緒沒有任何sleep,可以說是全力往佇列放東西,而讀取的執行緒又很不積極,讀一個又sleep一會。輸出的結果卻是讀寫操作成對出現。 JAVA中一個使用場景就是Executors.newCachedThreadPool(),建立一個快取執行緒池。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(
0, // 核心執行緒為0,沒用的執行緒都被無情拋棄
Integer.MAX_VALUE, // 最大執行緒數理論上是無限了,還沒到這個值機器資源就被掏空了
60L, TimeUnit.SECONDS, // 閒置執行緒60秒後銷燬
new SynchronousQueue<Runnable>()); // offer時如果沒有空閒執行緒取出任務,則會失敗,執行緒池就會新建一個執行緒
}