1. 程式人生 > 其它 >環形佇列、 條帶環形佇列 Striped-RingBuffer (史上最全)

環形佇列、 條帶環形佇列 Striped-RingBuffer (史上最全)

文章很長,而且持續更新,建議收藏起來,慢慢讀!瘋狂創客圈總目錄 部落格園版 為您奉上珍貴的學習資源 :

免費贈送 :《尼恩Java面試寶典》 持續更新+ 史上最全 + 面試必備 2000頁+ 面試必備 + 大廠必備 +漲薪必備
免費贈送 經典圖書:《Java高併發核心程式設計(卷1)加強版》 面試必備 + 大廠必備 +漲薪必備 加尼恩免費領
免費贈送 經典圖書:《Java高併發核心程式設計(卷2)加強版》 面試必備 + 大廠必備 +漲薪必備 加尼恩免費領
免費贈送 經典圖書:《Java高併發核心程式設計(卷3)加強版》 面試必備 + 大廠必備 +漲薪必備 加尼恩免費領
免費贈送 經典圖書:尼恩Java面試寶典 最新版

面試必備 + 大廠必備 +漲薪必備 加尼恩免費領
免費贈送 資源寶庫: Java 必備 百度網盤資源大合集 價值>10000元 加尼恩領取


高效能 BoundedBuffer 條帶環形佇列

