併發程式設計-ConcurrentHashMap(一)(剖析CHM原始碼、設計思想、資料結構)
併發程式設計-ConcurrentHashMap(一)(剖析CHM原始碼、設計思想、資料結構)
本篇來聊聊1.8的ConcurrentHashMap(CHS),關於它的一些設計思想(高低位擴容、分段鎖、紅黑樹、連結串列 so on),資料結構,和原始碼試實現行剖析,本篇會講到前面的一部分程式碼分析,包括(延遲初始化、閾值判斷擴容、以及高低位擴容)
為什麼使用CHS
【HashMap】是非執行緒安全的,有時候因為沒有給使用【HashMap】的程式碼段加同步鎖,在1.7的時候居然導致了死鎖,。那我們就使用【Hashtable】,他是可以保證執行緒安全,但是他只是粗暴的加上了synchronized關鍵字,這樣鎖的粒度太粗
(效能很低) 。所以當多執行緒場景下CHS就應運而生。
CHS的使用
平常的新增,刪除等一些操作這裡我們不再繼續瞭解,咱們瞭解一下在1.8後加的一些新的方法【computeIfAbsent】【computeIfPresent】【computeIfPresent】【compute】【merge】我這裡分別寫了個例子,所以不再贅述
CHS儲存和實現
總體來說,他是基於一個數組進行資料的儲存,陣列中的每個元素都叫做一個節點,並且分為兩種方式,
【連結串列】:是用來解決雜湊衝突的,我們在說threadlocal的時候說到過,用的是線性探索進行解決的
【紅黑樹】:用來解決連結串列過於長,帶來的時間複雜度增加的問題
,紅黑樹使得時間複雜度從【O(n)->O(logn)】!!!node長度超過64的時候,連結串列長度超過8的時候,則進行紅黑樹的轉換,當做擴容的時候,紅黑樹會轉換成節點,因為當擴容的時候,資料會進行拉伸,伴隨著,資料結構無法達到紅黑樹的條件,從而轉變
總體視角:
- 當我們put一個數據的時候,首先會用你put的key的雜湊值計算一個數組下標
- 如果我們put key的雜湊數值一樣的話,則加入此節點的下面,就變成了一個單項列表。
CHS原始碼分析
我們發現,在new一個chs的時候沒並沒有建立一個新的物件,建立物件和賦值的操作都在put方法中實現的,這就是【延遲初始化】
finalV putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); //使用你傳進來的key去計算一個hash值,後面要用這個計算下標 int hash = spread(key.hashCode()); int binCount = 0; //這裡是個自旋,因為是多執行緒裡面肯定牽扯到cas的操作,那cas可能會失敗,失敗之後肯定要重新把這個數值傳進來,所以使用自旋 for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; //如果table為空則去初始化一個Node<K,V>[] table 這是儲存的容器; if (tab == null || (n = tab.length) == 0) tab = initTable(); // (n - 1) & hash這裡用換算出來的hash值去計算一個下標 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //如果當前的算出來的下標處沒有資料則建立一個node並且儲存資料,這裡還是通過cas保障資料安全 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; } else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); //當前位置如果儲存資料,則走這裡的邏輯 else { V oldVal = null; //鎖住當前的node節點避免執行緒安全問題,這裡的鎖的粒度很細,因為只是鎖住當前的node,意味著其他的節點,都可以在這個時候同時插入資料,這樣就提升了效能 synchronized (f) { if (tabAt(tab, i) == f) { //這裡是針對連結串列的操作 if (fh >= 0) { binCount = 1; //f代表的是當前node的頭結點,拿到頭節點開始向下遍歷,binCount實際上是在統計連結串列的長度 for (Node<K,V> e = f;; ++binCount) { K ek; //判斷是否存在相同的key,如果存在則進行覆蓋(相同的key後者會覆蓋前者) if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } //如果不存在則把當前的key和value新增到連結串列中,這裡使用的是尾插法( = e.next),直接新增到尾部 Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } //這裡是針對紅黑樹的操作 else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { //如果連結串列長度大於等於8,則會根據閾值判斷是轉化為紅黑樹還是擴容 if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } addCount(1L, binCount); return null; }initTable(這是進行容器初始化的操作,因為是多執行緒情況下,所以要加鎖,這是使用CAS充當了鎖)這裡的【sizeCtl】等於是一個狀態機,去標識當前陣列擴容的狀態
private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; //只要容器沒有初始化就不斷進行初始化 while ((tab = table) == null || tab.length == 0) { if ((sc = sizeCtl) < 0) Thread.yield(); //因為是多執行緒,所以這裡使用cas保證原子性 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if ((tab = table) == null || tab.length == 0) { //這裡的預設長度是16 int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; //這裡保留擴容的閾值(當陣列長度到達擴容的閾值的時候,就進行擴容) sc = n - (n >>> 2); } } finally { sizeCtl = sc; } break; } } return tab; }treeifyBin(這裡根據閾值判斷,是轉換紅黑樹,還是擴容,前面說了陣列長度到達64的時候才會轉換為紅黑樹)
private final void treeifyBin(Node<K,V>[] tab, int index) { Node<K,V> b; int n, sc; if (tab != null) { //如果table長度小於64則進行擴容,否則進行紅黑樹的轉換 if ((n = tab.length) < MIN_TREEIFY_CAPACITY) //進行擴容 tryPresize(n << 1); else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { //進行紅黑樹的轉換 synchronized (b) { if (tabAt(tab, index) == b) { TreeNode<K,V> hd = null, tl = null; for (Node<K,V> e = b; e != null; e = e.next) { TreeNode<K,V> p = new TreeNode<K,V>(e.hash, e.key, e.val, null, null); if ((p.prev = tl) == null) hd = p; else tl.next = p; tl = p; } setTabAt(tab, index, new TreeBin<K,V>(hd)); } } } } }tryPresize(對陣列進行擴容):這裡牽扯到一個【多執行緒併發擴容】,簡而言之,就是允許多個執行緒對陣列協助擴容,
擴容的本質:
- 每次把陣列的長度擴大到原來的一倍
- 然後把老的資料遷移到新的陣列
private final void tryPresize(int size) { //用於判斷擴容的目標大小 int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(size + (size >>> 1) + 1);//這裡就是為了滿足2m³,如果是15那就給你轉化成16(轉換成最近的2m³) int sc; //說明要做陣列的初始化,因為這個方法在別的地方也有使用,所以要進行判斷 while ((sc = sizeCtl) >= 0) { Node<K,V>[] tab = table; int n; //這裡進行陣列的初始化 if (tab == null || (n = tab.length) == 0) { //這裡陣列容量是通過對比的計算出來的,初始容量(我們可以自定義chm的大小)和擴容容量,誰的數字大就選擇誰作為陣列的預設長度 n = (sc > c) ? sc : c; if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if (table == tab) { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = nt; sc = n - (n >>> 2); } } finally { sizeCtl = sc; } } } //已經是最大的容量,則不進行擴容了,直接返回 else if (c <= sc || n >= MAXIMUM_CAPACITY) break; else if (tab == table) { //這裡生成一個擴容戳,這是為了保證當前擴容範圍的唯一性(使用多執行緒來執行分段擴容),我們在下面聊聊這個東西,他挺有意思的
int rs = resizeStamp(n); //第一次擴容的時候,不會走這一段邏輯,因為sc不小於1 if (sc < 0) { Node<K,V>[] nt; //表示擴容結束 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; //表示沒有結束,所以每次增加一個執行緒進行擴容,則在【低位】加一,當擴容結束後低位就開始遞減,證明執行緒一個個開始釋放,最終低位全部都是0,則說明擴容完畢 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
//這裡實現陣列的資料轉移 transfer(tab, nt); }
//第一次走這個邏輯,他會把我們上面算出的rs左移16位 就會變成(換算的結果看下面的resizeStamp分析)-> 1000 0000 0001 1011 0000 0000 0000 0000
//算出來的戳的 【高16位】標識當前的額擴容標記,【低16位】標識當前擴容的執行緒數量
//然後對換算出來的結果加2 (2的二進位制是10)
//於是就變成了 1000 000 0001 1011 0000 0000 0000 0010 (上面講高位標識擴容標記,低位表示擴容的執行緒數量,那這裡就是一個執行緒在參加擴容) else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); } } }因為是多執行緒共同參與資料的遷移,那我肯定要有一個地方記錄參與這個任務的執行緒數量
【numberOfLeadingZeros】:這個方法會【返回這個資料的二進位制串中從最左邊算起連續的“0”的總數量】,
- 例如 一個int型別的長度是32 【0000 0000 0000 0000 0000 0000 0000 0000】 現在在17位這裡有一個1,那就在前面有16個0這個時候這個方法就會返回16 【0000 0000 0000 0000 1000 0000 0000 0000】
static final int resizeStamp(int n) {
//把前面計算出來的二進位制的第16位變成1,比如說現在key計算出來的是16 那轉換出來的二進位制就是10000 因為int是32位所以給他前面補充0就變成了
//0000 0000 0000 0000 0000 0000 0001 0000 ->那他最高位數前面的數字就有27個0就返回27
// 27換算出來的二進位制是 【11011】 給前面補充0之後就變成了 0000 0000 0000 0000 0000 0000 000 1 1011
// 給他的第16位變成1那就變成->0000 0000 0000 0000 1000 0000 0001 1011return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1)); }