1. 程式人生 > >六個面試題層層剖析——LongAddr原子類

六個面試題層層剖析——LongAddr原子類

併發程式設計面試題

(1)LongAddr的結構是怎樣的?

(2)當前執行緒應該訪問Cell數組裡面的哪一個Cell元素?

(3)如何初始化Cell陣列?

(4)Cell陣列如何擴容?

(5)執行緒訪問分配的Cell元素有衝突後如何處理?

(6)如何保證執行緒操作被分配的Cell元素的原子性?

(1)LongAddr的結構是怎樣的?

由LongAddr的類圖可知:LongAddr繼承自Striped64類,在Striped64內部維護了三個變數。

LongAddr的真實值=base的值+Cell陣列所有Cell元素的value值。

base是個基礎值,預設為0,cellsBusy用來實現自旋鎖,狀態只要0和1。

(6)如何保證執行緒操作被分配的Cell元素的原子性?

當建立Cell元素,擴容Cell陣列或者是初始化Cell陣列時,使用CAS操作該變數來保證同時只有一個執行緒可以進行其中之一的操作。

下面看下Cell的構造

/**
 * Padded variant of AtomicLong supporting only raw accesses plus CAS.
 *
 * JVM intrinsics note: It would be possible to use a release-only
 * form of CAS here, if it were provided.
 */
