1. 程式人生 > 程式設計 >Java併發工具類LongAdder原理例項解析

Java併發工具類LongAdder原理例項解析

LongAdder實現原理圖

Java併發工具類LongAdder原理例項解析

Java併發工具類LongAdder原理例項解析

  高併發下N多執行緒同時去操作一個變數會造成大量執行緒CAS失敗,然後處於自旋狀態,導致嚴重浪費CPU資源,降低了併發性。既然AtomicLong效能問題是由於過多執行緒同時去競爭同一個變數的更新而降低的,那麼如果把一個變數分解為多個變數,讓同樣多的執行緒去競爭多個資源。

  LongAdder則是內部維護一個Cells陣列,每個Cell裡面有一個初始值為0的long型變數,在同等併發量的情況下,爭奪單個變數的執行緒會減少,這是變相的減少了爭奪共享資源的併發量,另外多個執行緒在爭奪同一個原子變數時候,如果失敗並不是自旋CAS重試,而是嘗試獲取其他原子變數的鎖,最後當獲取當前值時候是把所有變數的值累加後再加上base的值返回的。

  LongAdder維護了要給延遲初始化的原子性更新陣列和一個基值變數base陣列的大小保持是2的N次方大小,陣列表的下標使用每個執行緒的hashcode值的掩碼錶示,數組裡面的變數實體是Cell型別。

  Cell 型別是Atomic的一個改進,用來減少快取的爭用,對於大多數原子操作位元組填充是浪費的,因為原子操作都是無規律的分散在記憶體中進行的,多個原子性操作彼此之間是沒有接觸的,但是原子性陣列元素彼此相鄰存放將能經常共享快取行,也就是偽共享。所以這在效能上是一個提升。

  另外由於Cells佔用記憶體是相對比較大的,所以一開始並不建立,而是在需要時候再建立,也就是惰性載入,當一開始沒有空間時候,所有的更新都是操作base變數。

  java.util.concurrency.atomic.LongAdder是Java8新增的一個類,提供了原子累計值的方法。根據文件的描述其效能要優於AtomicLong

  這裡測試時基於JDK1.8進行的,AtomicLong 從Java8開始針對x86平臺進行了優化,使用XADD替換了CAS操作,我們知道JUC下面提供的原子類都是基於Unsafe類實現的,並由Unsafe來提供CAS的能力。CAS (compare-and-swap)本質上是由現代CPU在硬體級實現的原子指令,允許進行無阻塞,多執行緒的資料操作同時兼顧了安全性以及效率。大部分情況下,CAS都能夠提供不錯的效能,但是在高競爭的情況下開銷可能會成倍增長,具體的研究可以參考這篇文章,我們直接看下程式碼:

public class AtomicLong {
public final long incrementAndGet() {
    return unsafe.getAndAddLong(this,valueOffset,1L) + 1L;
  }
}

public final class Unsafe {
public final long getAndAddLong(Object var1,long var2,long var4) {
    long var6;
    do {
      var6 = this.getLongVolatile(var1,var2);
    } while(!this.compareAndSwapLong(var1,var2,var6,var6 + var4));
    return var6;
  }
}

  getAndAddLong方法會以volatile的語義去讀需要自增的域的最新值,然後通過CAS去嘗試更新,正常情況下會直接成功後返回,但是在高併發下可能會同時有很多執行緒同時嘗試這個過程,也就是說執行緒A讀到的最新值可能實際已經過期了,因此需要在while迴圈中不斷的重試,造成很多不必要的開銷,而xadd的相對來說會更高效一點,偽碼如下,最重要的是下面這段程式碼是原子的,也就是說其他執行緒不能打斷它的執行或者看到中間值,這條指令是在硬體級直接支援的:

