環形佇列、 條帶環形佇列 Striped-RingBuffer (史上最全)
文章很長,而且持續更新,建議收藏起來,慢慢讀!瘋狂創客圈總目錄 部落格園版 為您奉上珍貴的學習資源 :
免費贈送 :《尼恩Java面試寶典》 持續更新+ 史上最全 + 面試必備 2000頁+ 面試必備 + 大廠必備 +漲薪必備
免費贈送 經典圖書:《Java高併發核心程式設計(卷1)加強版》 面試必備 + 大廠必備 +漲薪必備 加尼恩免費領
免費贈送 經典圖書:《Java高併發核心程式設計(卷2)加強版》 面試必備 + 大廠必備 +漲薪必備 加尼恩免費領
免費贈送 經典圖書:《Java高併發核心程式設計(卷3)加強版》 面試必備 + 大廠必備 +漲薪必備 加尼恩免費領
免費贈送 經典圖書:《尼恩Java面試寶典 最新版
免費贈送 資源寶庫: Java 必備 百度網盤資源大合集 價值>10000元 加尼恩領取
高效能 BoundedBuffer 條帶環形佇列
Caffeine 原始碼中,用到幾個高效能資料結構要講
- 一個是 條帶環狀 佇列 (超高效能、無鎖佇列)
- 一個是mpsc佇列 (超高效能、無鎖佇列)
- 一個是 多級時間輪
這裡給大家 介紹 環形佇列、 條帶環形佇列 Striped-RingBuffer 。
剩下的兩個結構, 稍後一點 ,使用專門的 博文介紹。
CAS 的優勢與核心問題
由於JVM重量級鎖使用了Linux核心態下的互斥鎖(Mutex),這是重量級鎖開銷很大的原因。
搶佔與釋放的過程中,涉及到 程序的 使用者態和 核心態, 程序的 使用者空間 和核心空間之間的切換, 效能非常低。
而CAS進行自旋搶鎖,這些CAS操作都處於使用者態下,程序不存在使用者態和核心態之間的執行切換,因此JVM輕量級鎖開銷較小。這是 CAS 的優勢。
但是, 任何事情,都有兩面性。
CAS 的核心問題是什麼呢?
在爭用激烈的場景下,會導致大量的CAS空自旋。
比如,在大量的執行緒同時併發修改一個AtomicInteger時,可能有很多執行緒會不停地自旋,甚至有的執行緒會進入一個無限重複的迴圈中。
大量的CAS空自旋會浪費大量的CPU資源,大大降低了程式的效能。
除了存在CAS空自旋之外,在SMP架構的CPU平臺上,大量的CAS操作還可能導致“匯流排風暴”,具體可參見《Java高併發核心程式設計 卷2 加強版》第5章的內容。
在高併發場景下如何提升CAS操作效能/ 解決CAS惡性空自旋 問題呢?
較為常見的方案有兩種:
- 分散操作熱點、
- 使用佇列削峰。
比如,在自增的場景中, 可以使用LongAdder替代AtomicInteger。
這是一種 分散操作熱點 ,空間換時間 方案,
也是 分而治之的思想。
以空間換時間:LongAdder 以及 Striped64
Java 8提供一個新的類LongAdder,以空間換時間的方式提升高併發場景下CAS操作效能。
LongAdder核心思想就是熱點分離,與ConcurrentHashMap的設計思想類似:將value值分離成一個數組,當多執行緒訪問時,通過Hash演算法將執行緒對映到陣列的一個元素進行操作;而獲取最終的value結果時,則將陣列的元素求和。
最終,通過LongAdder將內部操作物件從單個value值“演變”成一系列的陣列元素,從而減小了內部競爭的粒度。LongAdder的演變如圖3-10所示。
圖3-10 LongAdder的操作物件由單個value值“演變”成了陣列
LongAdder的分治思想和架構
LongAdder的操作物件由單個value值“演變”成了陣列
LongAdder 繼承了 Striped64,核心原始碼在 Striped64中。
條帶累加Striped64的結構和原始碼
/**
* A package-local class holding common representation and mechanics
* for classes supporting dynamic striping on 64bit values. The class
* extends Number so that concrete subclasses must publicly do so.
*/
@SuppressWarnings("serial")
abstract class Striped64 extends Number {
/**
* Padded variant of AtomicLong supporting only raw accesses plus CAS.
*
* JVM intrinsics note: It would be possible to use a release-only
* form of CAS here, if it were provided.
*/
@sun.misc.Contended static final class Cell {
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
/** Number of CPUS, to place bound on table size */
static final int NCPU = Runtime.getRuntime().availableProcessors();
/**
* Table of cells. When non-null, size is a power of 2.
*/
transient volatile Cell[] cells;
/**
* Base value, used mainly when there is no contention, but also as
* a fallback during table initialization races. Updated via CAS.
*/
transient volatile long base;
/**
* Spinlock (locked via CAS) used when resizing and/or creating Cells.
*/
transient volatile int cellsBusy;
/**
* Package-private default constructor
*/
Striped64() {
}
以上原始碼的特別複雜,請參見 《Java高併發核心程式設計 卷2 加強版》
BoundedBuffer 的核心原始碼
/**
* A striped, non-blocking, bounded buffer.
*
* @author [email protected] (Ben Manes)
* @param <E> the type of elements maintained by this buffer
*/
final class BoundedBuffer<E> extends StripedBuffer<E>
它是一個 striped、非阻塞、有界限的 buffer,繼承於StripedBuffer類。
下面看看StripedBuffer的實現:
/**
* A base class providing the mechanics for supporting dynamic striping of bounded buffers. This
* implementation is an adaption of the numeric 64-bit {@link java.util.concurrent.atomic.Striped64}
* class, which is used by atomic counters. The approach was modified to lazily grow an array of
* buffers in order to minimize memory usage for caches that are not heavily contended on.
*
* @author [email protected] (Doug Lea)
* @author [email protected] (Ben Manes)
*/
abstract class StripedBuffer<E> implements Buffer<E>
StripedBuffer (條帶緩衝)的架構
解決CAS惡性空自旋的有效方式之一是以空間換時間,較為常見的方案有兩種:
- 分散操作熱點、
- 使用佇列削峰。
這個StripedBuffer設計的思想是跟Striped64類似的,通過擴充套件結構把分散操作熱點(/競爭熱點分離)。
具體實現是這樣的,StripedBuffer維護一個Buffer[]陣列,叫做table,每個元素就是一個RingBuffer,
每個執行緒用自己id屬性作為 hash 值的種子產生hash值,這樣就相當於每個執行緒都有自己“專屬”的RingBuffer,
在hash分散很均衡的場景下,就不會盡量的降低競爭,避免空自旋,
、
看看StripedBuffer的屬性
/** Table of buffers. When non-null, size is a power of 2. */
//RingBuffer陣列
transient volatile Buffer<E> @Nullable[] table;
//當進行resize時,需要整個table鎖住。tableBusy作為CAS的標記。
static final long TABLE_BUSY = UnsafeAccess.objectFieldOffset(StripedBuffer.class, "tableBusy");
static final long PROBE = UnsafeAccess.objectFieldOffset(Thread.class, "threadLocalRandomProbe");
/** Number of CPUS. */
static final int NCPU = Runtime.getRuntime().availableProcessors();
/** The bound on the table size. */
//table最大size
static final int MAXIMUM_TABLE_SIZE = 4 * ceilingNextPowerOfTwo(NCPU);
/** The maximum number of attempts when trying to expand the table. */
//如果發生競爭時(CAS失敗)的嘗試次數
static final int ATTEMPTS = 3;
/** Table of buffers. When non-null, size is a power of 2. */
//核心資料結構
transient volatile Buffer<E> @Nullable[] table;
/** Spinlock (locked via CAS) used when resizing and/or creating Buffers. */
transient volatile int tableBusy;
/** CASes the tableBusy field from 0 to 1 to acquire lock. */
final boolean casTableBusy() {
return UnsafeAccess.UNSAFE.compareAndSwapInt(this, TABLE_BUSY, 0, 1);
}
/**
* Returns the probe value for the current thread. Duplicated from ThreadLocalRandom because of
* packaging restrictions.
*/
static final int getProbe() {
return UnsafeAccess.UNSAFE.getInt(Thread.currentThread(), PROBE);
}
offer方法,當沒初始化或存在競爭時,則擴容為 2 倍。最大為不小於 CPU核數的 2冪值。
/**
* The bound on the table size.
*/
static final int MAXIMUM_TABLE_SIZE = 4 * ceilingPowerOfTwo(NCPU);
實際是呼叫RingBuffer的 offer 方法,把資料追加到RingBuffer後面。
@Override
public int offer(E e) {
int mask;
int result = 0;
Buffer<E> buffer;
//是否不存在競爭
boolean uncontended = true;
Buffer<E>[] buffers = table
//是否已經初始化
if ((buffers == null)
|| (mask = buffers.length - 1) < 0
//用thread的隨機值作為hash值,得到對應位置的RingBuffer
|| (buffer = buffers[getProbe() & mask]) == null
//檢查追加到RingBuffer是否成功
|| !(uncontended = ((result = buffer.offer(e)) != Buffer.FAILED))) {
//其中一個符合條件則進行擴容
expandOrRetry(e, uncontended);
}
return result;
}
/**
* Handles cases of updates involving initialization, resizing, creating new Buffers, and/or
* contention. See above for explanation. This method suffers the usual non-modularity problems of
* optimistic retry code, relying on rechecked sets of reads.
*
* @param e the element to add
* @param wasUncontended false if CAS failed before call
*/
//這個方法比較長,但思路還是相對清晰的。
@SuppressWarnings("PMD.ConfusingTernary")
final void expandOrRetry(E e, boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (int attempt = 0; attempt < ATTEMPTS; attempt++) {
Buffer<E>[] buffers;
Buffer<E> buffer;
int n;
if (((buffers = table) != null) && ((n = buffers.length) > 0)) {
if ((buffer = buffers[(n - 1) & h]) == null) {
if ((tableBusy == 0) && casTableBusy()) { // Try to attach new Buffer
boolean created = false;
try { // Recheck under lock
Buffer<E>[] rs;
int mask, j;
if (((rs = table) != null) && ((mask = rs.length) > 0)
&& (rs[j = (mask - 1) & h] == null)) {
rs[j] = create(e);
created = true;
}
} finally {
tableBusy = 0;
}
if (created) {
break;
}
continue; // Slot is now non-empty
}
collide = false;
} else if (!wasUncontended) { // CAS already known to fail
wasUncontended = true; // Continue after rehash
} else if (buffer.offer(e) != Buffer.FAILED) {
break;
} else if (n >= MAXIMUM_TABLE_SIZE || table != buffers) {
collide = false; // At max size or stale
} else if (!collide) {
collide = true;
} else if (tableBusy == 0 && casTableBusy()) {
try {
if (table == buffers) { // Expand table unless stale
table = Arrays.copyOf(buffers, n << 1);
}
} finally {
tableBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h);
} else if ((tableBusy == 0) && (table == buffers) && casTableBusy()) {
boolean init = false;
try { // Initialize table
if (table == buffers) {
@SuppressWarnings({"unchecked", "rawtypes"})
Buffer<E>[] rs = new Buffer[1];
rs[0] = create(e);
table = rs;
init = true;
}
} finally {
tableBusy = 0;
}
if (init) {
break;
}
}
}
}
環形佇列
我們知道,佇列伴隨著生產和消費,而佇列一般也是由陣列或連結串列來實現的,
佇列是一個先進先出的結構,那麼隨著遊標在陣列上向後移動,
前面已經消費了的資料已沒有意義,但是他們依然佔據著記憶體空間,浪費越來越大,
所以:環形佇列就很好的解決了這個問題。
環形佇列是在實際程式設計極為有用的資料結構,它採用陣列的線性空間,資料組織簡單,能很快知道佇列是否滿或空,能以很快速度的來存取資料。
從順時針看,環形佇列 有隊頭 head 和隊尾 tail。
生產的流程是:
生產者順時針向隊尾 tail 插入元素,這會導致 head 位置不變,tail 位置在後移;
消費的流程是:
消費者則從隊頭 head 開始消費,這會導致 head 向後移動,而tail 位置不變,如果佇列滿了就不能寫入。
環形佇列的特點:
隊頭 head 和隊尾 tail 的位置是不定的,位置一直在迴圈流動,空間就被重複利用起來了。
因為有簡單高效的原因,甚至在硬體都實現了環形佇列.。
環形佇列廣泛用於網路資料收發,和不同程式間資料交換(比如核心與應用程式大量交換資料,從硬體接收大量資料)均使用了環形佇列。
環形佇列的參考實現
下面的環形佇列, 參考了 快取之王 Caffeine 原始碼中的 命名
package com.crazymakercircle.queue;
public class SimpleRingBufferDemo {
public static void main(String[] args) {
//建立一個環形佇列
SimpleRingBuffer queue = new SimpleRingBuffer(4);
queue.offer(11);
queue.offer(12);
queue.offer(13);
System.out.println("queue = " + queue);
int temp = queue.poll();
System.out.println("temp = " + temp);
System.out.println("queue = " + queue);
temp = queue.poll();
System.out.println("temp = " + temp);
System.out.println("queue = " + queue);
temp = queue.poll();
System.out.println("temp = " + temp);
System.out.println("queue = " + queue);
}
}
class SimpleRingBuffer {
private int maxSize;//表示陣列的最大容量
private int head; // 模擬 快取之王 Caffeine 原始碼命名
//head就指向佇列的第一個元素,也就是arr[head]就是佇列的第一個元素
//head的初始值=0
private int tail; // 模擬 快取之王 Caffeine 原始碼命名
//tail指向佇列的最後一個元素的後一個位置,因為希望空出一個空間做為約定
//tail的初始化值=0
private int[] buffer;//該資料用於存放資料
public SimpleRingBuffer(int arrMaxSize) {
maxSize = arrMaxSize;
buffer = new int[maxSize];
}
//判斷佇列是否滿
public boolean isFull() {
return (tail + 1) % maxSize == head;
}
//判斷佇列是否為空
public boolean isEmpty() {
return tail == head;
}
//新增資料到佇列
public void offer(int n) {
//判斷佇列是否滿
if (isFull()) {
System.out.println("佇列滿,不能加入資料");
return;
}
//直接將資料加入
buffer[tail] = n;
//將tail後移,這裡必須考慮取模
tail = (tail + 1) % maxSize;
}
//獲取佇列的資料,出佇列
public int poll() {
//判斷佇列是否空
if (isEmpty()) {
//通過丟擲異常
throw new RuntimeException("佇列空,不能取資料");
}
//這裡需要分析出head是指向佇列的第一個元素
//1.先把head對應的值保留到一個臨時變數
//2.將head後移,考慮取模
//3.將臨時儲存的變數返回
int value = buffer[head];
head = (head + 1) % maxSize;
return value;
}
//求出當前佇列有效資料的個數
public int size() {
return (tail + maxSize - head) % maxSize;
}
@Override
public String toString() {
return String.format("head=%d , tail =%d\n",head,tail);
}
}
測試的結果
環形核心的結構和流程說明
-
約定head指向佇列的第一個元素,
也就是說data[head]就是隊頭資料,head初始值為0。
-
約定tail指向佇列的最後一個元素的後一個位置,
也就是說data[tail-1]就是隊尾資料,tail初始值為0。
-
佇列滿的條件是:
( tail+1 )% maxSize == head
-
佇列空的條件是:
tail == head
-
佇列中的元素個數為:
( tail + maxsize - head) % maxSize
-
有效資料只有maxSize-1個
因為tail指向隊尾的後面一個位置,這個位置就不能存資料,因此有效資料只有maxSize-1個
環形佇列核心操作:判滿
寫入的時候,當前位置的下一位置是(tail+1)% maxSize
由圖可知:
當head剛好指向tail的下一個位置時佇列滿,而tail的下一個位置是 (tail+1)% maxSize,
所以當( tail + 1 )% maxSize == head 時,佇列就滿了。
環形佇列核心操作:判空
佇列為空的情況如下圖所示,當隊頭隊尾都指向一個位置,即 head == tail 時,佇列為空。
當head == tail時,佇列為空
因為tail指向隊尾的後面一個位置,這個位置就不能存資料,
因此, 環形佇列的有效資料只有maxSize-1個
RingBuffer 原始碼
caffeine原始碼中, 注意RingBuffer是BoundedBuffer的內部類。
/** The maximum number of elements per buffer. */
static final int BUFFER_SIZE = 16;
// Assume 4-byte references and 64-byte cache line (16 elements per line)
//256長度,但是是以16為單位,所以最多存放16個元素
static final int SPACED_SIZE = BUFFER_SIZE << 4;
static final int SPACED_MASK = SPACED_SIZE - 1;
static final int OFFSET = 16;
//RingBuffer陣列
final AtomicReferenceArray<E> buffer;
//插入方法
@Override
public int offer(E e) {
long head = readCounter;
long tail = relaxedWriteCounter();
//用head和tail來限制個數
long size = (tail - head);
if (size >= SPACED_SIZE) {
return Buffer.FULL;
}
//tail追加16
if (casWriteCounter(tail, tail + OFFSET)) {
//用tail“取餘”得到下標
int index = (int) (tail & SPACED_MASK);
//用unsafe.putOrderedObject設值
buffer.lazySet(index, e);
return Buffer.SUCCESS;
}
//如果CAS失敗則返回失敗
return Buffer.FAILED;
}
//用consumer來處理buffer的資料
@Override
public void drainTo(Consumer<E> consumer) {
long head = readCounter;
long tail = relaxedWriteCounter();
//判斷資料多少
long size = (tail - head);
if (size == 0) {
return;
}
do {
int index = (int) (head & SPACED_MASK);
E e = buffer.get(index);
if (e == null) {
// not published yet
break;
}
buffer.lazySet(index, null);
consumer.accept(e);
//head也跟tail一樣,每次遞增16
head += OFFSET;
} while (head != tail);
lazySetReadCounter(head);
}
注意,ring buffer 的 size(固定是 16 個)是不變的,變的是 head 和 tail 而已。
Striped-RingBuffer 有如下特點:
總的來說 Striped-RingBuffer 有如下特點:
- 使用 Striped-RingBuffer來提升對 buffer 的讀寫
- 用 thread 的 hash 來避開熱點 key 的競爭
- 允許寫入的丟失
推薦閱讀:
參考文獻
-
ThreadLocal(史上最全)
https://www.cnblogs.com/crazymakercircle/p/14491965.html -
3000頁《尼恩 Java 面試寶典 》的 35個面試專題 :
https://www.cnblogs.com/crazymakercircle/p/13917138.html -
價值10W的架構師知識圖譜
https://www.processon.com/view/link/60fb9421637689719d246739
4、尼恩 架構師哲學
https://www.processon.com/view/link/616f801963768961e9d9aec8
5、尼恩 3高架構知識宇宙
https://www.processon.com/view/link/635097d2e0b34d40be778ab4
Guava Cache主頁:https://github.com/google/guava/wiki/CachesExplained
Caffeine的官網:https://github.com/ben-manes/caffeine/wiki/Benchmarks
https://gitee.com/jd-platform-opensource/hotkey
https://developer.aliyun.com/article/788271?utm_content=m_1000291945
https://b.alipay.com/page/account-manage-oc/approval/setList
Caffeine: https://github.com/ben-manes/caffeine
這裡: https://albenw.github.io/posts/df42dc84/
Benchmarks: https://github.com/ben-manes/caffeine/wiki/Benchmarks