@sun.misc.Contended static final class Cell {
    volatile long value;
    Cell(long x) { value = x; }
    final boolean cas(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long valueOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> ak = Cell.class;
            valueOffset = UNSAFE.objectFieldOffset
                (ak.getDeclaredField("value"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

可以看到,Cell的構造很簡單,其內部維護一個被宣告為Volatile的變數,保證了記憶體的可見性。

另外使用CAS演算法操作,保證了當前執行緒更新時被分配的Cell元素中的value值的原子性。

另外,Cell類使用 @sun.misc.Contended修飾是為了避免偽共享。

偽共享解釋:陣列的元素是Cell類,可以看到Cell類用Contended註解修飾,這裡主要是解決false sharing(偽共享的問題),不過個人認為偽共享翻譯的不是很好,或者應該是錯誤的共享,比如兩個volatile變數被分配到了同一個快取行,但是這兩個的更新在高併發下會競爭,比如執行緒A去更新變數a,執行緒B去更新變數b,但是這兩個變數被分配到了同一個快取行,因此會造成每個執行緒都去爭搶快取行的所有權,例如A獲取了所有權然後執行更新這時由於volatile的語義會造成其重新整理到主存,但是由於變數b也被快取到同一個快取行,因此就會造成cache miss,這樣就會造成極大的效能損失,因此有一些類庫的作者,例如JUC下面的、Disruptor等都利用了插入dummy 變數的方式,使得快取行被其獨佔,比如下面這種程式碼:

static final class Cell {
        volatile long p0, p1, p2, p3, p4, p5, p6;
        volatile long value;
        volatile long q0, q1, q2, q3, q4, q5, q6;
        Cell(long x) { value = x; }

        final boolean cas(long cmp, long val) {
            return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
        }

        // Unsafe mechanics
        private static final sun.misc.Unsafe UNSAFE;
        private static final long valueOffset;
        static {
            try {
                UNSAFE = getUnsafe();
                Class<?> ak = Cell.class;
                valueOffset = UNSAFE.objectFieldOffset
                    (ak.getDeclaredField("value"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
 }

但是這種方式畢竟不通用,例如32、64位作業系統的快取行大小不一樣,因此JAVA8中就增加了一個注@sun.misc.Contended解用於解決這個問題,由JVM去插入這些變數,具體可以參考openjdk.java.net/jeps/142 ,但是通常來說物件是不規則的分配到記憶體中的,但是陣列由於是連續的記憶體,因此可能會共享快取行,因此這裡加一個Contended註解以防cells陣列發生偽共享的情況。

(2)當前執行緒應該訪問Cell數組裡面的哪一個Cell元素?

/**
 * Adds the given value.
 *
 * @param x the value to add
 */
public void add(long x) {
    Cell[] as; long b, v; int m; Cell a;
    if ((as = cells) != null || !casBase(b = base, b + x)) { //——(1)
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 || //——(2)
            (a = as[getProbe() & m]) == null || //——(3)
            !(uncontended = a.cas(v = a.value, v + x))) //——(4)
            longAccumulate(x, null, uncontended); //——(5)
    }
}

程式碼(1):判斷Cell是否為null,如果為null,則當前在基礎變數base上進行累加,類似AtomicLong的操作。

程式碼(2):如果Cells不為null或者執行緒執行程式碼(1)的CAS操作失敗了,則會去執行程式碼(2).

程式碼(2)(3):決定了當前執行緒應該訪問cells數組裡面的哪一個元素。

程式碼(4):如果當前執行緒對映的元素存在則執行程式碼(4),使用CAS操作去更新分配的Cell元素的value值。

程式碼(5):如果當前執行緒對映的元素不存在或者存在但是CAS操作失敗則執行程式碼(5)。

總結:當前執行緒應該訪問那個Cell元素是通過getProbe() & m進行計算的。

(3)如何初始化Cell陣列?

/**
 * Handles cases of updates involving initialization, resizing,
 * creating new Cells, and/or contention. See above for
 * explanation. This method suffers the usual non-modularity
 * problems of optimistic retry code, relying on rechecked sets of
 * reads.
 *
 * @param x the value
 * @param fn the update function, or null for add (this convention
 * avoids the need for an extra field or function in LongAdder).
 * @param wasUncontended false if CAS failed before call
 */
final void longAccumulate(long x, LongBinaryOperator fn,
                          boolean wasUncontended) {
  ....省略無關程式碼....

//*******************************初始化Cell陣列******************************************//
        else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
            boolean init = false;
            try {                           // Initialize table
                if (cells == as) {
                    //——(1)
                    Cell[] rs = new Cell[2];
                    //——(2)
                    rs[h & 1] = new Cell(x);
                    cells = rs;
                    init = true;
                }
            } finally {
                //——(3)
                cellsBusy = 0;
            }
            if (init)
                break;
        }
        else if (casBase(v = base, ((fn == null) ? v + x :
                                    fn.applyAsLong(v, x))))
            break;                          // Fall back on using base
    }
}

程式碼(1):初始化Cells陣列元素個數為2,

程式碼(2):然後使用 h&1 計算當前執行緒應該訪問cell陣列的那一個位置。

程式碼(3):重置了cellsBusy標記。這裡沒用使用CAS演算法,因為cellsBusy是volatile型別的,保證了變數記憶體的可見性。

(4)Cell陣列如何擴容?

final void longAccumulate(long x, LongBinaryOperator fn,
                          boolean wasUncontended) {
    int h;
    if ((h = getProbe()) == 0) {
        ThreadLocalRandom.current(); // force initialization
        h = getProbe();
        wasUncontended = true;
    }
    boolean collide = false;                // True if last slot nonempty
    for (;;) {
       ......
            else if (!wasUncontended)       // CAS already known to fail
                wasUncontended = true;      // Continue after rehash
            else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                         fn.applyAsLong(v, x))))
                break;
            else if (n >= NCPU || cells != as) //——(1)
                collide = false;            // At max size or stale
            else if (!collide) //——(2)
                collide = true;
            else if (cellsBusy == 0 && casCellsBusy()) { //——(3)
                try {
                    if (cells == as) {      // Expand table unless stale
                        Cell[] rs = new Cell[n << 1]; //——(4)
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        cells = rs;
                    }
                } finally {
                    cellsBusy = 0; //——(5)
                }
                collide = false; //——(6)
                continue;                   // Retry with expanded table
            }
            h = advanceProbe(h); //——(7)
        }
       .....
}

程式碼(1)(2):當前cells元素個數小於當前機器的CPU個數並且當前多個執行緒訪問cells中的同一個元素,從而導致衝突使其中一個執行緒CAS失敗時才會進行程式碼(3)的擴容操作。

程式碼(3)中的擴容操作也是通過CAS設定cellsBusy為1,然後才能進行擴容。

解釋下cellsBusy:他是個標示,為0說明cells陣列沒用被初始化或者擴容,也沒有在新建Cell元素,為1則說明cells陣列在被初始化或者擴容,或者當前在建立新的Cell元素。

程式碼(4):假設CAS成功,則執行程式碼(4),將容量擴充為之前的兩倍,並複製Cell元素到新擴容的陣列。

(5)執行緒訪問分配的Cell元素有衝突後如何處理?

final void longAccumulate(long x, LongBinaryOperator fn,
                          boolean wasUncontended) {
   .......
    boolean collide = false;                // True if last slot nonempty
    for (;;) {
        Cell[] as; Cell a; int n; long v;
        if ((as = cells) != null && (n = as.length) > 0) { //——(1)
            if ((a = as[(n - 1) & h]) == null) { //——(2)
                if (cellsBusy == 0) {       // Try to attach new Cell
                    Cell r = new Cell(x);   // Optimistically create
                    if (cellsBusy == 0 && casCellsBusy()) {
                        boolean created = false;
                        try {               // Recheck under lock
                            Cell[] rs; int m, j;
                            if ((rs = cells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                rs[j] = r;
                                created = true;
                            }
                        } finally {
                            cellsBusy = 0;
                        }
                        if (created)
                            break;
                        continue;           // Slot is now non-empty
                    }
                }
                collide = false;
            }
          .....
            h = advanceProbe(h); //——(3)
        }
       .....
}

程式碼(3)實現

/**
 * Pseudo-randomly advances and records the given probe value for the
 * given thread.
 * Duplicated from ThreadLocalRandom because of packaging restrictions.
 */
static final int advanceProbe(int probe) {
    probe ^= probe << 13;   // xorshift
    probe ^= probe >>> 17;
    probe ^= probe << 5;
    UNSAFE.putInt(Thread.currentThread(), PROBE, probe);
    return probe;
}

程式碼(1)(2):當前執行緒呼叫add方法並根據當前執行緒的隨機數threadLocalRandomProbe和cells元素個數計算要訪問的Cell元素下標,然後如果發現對應下標的值為null,則新增一個Cell元素到cells陣列,並且將其新增到cells陣列之前要競爭設定cellsBusy為1.

程式碼(3):對CAS失敗的執行緒重新計算當前執行緒的隨機值threadLocalReadomProbe,以減少下次訪問cells元素時的衝突機會。

附錄

java.util.concurrency.atomic.LongAdder是Java8新增的一個類,提供了原子累計值的方法。根據文件的描述其效能要優於AtomicLong,下圖是一個簡單的測試對比(平臺:MBP):
image
這裡測試時基於JDK1.8進行的,AtomicLong 從Java8開始針對x86平臺進行了優化,使用XADD替換了CAS操作,我們知道JUC下面提供的原子類都是基於Unsafe類實現的,並由Unsafe來提供CAS的能力。CAS (compare-and-swap)本質上是由現代CPU在硬體級實現的原子指令,允許進行無阻塞,多執行緒的資料操作同時兼顧了安全性以及效率。大部分情況下,CAS都能夠提供不錯的效能,但是在高競爭的情況下開銷可能會成倍增長

總結

本章分析了JDK8新增的LongAddr原子性操作類,該類通過cells陣列分擔了高併發下多執行緒同時對一個原子變數進行更新時的競爭量,讓多個執行緒可以同時對cells數組裡面的元素進行並行的更新操作。

另外,陣列元素Cell使用@sun.misc.Contended註解進行修飾,這避免了cells陣列內多個原子變數被放入同一個快取行,也就避免了偽共享,提高了效能。

總的來說,LongAdder從效能上來說要遠遠好於AtomicLong,一般情況下是可以直接替代AtomicLong使用的,Netty也通過一個介面封裝了這兩個類,在Java8下直接採用LongAdder,但是AtomicLong的一系列方法不僅僅可以自增,還可以獲取更新後的值,如果是例如獲取一個全域性唯一的ID還是採用AtomicLong會方便一點。

參考書籍

Java併發程式設計之美

參考連結

  1. https://blogs.oracle.com/dave/atomic-fetch-and-add-vs-compare-and-swap
  2. https://en.wikipedia.org/wiki/Compare-and-swap
  3. http://ashkrit.blogspot.com/2014/02/atomicinteger-java-7-vs-java-8.html
  4. https://dzone.com/articles/adventures-atomiclong