function FetchAndAdd(address location,int inc) {
  int value := *location
  *location := value + inc
  return value
}

  而LongAdder的效能比上面那種還要好很多,於是就研究了一下。首先它有一個基礎的值base,在發生競爭的情況下,會有一個Cell陣列用於將不同執行緒的操作離散到不同的節點上去(會根據需要擴容,最大為CPU核數),sum()會將所有Cell陣列中的value和base累加作為返回值。核心的思想就是將AtomicLong一個value的更新壓力分散到多個value中去,從而降低更新熱點。

Java併發工具類LongAdder原理例項解析

public class LongAdder extends Striped64 implements Serializable {
//...
}

  LongAdder繼承自Striped64,Striped64內部維護了一個懶載入的陣列以及一個額外的base例項域,陣列的大小是2的N次方,使用每個執行緒Thread內部的雜湊值訪問。

abstract class Striped64 extends Number {
/** Number of CPUS,to place bound on table size */
  static final int NCPU = Runtime.getRuntime().availableProcessors();

  /**
   * Table of cells. When non-null,size is a power of 2.
   */
  transient volatile Cell[] cells;
   
@sun.misc.Contended static final class Cell {
    volatile long value;
    Cell(long x) { value = x; }
    final boolean cas(long cmp,long val) {
      return UNSAFE.compareAndSwapLong(this,cmp,val);
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long valueOffset;
    static {
      try {
        UNSAFE = sun.misc.Unsafe.getUnsafe();
        Class<?> ak = Cell.class;
        valueOffset = UNSAFE.objectFieldOffset
          (ak.getDeclaredField("value"));
      } catch (Exception e) {
        throw new Error(e);
      }
    }
  }

}

  陣列的元素是Cell類,可以看到Cell類用Contended註解修飾,這裡主要是解決false sharing(偽共享的問題),不過個人認為偽共享翻譯的不是很好,或者應該是錯誤的共享,比如兩個volatile變數被分配到了同一個快取行,但是這兩個的更新在高併發下會競爭,比如執行緒A去更新變數a,執行緒B去更新變數b,但是這兩個變數被分配到了同一個快取行,因此會造成每個執行緒都去爭搶快取行的所有權,例如A獲取了所有權然後執行更新這時由於volatile的語義會造成其重新整理到主存,但是由於變數b也被快取到同一個快取行,因此就會造成cache miss,這樣就會造成極大的效能損失,因此有一些類庫的作者,例如JUC下面的、Disruptor等都利用了插入dummy 變數的方式,使得快取行被其獨佔,比如下面這種程式碼:

static final class Cell {
    volatile long p0,p1,p2,p3,p4,p5,p6;
    volatile long value;
    volatile long q0,q1,q2,q3,q4,q5,q6;
    Cell(long x) { value = x; }

    final boolean cas(long cmp,val);
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long valueOffset;
    static {
      try {
        UNSAFE = getUnsafe();
        Class<?> ak = Cell.class;
        valueOffset = UNSAFE.objectFieldOffset
          (ak.getDeclaredField("value"));
      } catch (Exception e) {
        throw new Error(e);
      }
    }
 }

  但是這種方式畢竟不通用,例如32、64位作業系統的快取行大小不一樣,因此JAVA8中就增加了一個注@sun.misc.Contended解用於解決這個問題,由JVM去插入這些變數,具體可以參考openjdk.java.net/jeps/142 ,但是通常來說物件是不規則的分配到記憶體中的,但是陣列由於是連續的記憶體,因此可能會共享快取行,因此這裡加一個Contended註解以防cells陣列發生偽共享的情況。

/**
 * 底競爭下直接更新base,類似AtomicLong
 * 高併發下,會將每個執行緒的操作hash到不同的
 * cells陣列中,從而將AtomicLong中更新
 * 一個value的行為優化之後,分散到多個value中
 * 從而降低更新熱點,而需要得到當前值的時候,直接
 * 將所有cell中的value與base相加即可,但是跟
 * AtomicLong(compare and change -> xadd)的CAS不同,
 * incrementAndGet操作及其變種
 * 可以返回更新後的值,而LongAdder返回的是void
 */
public class LongAdder {
  public void add(long x) {
    Cell[] as; long b,v; int m; Cell a;
    /**
     * 如果是第一次執行,則直接case操作base
     */
    if ((as = cells) != null || !casBase(b = base,b + x)) {
      boolean uncontended = true;
      /**
       * as陣列為空(null或者size為0)
       * 或者當前執行緒取模as陣列大小為空
       * 或者cas更新Cell失敗
       */
      if (as == null || (m = as.length - 1) < 0 ||
        (a = as[getProbe() & m]) == null ||
        !(uncontended = a.cas(v = a.value,v + x)))
        longAccumulate(x,null,uncontended);
    }
  }