Caffeine 原始碼中,用到幾個高效能資料結構要講

  • 一個是 條帶環狀 佇列 (超高效能、無鎖佇列
  • 一個是mpsc佇列 (超高效能、無鎖佇列
  • 一個是 多級時間輪

這裡給大家 介紹 環形佇列、 條帶環形佇列 Striped-RingBuffer 。

剩下的兩個結構, 稍後一點 ,使用專門的 博文介紹。

CAS 的優勢與核心問題

由於JVM重量級鎖使用了Linux核心態下的互斥鎖(Mutex),這是重量級鎖開銷很大的原因。

搶佔與釋放的過程中,涉及到 程序的 使用者態和 核心態, 程序的 使用者空間 和核心空間之間的切換, 效能非常低。

而CAS進行自旋搶鎖,這些CAS操作都處於使用者態下,程序不存在使用者態和核心態之間的執行切換,因此JVM輕量級鎖開銷較小。這是 CAS 的優勢。

但是, 任何事情,都有兩面性。

CAS 的核心問題是什麼呢?

在爭用激烈的場景下,會導致大量的CAS空自旋。

比如,在大量的執行緒同時併發修改一個AtomicInteger時,可能有很多執行緒會不停地自旋,甚至有的執行緒會進入一個無限重複的迴圈中。

大量的CAS空自旋會浪費大量的CPU資源,大大降低了程式的效能。

除了存在CAS空自旋之外,在SMP架構的CPU平臺上,大量的CAS操作還可能導致“匯流排風暴”,具體可參見《Java高併發核心程式設計 卷2 加強版》第5章的內容。

在高併發場景下如何提升CAS操作效能/ 解決CAS惡性空自旋 問題呢?

較為常見的方案有兩種:

  • 分散操作熱點、
  • 使用佇列削峰。

比如,在自增的場景中, 可以使用LongAdder替代AtomicInteger。

這是一種 分散操作熱點 ,空間換時間 方案,

也是 分而治之的思想。

以空間換時間:LongAdder 以及 Striped64

Java 8提供一個新的類LongAdder,以空間換時間的方式提升高併發場景下CAS操作效能。

LongAdder核心思想就是熱點分離,與ConcurrentHashMap的設計思想類似:將value值分離成一個數組,當多執行緒訪問時,通過Hash演算法將執行緒對映到陣列的一個元素進行操作;而獲取最終的value結果時,則將陣列的元素求和。

最終,通過LongAdder將內部操作物件從單個value值“演變”成一系列的陣列元素,從而減小了內部競爭的粒度。LongAdder的演變如圖3-10所示。

圖3-10 LongAdder的操作物件由單個value值“演變”成了陣列

LongAdder的分治思想和架構

LongAdder的操作物件由單個value值“演變”成了陣列

LongAdder 繼承了 Striped64,核心原始碼在 Striped64中。

條帶累加Striped64的結構和原始碼

/**
 * A package-local class holding common representation and mechanics
 * for classes supporting dynamic striping on 64bit values. The class
 * extends Number so that concrete subclasses must publicly do so.
 */
@SuppressWarnings("serial")
abstract class Striped64 extends Number {
   

    /**
     * Padded variant of AtomicLong supporting only raw accesses plus CAS.
     *
     * JVM intrinsics note: It would be possible to use a release-only
     * form of CAS here, if it were provided.
     */
    @sun.misc.Contended static final class Cell {
        volatile long value;
        Cell(long x) { value = x; }
        final boolean cas(long cmp, long val) {
            return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
        }

        // Unsafe mechanics
        private static final sun.misc.Unsafe UNSAFE;
        private static final long valueOffset;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> ak = Cell.class;
                valueOffset = UNSAFE.objectFieldOffset
                    (ak.getDeclaredField("value"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }

    /** Number of CPUS, to place bound on table size */
    static final int NCPU = Runtime.getRuntime().availableProcessors();

    /**
     * Table of cells. When non-null, size is a power of 2.
     */
    transient volatile Cell[] cells;

    /**
     * Base value, used mainly when there is no contention, but also as
     * a fallback during table initialization races. Updated via CAS.
     */
    transient volatile long base;

    /**
     * Spinlock (locked via CAS) used when resizing and/or creating Cells.
     */
    transient volatile int cellsBusy;

    /**
     * Package-private default constructor
     */
    Striped64() {
    }

以上原始碼的特別複雜,請參見 《Java高併發核心程式設計 卷2 加強版》

BoundedBuffer 的核心原始碼

/**
 * A striped, non-blocking, bounded buffer.
 *
 * @author [email protected] (Ben Manes)
 * @param <E> the type of elements maintained by this buffer
 */
final class BoundedBuffer<E> extends StripedBuffer<E>

它是一個 striped、非阻塞、有界限的 buffer,繼承於StripedBuffer類。

下面看看StripedBuffer的實現:

/**
 * A base class providing the mechanics for supporting dynamic striping of bounded buffers. This
 * implementation is an adaption of the numeric 64-bit {@link java.util.concurrent.atomic.Striped64}
 * class, which is used by atomic counters. The approach was modified to lazily grow an array of
 * buffers in order to minimize memory usage for caches that are not heavily contended on.
 *
 * @author [email protected] (Doug Lea)
 * @author [email protected] (Ben Manes)
 */

abstract class StripedBuffer<E> implements Buffer<E>

StripedBuffer (條帶緩衝)的架構

解決CAS惡性空自旋的有效方式之一是以空間換時間,較為常見的方案有兩種:

  • 分散操作熱點、
  • 使用佇列削峰。

這個StripedBuffer設計的思想是跟Striped64類似的,通過擴充套件結構把分散操作熱點(/競爭熱點分離)

具體實現是這樣的,StripedBuffer維護一個Buffer[]陣列,叫做table,每個元素就是一個RingBuffer,

每個執行緒用自己id屬性作為 hash 值的種子產生hash值,這樣就相當於每個執行緒都有自己“專屬”的RingBuffer,

在hash分散很均衡的場景下,就不會盡量的降低競爭,避免空自旋,

看看StripedBuffer的屬性

/** Table of buffers. When non-null, size is a power of 2. */
//RingBuffer陣列
transient volatile Buffer<E> @Nullable[] table;

//當進行resize時,需要整個table鎖住。tableBusy作為CAS的標記。
static final long TABLE_BUSY = UnsafeAccess.objectFieldOffset(StripedBuffer.class, "tableBusy");
static final long PROBE = UnsafeAccess.objectFieldOffset(Thread.class, "threadLocalRandomProbe");

/** Number of CPUS. */
static final int NCPU = Runtime.getRuntime().availableProcessors();

/** The bound on the table size. */
//table最大size
static final int MAXIMUM_TABLE_SIZE = 4 * ceilingNextPowerOfTwo(NCPU);

/** The maximum number of attempts when trying to expand the table. */
//如果發生競爭時(CAS失敗)的嘗試次數
static final int ATTEMPTS = 3;

/** Table of buffers. When non-null, size is a power of 2. */
//核心資料結構
transient volatile Buffer<E> @Nullable[] table;

/** Spinlock (locked via CAS) used when resizing and/or creating Buffers. */
transient volatile int tableBusy;

/** CASes the tableBusy field from 0 to 1 to acquire lock. */
final boolean casTableBusy() {
  return UnsafeAccess.UNSAFE.compareAndSwapInt(this, TABLE_BUSY, 0, 1);
}

/**
 * Returns the probe value for the current thread. Duplicated from ThreadLocalRandom because of
 * packaging restrictions.
 */
static final int getProbe() {
  return UnsafeAccess.UNSAFE.getInt(Thread.currentThread(), PROBE);
}

offer方法,當沒初始化或存在競爭時,則擴容為 2 倍。最大為不小於 CPU核數的 2冪值。



    /**
     * The bound on the table size.
     */
    static final int MAXIMUM_TABLE_SIZE = 4 * ceilingPowerOfTwo(NCPU);


實際是呼叫RingBuffer的 offer 方法,把資料追加到RingBuffer後面。

@Override
public int offer(E e) {
  int mask;
  int result = 0;
  Buffer<E> buffer;
  //是否不存在競爭
  boolean uncontended = true;
  Buffer<E>[] buffers = table
  //是否已經初始化
  if ((buffers == null)
      || (mask = buffers.length - 1) < 0
      //用thread的隨機值作為hash值,得到對應位置的RingBuffer
      || (buffer = buffers[getProbe() & mask]) == null
      //檢查追加到RingBuffer是否成功
      || !(uncontended = ((result = buffer.offer(e)) != Buffer.FAILED))) {
    //其中一個符合條件則進行擴容
    expandOrRetry(e, uncontended);
  }
  return result;
}

/**
 * Handles cases of updates involving initialization, resizing, creating new Buffers, and/or
 * contention. See above for explanation. This method suffers the usual non-modularity problems of
 * optimistic retry code, relying on rechecked sets of reads.
 *
 * @param e the element to add
 * @param wasUncontended false if CAS failed before call
 */

//這個方法比較長,但思路還是相對清晰的。
@SuppressWarnings("PMD.ConfusingTernary")
final void expandOrRetry(E e, boolean wasUncontended) {
  int h;
  if ((h = getProbe()) == 0) {
    ThreadLocalRandom.current(); // force initialization
    h = getProbe();
    wasUncontended = true;
  }
  boolean collide = false; // True if last slot nonempty
  for (int attempt = 0; attempt < ATTEMPTS; attempt++) {
    Buffer<E>[] buffers;
    Buffer<E> buffer;
    int n;
    if (((buffers = table) != null) && ((n = buffers.length) > 0)) {
      if ((buffer = buffers[(n - 1) & h]) == null) {
        if ((tableBusy == 0) && casTableBusy()) { // Try to attach new Buffer
          boolean created = false;
          try { // Recheck under lock
            Buffer<E>[] rs;
            int mask, j;
            if (((rs = table) != null) && ((mask = rs.length) > 0)
                && (rs[j = (mask - 1) & h] == null)) {
              rs[j] = create(e);
              created = true;
            }
          } finally {
            tableBusy = 0;
          }
          if (created) {
            break;
          }
          continue; // Slot is now non-empty
        }
        collide = false;
      } else if (!wasUncontended) { // CAS already known to fail
        wasUncontended = true;      // Continue after rehash
      } else if (buffer.offer(e) != Buffer.FAILED) {
        break;
      } else if (n >= MAXIMUM_TABLE_SIZE || table != buffers) {
        collide = false; // At max size or stale
      } else if (!collide) {
        collide = true;
      } else if (tableBusy == 0 && casTableBusy()) {
        try {
          if (table == buffers) { // Expand table unless stale
            table = Arrays.copyOf(buffers, n << 1);
          }
        } finally {
          tableBusy = 0;
        }
        collide = false;
        continue; // Retry with expanded table
      }
      h = advanceProbe(h);
    } else if ((tableBusy == 0) && (table == buffers) && casTableBusy()) {
      boolean init = false;
      try { // Initialize table
        if (table == buffers) {
          @SuppressWarnings({"unchecked", "rawtypes"})
          Buffer<E>[] rs = new Buffer[1];
          rs[0] = create(e);
          table = rs;
          init = true;
        }
      } finally {
        tableBusy = 0;
      }
      if (init) {
        break;
      }
    }
  }
}

環形佇列

我們知道,佇列伴隨著生產和消費,而佇列一般也是由陣列或連結串列來實現的,

佇列是一個先進先出的結構,那麼隨著遊標在陣列上向後移動,

前面已經消費了的資料已沒有意義,但是他們依然佔據著記憶體空間,浪費越來越大,

所以:環形佇列就很好的解決了這個問題。

環形佇列是在實際程式設計極為有用的資料結構,它採用陣列的線性空間,資料組織簡單,能很快知道佇列是否滿或空,能以很快速度的來存取資料。

從順時針看,環形佇列 有隊頭 head 和隊尾 tail。

生產的流程是:

生產者順時針向隊尾 tail 插入元素,這會導致 head 位置不變,tail 位置在後移;

消費的流程是:

消費者則從隊頭 head 開始消費,這會導致 head 向後移動,而tail 位置不變,如果佇列滿了就不能寫入。

環形佇列的特點:

隊頭 head 和隊尾 tail 的位置是不定的,位置一直在迴圈流動,空間就被重複利用起來了。

因為有簡單高效的原因,甚至在硬體都實現了環形佇列.。

環形佇列廣泛用於網路資料收發,和不同程式間資料交換(比如核心與應用程式大量交換資料,從硬體接收大量資料)均使用了環形佇列。

環形佇列的參考實現

下面的環形佇列, 參考了 快取之王 Caffeine 原始碼中的 命名

package com.crazymakercircle.queue;


public class SimpleRingBufferDemo {
    public static void main(String[] args) {

        //建立一個環形佇列
        SimpleRingBuffer queue = new SimpleRingBuffer(4);
        queue.offer(11);
        queue.offer(12);
        queue.offer(13);
        System.out.println("queue = " + queue);
        int temp = queue.poll();
        System.out.println("temp = " + temp);
        System.out.println("queue = " + queue);
        temp = queue.poll();
        System.out.println("temp = " + temp);
        System.out.println("queue = " + queue);
        temp = queue.poll();
        System.out.println("temp = " + temp);
        System.out.println("queue = " + queue);
    }

}

class SimpleRingBuffer {
    private int maxSize;//表示陣列的最大容量
    private int head;  // 模擬 快取之王 Caffeine 原始碼命名
    //head就指向佇列的第一個元素,也就是arr[head]就是佇列的第一個元素
    //head的初始值=0
    private int tail; // 模擬 快取之王 Caffeine 原始碼命名
    //tail指向佇列的最後一個元素的後一個位置,因為希望空出一個空間做為約定
    //tail的初始化值=0
    private int[] buffer;//該資料用於存放資料

    public SimpleRingBuffer(int arrMaxSize) {
        maxSize = arrMaxSize;
        buffer = new int[maxSize];
    }

    //判斷佇列是否滿
    public boolean isFull() {
        return (tail + 1) % maxSize == head;
    }

    //判斷佇列是否為空
    public boolean isEmpty() {
        return tail == head;
    }

    //新增資料到佇列
    public void offer(int n) {
        //判斷佇列是否滿
        if (isFull()) {
            System.out.println("佇列滿,不能加入資料");
            return;
        }
        //直接將資料加入
        buffer[tail] = n;
        //將tail後移,這裡必須考慮取模
        tail = (tail + 1) % maxSize;
    }

    //獲取佇列的資料,出佇列
    public int poll() {
        //判斷佇列是否空
        if (isEmpty()) {
            //通過丟擲異常
            throw new RuntimeException("佇列空,不能取資料");
        }
        //這裡需要分析出head是指向佇列的第一個元素
        //1.先把head對應的值保留到一個臨時變數
        //2.將head後移,考慮取模
        //3.將臨時儲存的變數返回
        int value = buffer[head];
        head = (head + 1) % maxSize;
        return value;
    }

    //求出當前佇列有效資料的個數
    public int size() {
        return (tail + maxSize - head) % maxSize;
    }

    @Override
    public String toString() {
       return   String.format("head=%d , tail =%d\n",head,tail);

    }
}

測試的結果

環形核心的結構和流程說明

  1. 約定head指向佇列的第一個元素

    也就是說data[head]就是隊頭資料,head初始值為0。

  2. 約定tail指向佇列的最後一個元素的後一個位置

    也就是說data[tail-1]就是隊尾資料,tail初始值為0。

  3. 佇列滿的條件是:

    ( tail+1 )% maxSize == head

  4. 佇列空的條件是:

    tail == head

  5. 佇列中的元素個數為:

    ( tail + maxsize - head) % maxSize

  6. 有效資料只有maxSize-1個

    因為tail指向隊尾的後面一個位置,這個位置就不能存資料,因此有效資料只有maxSize-1個

環形佇列核心操作:判滿

寫入的時候,當前位置的下一位置是(tail+1)% maxSize

由圖可知:

當head剛好指向tail的下一個位置時佇列滿,而tail的下一個位置是 (tail+1)% maxSize

所以當( tail + 1 )% maxSize == head 時,佇列就滿了。

環形佇列核心操作:判空

佇列為空的情況如下圖所示,當隊頭隊尾都指向一個位置,即 head == tail 時,佇列為空。

當head == tail時,佇列為空

因為tail指向隊尾的後面一個位置,這個位置就不能存資料,

因此, 環形佇列的有效資料只有maxSize-1個

RingBuffer 原始碼

caffeine原始碼中, 注意RingBuffer是BoundedBuffer的內部類。

/** The maximum number of elements per buffer. */
static final int BUFFER_SIZE = 16;

// Assume 4-byte references and 64-byte cache line (16 elements per line)
//256長度,但是是以16為單位,所以最多存放16個元素
static final int SPACED_SIZE = BUFFER_SIZE << 4;
static final int SPACED_MASK = SPACED_SIZE - 1;
static final int OFFSET = 16;
//RingBuffer陣列
final AtomicReferenceArray<E> buffer;

 //插入方法
 @Override
 public int offer(E e) {
   long head = readCounter;
   long tail = relaxedWriteCounter();
   //用head和tail來限制個數
   long size = (tail - head);
   if (size >= SPACED_SIZE) {
     return Buffer.FULL;
   }
   //tail追加16
   if (casWriteCounter(tail, tail + OFFSET)) {
     //用tail“取餘”得到下標
     int index = (int) (tail & SPACED_MASK);
     //用unsafe.putOrderedObject設值
     buffer.lazySet(index, e);
     return Buffer.SUCCESS;
   }
   //如果CAS失敗則返回失敗
   return Buffer.FAILED;
 }

 //用consumer來處理buffer的資料
 @Override
 public void drainTo(Consumer<E> consumer) {
   long head = readCounter;
   long tail = relaxedWriteCounter();
   //判斷資料多少
   long size = (tail - head);
   if (size == 0) {
     return;
   }
   do {
     int index = (int) (head & SPACED_MASK);
     E e = buffer.get(index);
     if (e == null) {
       // not published yet
       break;
     }
     buffer.lazySet(index, null);
     consumer.accept(e);
     //head也跟tail一樣,每次遞增16
     head += OFFSET;
   } while (head != tail);
   lazySetReadCounter(head);
 }

注意,ring buffer 的 size(固定是 16 個)是不變的,變的是 head 和 tail 而已。

Striped-RingBuffer 有如下特點:

總的來說 Striped-RingBuffer 有如下特點:

  • 使用 Striped-RingBuffer來提升對 buffer 的讀寫
  • 用 thread 的 hash 來避開熱點 key 的競爭
  • 允許寫入的丟失

推薦閱讀:

參考文獻

  1. 瘋狂創客圈 JAVA 高併發 總目錄

    ThreadLocal(史上最全)
    https://www.cnblogs.com/crazymakercircle/p/14491965.html

  2. 3000頁《尼恩 Java 面試寶典 》的 35個面試專題 :
    https://www.cnblogs.com/crazymakercircle/p/13917138.html

  3. 價值10W的架構師知識圖譜
    https://www.processon.com/view/link/60fb9421637689719d246739

4、尼恩 架構師哲學
https://www.processon.com/view/link/616f801963768961e9d9aec8

5、尼恩 3高架構知識宇宙
https://www.processon.com/view/link/635097d2e0b34d40be778ab4

Guava Cache主頁:https://github.com/google/guava/wiki/CachesExplained

Caffeine的官網:https://github.com/ben-manes/caffeine/wiki/Benchmarks

https://gitee.com/jd-platform-opensource/hotkey

https://developer.aliyun.com/article/788271?utm_content=m_1000291945

https://b.alipay.com/page/account-manage-oc/approval/setList

Caffeine: https://github.com/ben-manes/caffeine

這裡: https://albenw.github.io/posts/df42dc84/

Benchmarks: https://github.com/ben-manes/caffeine/wiki/Benchmarks