1. 程式人生 > >談談ConcurrentHashMap1.7和1.8的不同實現

談談ConcurrentHashMap1.7和1.8的不同實現

ConcurrentHashMap

在多執行緒環境下,使用HashMap進行put操作時存在丟失資料的情況,為了避免這種bug的隱患,強烈建議使用ConcurrentHashMap代替HashMap,為了對ConcurrentHashMap有更深入的瞭解,本文將對ConcurrentHashMap1.7和1.8的不同實現進行分析。

1.7實現

資料結構

jdk1.7中採用Segment + HashEntry的方式進行實現,結構如下:

ConcurrentHashMap初始化時,計算出Segment陣列的大小ssize和每個SegmentHashEntry陣列的大小cap,並初始化Segment

陣列的第一個元素;其中ssize大小為2的冪次方,預設為16,cap大小也是2的冪次方,最小值為2,最終結果根據根據初始化容量initialCapacity進行計算,計算過程如下:

1

2

3

4

5

if (c * ssize < initialCapacity)

++c;

int cap = MIN_SEGMENT_TABLE_CAPACITY;

while (cap < c)

cap <<= 1;

其中Segment在實現上繼承了ReentrantLock,這樣就自帶了鎖的功能。

put實現

當執行put方法插入資料時,根據key的hash值,在Segment陣列中找到相應的位置,如果相應位置的Segment還未初始化,則通過CAS進行賦值,接著執行Segment物件的put方法通過加鎖機制插入資料,實現如下:

場景:執行緒A和執行緒B同時執行相同Segment物件的put方法

1、執行緒A執行tryLock()方法成功獲取鎖,則把HashEntry物件插入到相應的位置; 2、執行緒B獲取鎖失敗,則執行scanAndLockForPut()方法,在scanAndLockForPut方法中,會通過重複執行tryLock()方法嘗試獲取鎖,在多處理器環境下,重複次數為64,單處理器重複次數為1,當執行tryLock()

方法的次數超過上限時,則執行lock()方法掛起執行緒B; 3、當執行緒A執行完插入操作時,會通過unlock()方法釋放鎖,接著喚醒執行緒B繼續執行;

size實現

因為ConcurrentHashMap是可以併發插入資料的,所以在準確計算元素時存在一定的難度,一般的思路是統計每個Segment物件中的元素個數,然後進行累加,但是這種方式計算出來的結果並不一樣的準確的,因為在計算後面幾個Segment的元素個數時,已經計算過的Segment同時可能有資料的插入或則刪除,在1.7的實現中,採用瞭如下方式:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

try {

for (;;) {

if (retries++ == RETRIES_BEFORE_LOCK) {

for (int j = 0; j < segments.length; ++j)

ensureSegment(j).lock(); // force creation

}

sum = 0L;

size = 0;

overflow = false;

for (int j = 0; j < segments.length; ++j) {

Segment<K,V> seg = segmentAt(segments, j);

if (seg != null) {

sum += seg.modCount;

int c = seg.count;

if (c < 0 || (size += c) < 0)

overflow = true;

}

}

if (sum == last)

break;

last = sum;

}

} finally {

if (retries > RETRIES_BEFORE_LOCK) {

for (int j = 0; j < segments.length; ++j)

segmentAt(segments, j).unlock();

}

}

先採用不加鎖的方式,連續計算元素的個數,最多計算3次: 1、如果前後兩次計算結果相同,則說明計算出來的元素個數是準確的; 2、如果前後兩次計算結果都不同,則給每個Segment進行加鎖,再計算一次元素的個數;

1.8實現

資料結構

1.8中放棄了Segment臃腫的設計,取而代之的是採用Node + CAS + Synchronized來保證併發安全進行實現,結構如下:

只有在執行第一次put方法時才會呼叫initTable()初始化Node陣列,實現如下:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

private final Node<K,V>[] initTable() {

Node<K,V>[] tab; int sc;

while ((tab = table) == null || tab.length == 0) {

if ((sc = sizeCtl) < 0)

Thread.yield(); // lost initialization race; just spin

else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {

try {

if ((tab = table) == null || tab.length == 0) {

int n = (sc > 0) ? sc : DEFAULT_CAPACITY;

@SuppressWarnings("unchecked")

Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];

table = tab = nt;

sc = n - (n >>> 2);

}

} finally {

sizeCtl = sc;

}

break;

}

}