  public long sum() {
    //通過累加base與cells陣列中的value從而獲得sum
    Cell[] as = cells; Cell a;
    long sum = base;
    if (as != null) {
      for (int i = 0; i < as.length; ++i) {
        if ((a = as[i]) != null)
          sum += a.value;
      }
    }
    return sum;
  }
}

/**
 * openjdk.java.net/jeps/142
 */
@sun.misc.Contended static final class Cell {
  volatile long value;
  Cell(long x) { value = x; }
  final boolean cas(long cmp,long val) {
    return UNSAFE.compareAndSwapLong(this,val);
  }

  // Unsafe mechanics
  private static final sun.misc.Unsafe UNSAFE;
  private static final long valueOffset;
  static {
    try {
      UNSAFE = sun.misc.Unsafe.getUnsafe();
      Class<?> ak = Cell.class;
      valueOffset = UNSAFE.objectFieldOffset
        (ak.getDeclaredField("value"));
    } catch (Exception e) {
      throw new Error(e);
    }
  }
}

abstract class Striped64 extends Number {

  final void longAccumulate(long x,LongBinaryOperator fn,boolean wasUncontended) {
    int h;
    if ((h = getProbe()) == 0) {
      /**
       * 若getProbe為0,說明需要初始化
       */
      ThreadLocalRandom.current(); // force initialization
      h = getProbe();
      wasUncontended = true;
    }
    boolean collide = false;        // True if last slot nonempty
    /**
     * 失敗重試
     */
    for (;;) {
      Cell[] as; Cell a; int n; long v;
      if ((as = cells) != null && (n = as.length) > 0) {
        /**
         * 若as陣列已經初始化,(n-1) & h 即為取模操作,相對 % 效率要更高
         */
        if ((a = as[(n - 1) & h]) == null) {
          if (cellsBusy == 0) {    // Try to attach new Cell
            Cell r = new Cell(x);  // Optimistically create
            if (cellsBusy == 0 && casCellsBusy()) {//這裡casCellsBusy的作用其實就是一個spin lock
              //可能會有多個執行緒執行了`Cell r = new Cell(x);`,//因此這裡進行cas操作,避免執行緒安全的問題,同時前面在判斷一次
              //避免正在初始化的時其他執行緒再進行額外的cas操作
              boolean created = false;
              try {        // Recheck under lock
                Cell[] rs; int m,j;
                //重新檢查一下是否已經建立成功了
                if ((rs = cells) != null &&
                  (m = rs.length) > 0 &&
                  rs[j = (m - 1) & h] == null) {
                  rs[j] = r;
                  created = true;
                }
              } finally {
                cellsBusy = 0;
              }
              if (created)
                break;
              continue;      // Slot 現在是非空了,continue到下次迴圈重試
            }
          }
          collide = false;
        }
        else if (!wasUncontended)    // CAS already known to fail
          wasUncontended = true;   // Continue after rehash
        else if (a.cas(v = a.value,((fn == null) ? v + x :
                       fn.applyAsLong(v,x))))
          break;//若cas更新成功則跳出迴圈,否則繼續重試
        else if (n >= NCPU || cells != as) // 最大隻能擴容到CPU數目, 或者是已經擴容成功,這裡只有的本地引用as已經過期了
          collide = false;      // At max size or stale
        else if (!collide)
          collide = true;
        else if (cellsBusy == 0 && casCellsBusy()) {
          try {
            if (cells == as) {   // 擴容
              Cell[] rs = new Cell[n << 1];
              for (int i = 0; i < n; ++i)
                rs[i] = as[i];
              cells = rs;
            }
          } finally {
            cellsBusy = 0;
          }
          collide = false;
          continue;          // Retry with expanded table
        }
        //重新計算hash(異或)從而嘗試找到下一個空的slot
        h = advanceProbe(h);
      }
      else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
        boolean init = false;
        try {              // Initialize table
          if (cells == as) {
            /**
             * 預設size為2
             */
            Cell[] rs = new Cell[2];
            rs[h & 1] = new Cell(x);
            cells = rs;
            init = true;
          }
        } finally {
          cellsBusy = 0;
        }
        if (init)
          break;
      }
      else if (casBase(v = base,((fn == null) ? v + x : // 若已經有另一個執行緒在初始化,那麼嘗試直接更新base
                    fn.applyAsLong(v,x))))
        break;             // Fall back on using base
    }
  }

  final boolean casCellsBusy() {
    return UNSAFE.compareAndSwapInt(this,CELLSBUSY,1);
  }

  static final int getProbe() {
    /**
     * 通過Unsafe獲取Thread中threadLocalRandomProbe的值
     */
    return UNSAFE.getInt(Thread.currentThread(),PROBE);
  }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long BASE;
    private static final long CELLSBUSY;
    private static final long PROBE;
    static {
      try {
        UNSAFE = sun.misc.Unsafe.getUnsafe();
        Class<?> sk = Striped64.class;
        BASE = UNSAFE.objectFieldOffset
          (sk.getDeclaredField("base"));
        CELLSBUSY = UNSAFE.objectFieldOffset
          (sk.getDeclaredField("cellsBusy"));
        Class<?> tk = Thread.class;
        //返回Field在記憶體中相對於物件記憶體地址的偏移量
        PROBE = UNSAFE.objectFieldOffset
          (tk.getDeclaredField("threadLocalRandomProbe"));
      } catch (Exception e) {
        throw new Error(e);
      }
    }
}

  由於Cell相對來說比較佔記憶體,因此這裡採用懶載入的方式,在無競爭的情況下直接更新base域,在第一次發生競爭的時候(CAS失敗)就會建立一個大小為2的cells陣列,每次擴容都是加倍,只到達到CPU核數。同時我們知道擴容陣列等行為需要只能有一個執行緒同時執行,因此需要一個鎖,這裡通過CAS更新cellsBusy來實現一個簡單的spin lock。

陣列訪問索引是通過Thread裡的threadLocalRandomProbe域取模實現的,這個域是ThreadLocalRandom更新的,cells的陣列大小被限制為CPU的核數,因為即使有超過核數個執行緒去更新,但是每個執行緒也只會和一個CPU繫結,更新的時候頂多會有cpu核數個執行緒,因此我們只需要通過hash將不同執行緒的更新行為離散到不同的slot即可。

我們知道執行緒、執行緒池會被關閉或銷燬,這個時候可能這個執行緒之前佔用的slot就會變成沒人用的,但我們也不能清除掉,因為一般web應用都是長時間執行的,執行緒通常也會動態建立、銷燬,很可能一段時間後又會被其他執行緒佔用,而對於短時間執行的,例如單元測試,清除掉有啥意義呢?

總結

  總的來說,LongAdder從效能上來說要遠遠好於AtomicLong,一般情況下是可以直接替代AtomicLong使用的,Netty也通過一個介面封裝了這兩個類,在Java8下直接採用LongAdder,但是AtomicLong的一系列方法不僅僅可以自增,還可以獲取更新後的值,如果是例如獲取一個全域性唯一的ID還是採用AtomicLong會方便一點。

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支援我們。