return tab;

}

put實現

當執行put方法插入資料時,根據key的hash值,在Node陣列中找到相應的位置,實現如下:

1、如果相應位置的Node還未初始化,則通過CAS插入相應的資料;

1

2

3

4

else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {

if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))

break;                   // no lock when adding to empty bin

}

2、如果相應位置的Node不為空,且當前該節點不處於移動狀態,則對該節點加synchronized鎖,如果該節點的hash不小於0,則遍歷連結串列更新節點或插入新節點;

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

if (fh >= 0) {

binCount = 1;

for (Node<K,V> e = f;; ++binCount) {

K ek;

if (e.hash == hash &&

((ek = e.key) == key ||

(ek != null && key.equals(ek)))) {

oldVal = e.val;

if (!onlyIfAbsent)

e.val = value;

break;

}

Node<K,V> pred = e;

if ((e = e.next) == null) {

pred.next = new Node<K,V>(hash, key, value, null);

break;

}

}

}

3、如果該節點是TreeBin型別的節點,說明是紅黑樹結構,則通過putTreeVal方法往紅黑樹中插入節點;

1

2

3

4

5

6

7

8

9

else if (f instanceof TreeBin) {

Node<K,V> p;

binCount = 2;

if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {

oldVal = p.val;

if (!onlyIfAbsent)

p.val = value;

}

}

4、如果binCount不為0,說明put操作對資料產生了影響,如果當前連結串列的個數達到8個,則通過treeifyBin方法轉化為紅黑樹,如果oldVal不為空,說明是一次更新操作,沒有對元素個數產生影響,則直接返回舊值;

1

2

3

4

5

6

7

if (binCount != 0) {

if (binCount >= TREEIFY_THRESHOLD)

treeifyBin(tab, i);

if (oldVal != null)

return oldVal;

break;

}

5、如果插入的是一個新節點,則執行addCount()方法嘗試更新元素個數baseCount

size實現

1.8中使用一個volatile型別的變數baseCount記錄元素的個數,當插入新資料或則刪除資料時,會通過addCount()方法更新baseCount,實現如下:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

if ((as = counterCells) != null ||

!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {

CounterCell a; long v; int m;

boolean uncontended = true;

if (as == null || (m = as.length - 1) < 0 ||

(a = as[ThreadLocalRandom.getProbe() & m]) == null ||

!(uncontended =

U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {

fullAddCount(x, uncontended);

return;

}

if (check <= 1)

return;

s = sumCount();

}

1、初始化時counterCells為空,在併發量很高時,如果存在兩個執行緒同時執行CAS修改baseCount值,則失敗的執行緒會繼續執行方法體中的邏輯,使用CounterCell記錄元素個數的變化;

2、如果CounterCell陣列counterCells為空,呼叫fullAddCount()方法進行初始化,並插入對應的記錄數,通過CAS設定cellsBusy欄位,只有設定成功的執行緒才能初始化CounterCell陣列,實現如下:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

else if (cellsBusy == 0 && counterCells == as &&

U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {

boolean init = false;

try {                           // Initialize table

if (counterCells == as) {

CounterCell[] rs = new CounterCell[2];

rs[h & 1] = new CounterCell(x);

counterCells = rs;

init = true;

}

} finally {

cellsBusy = 0;

}

if (init)

break;

}

3、如果通過CAS設定cellsBusy欄位失敗的話,則繼續嘗試通過CAS修改baseCount欄位,如果修改baseCount欄位成功的話,就退出迴圈,否則繼續迴圈插入CounterCell物件;

1

2

else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))

break;

所以在1.8中的size實現比1.7簡單多,因為元素個數儲存baseCount中,部分元素的變化個數儲存在CounterCell陣列中,實現如下:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

public int size() {

long n = sumCount();

return ((n < 0L) ? 0 :

(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :

(int)n);

}

final long sumCount() {

CounterCell[] as = counterCells; CounterCell a;

long sum = baseCount;

if (as != null) {

for (int i = 0; i < as.length; ++i) {

if ((a = as[i]) != null)

sum += a.value;

}

}

return sum;

}

通過累加baseCountCounterCell陣列中的數量,即可得到元素的總個數;