1. 程式人生 > >Java高並發 -- 並發擴展

Java高並發 -- 並發擴展

hash沖突 lee ret 包含 裏的 服務 imu 找不到 transient

Java高並發 -- 並發擴展

死鎖

死鎖是指兩個或兩個以上的事務在執行過程中,因爭奪鎖資源而造成的一種互相等待的現象,若無外力作用兩個事務都無法推進,這樣就產生了死鎖。死鎖的四個必要條件:

  • 互斥條件:即任何時刻,一個資源只能被一個進程使用。其他進程必須等待。
  • 請求和保持條件:即當資源請求者在請求其他的資源的同時保持對原有資源的占有且不釋放。
  • 不剝奪條件:資源請求者不能強制從資源占有者手中奪取資源,資源只能由資源占有者主動釋放。
  • 環路等待條件:比如A占有B在等待的資源(B等待A釋放),B占有A在等待的資源(A等待B釋放)。多個進程循環等待著相鄰進程占用著的資源。

是指兩個或兩個以上的線程在執行過程中,互相占用著對方想要的資源但都不釋放,造成了互相等待,結果線程都無法向前推進。

死鎖的檢測:可以采用等待圖(wait-for gragh)。采用深度優先搜索的算法實現,如果圖中有環路就說明存在死鎖。

解決死鎖:

  • 破環鎖的四個必要條件之一,可以預防死鎖。
  • 加鎖順序保持一致。不同的加鎖順序很可能導致死鎖,比如哲學家問題:A先申請筷子1在申請筷子2,而B先申請筷子2在申請筷子1,最後誰也得不到一雙筷子(同時擁有筷子1和筷子2)
  • 撤消或掛起進程,剝奪資源。終止參與死鎖的進程,收回它們占有的資源,從而解除死鎖。

舉個簡單的例子

public class DeadLock implements Runnable {

    public static Object fork1 = new Object();
    public static Object fork2 = new Object();

    private String name;
    private Object tool;

    public DeadLock(Object o) {
        this.tool = o;
        if (tool == fork1) {
            this.name = "哲學家A";
        }
        if (tool == fork2) {
            this.name = "哲學家B";
        }
    }

    @Override
    public void run() {
        if (tool == fork1) {
            synchronized (fork1) {
                try {
                    System.out.println(name+"拿到了一個叉子");
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                synchronized (fork2) {
                    System.out.println(name+"拿到兩個叉子了");
                }
            }
        }

        if (tool == fork2) {
            synchronized (fork2) {
                try {
                    System.out.println(name+"拿到了一個叉子");
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                synchronized (fork1) {
                    System.out.println(name+"拿到兩個叉子了");
                }
            }
        }
    }

    public static void main(String[] args) {
        DeadLock a = new DeadLock(fork1);
        DeadLock b = new DeadLock(fork2);

        Thread t1 = new Thread(a);
        Thread t2 = new Thread(b);
        t1.start();
        t2.start();

    }
}

運行上面這段程序,會輸出

哲學家B拿到了一個叉子
哲學家A拿到了一個叉子

然後程序就進入了死循環,因為哲學家A在等B手裏的叉子,哲學家B也在等A手上的叉子,但是他倆誰都不肯釋放。

HashMap和ConcurrentHshMap

HashMap

HashMap的底層使用數組+鏈表/紅黑樹實現。

transient Node<K,V>[] table;這表示HashMap是Node數組構成,其中Node類的實現如下,可以看出這其實就是個鏈表,鏈表的每個結點是一個<K,V>映射。

static class Node<K,V> implements Map.Entry<K,V> {
    final int hash;
    final K key;
    V value;
    Node<K,V> next;
}

HashMap的每個下標都存放了一條鏈表。

常量/變量定義

/* 常量定義 */

// 初始容量為16
static final int DEFAULT_INITIAL_CAPACITY = 1 << 4;
// 最大容量
static final int MAXIMUM_CAPACITY = 1 << 30;
// 負載因子,當鍵值對個數達到DEFAULT_INITIAL_CAPACITY * DEFAULT_LOAD_FACTOR會觸發resize擴容 
static final float DEFAULT_LOAD_FACTOR = 0.75f;
// 當鏈表長度大於8,且數組長度大於MIN_TREEIFY_CAPACITY,就會轉為紅黑樹
static final int TREEIFY_THRESHOLD = 8;
// 當resize時候發現鏈表長度小於6時,從紅黑樹退化為鏈表
static final int UNTREEIFY_THRESHOLD = 6;
// 在要將鏈表轉為紅黑樹之前,再進行一次判斷,若數組容量小於該值,則用resize擴容,放棄轉為紅黑樹
// 主要是為了在建立Map的初期,放置過多鍵值對進入同一個數組下標中,而導致不必要的鏈表->紅黑樹的轉化,此時擴容即可,可有效減少沖突
static final int MIN_TREEIFY_CAPACITY = 64;

/* 變量定義 */

// 鍵值對的個數
transient int size;
// 鍵值對的個數大於該值時候,會觸發擴容
int threshold;
// 非線程安全的集合類中幾乎都有這個變量的影子,每次結構被修改都會更新該值,表示被修改的次數
transient int modCount;

關於modCount的作用見這篇blog

在一個叠代器初始的時候會賦予它調用這個叠代器的對象的modCount,如果在叠代器遍歷的過程中,一旦發現這個對象的modCount和叠代器中存儲的modCount不一樣那就拋異常。
Fail-Fast機制:java.util.HashMap不是線程安全的,因此如果在使用叠代器的過程中有其他線程修改了map,那麽將拋出ConcurrentModificationException,這就是所謂fail-fast策略。這一策略在源碼中的實現是通過modCount域,modCount顧名思義就是修改次數,對HashMap內容的修改都將增加這個值,那麽在叠代器初始化過程中會將這個值賦給叠代器的expectedModCount。在叠代過程中,判斷modCount跟expectedModCount是否相等,如果不相等就表示已經有其他線程修改了Map。

註意初始容量和擴容後的容量都必須是2的次冪,為什麽呢?

hash方法

先看散列方法

static final int hash(Object key) {
    int h;
    return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
}

HashMap的散列方法如上,其實就是將hash值的高16位和低16位異或,我們將馬上看到hash在與n - 1相與的時候,高位的信息也被考慮了,能使碰撞的概率減小,散列得更均勻。

在JDK 8中,HashMap的putVal方法中有這麽一句

if ((p = tab[i = (n - 1) & hash]) == null)
    tab[i] = newNode(hash, key, value, null);

關鍵就是這句(n - 1) & hash,這行代碼是把待插入的結點散列到數組中某個下標中,其中hash就是通過上面的方法的得到的,為待插入Node的key的hash值,n是table的容量即table.length,2的次冪用二進制表示的話,只有最高位為1,其余為都是0。減去1,剛好就反了過來。比如16的二進制表示為10000,減去1後的二進制表示為01111,除了最高位其余各位都是1,保證了在相與時,可以使得散列值分布得更均勻(因為如果某位為0比如1011,那麽結點永遠不會被散列到1111這個位置),且當n為2的次冪時候有(n - 1) & hash == hash % n, 舉個例子,比如hash等於6時候,01111和00110相與就是00110,hash等於16時,相與就等於0了,多舉幾個例子便可以驗證這一結論。最後來回答為什麽HashMap的容量要始終保持2的次冪

  • 使散列值分布均勻
  • 位運算的效率比取余的效率高

註意table.length是數組的容量,而transient int size表示存入Map中的鍵值對數。

int threshold表示臨界值,當鍵值對的個數大於臨界值,就會擴容。threshold的更新是由下面的方法完成的。

static final int tableSizeFor(int cap) {
    int n = cap - 1;
    n |= n >>> 1;
    n |= n >>> 2;
    n |= n >>> 4;
    n |= n >>> 8;
    n |= n >>> 16;
    return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}

該方法返回大於等於cap的最小的二次冪數值。比如cap為16,就返回16,cap為17就返回32。

put方法

put方法主要由putVal方法實現:

  • 如果沒有產生hash沖突,直接在數組tab[i = (n - 1) & hash]處新建一個結點;
  • 否則,發生了hash沖突,此時key如果和頭結點的key相同,找到要更新的結點,直接跳到最後去更新值
  • 否則,如果數組下標中的類型是TreeNode,就插入到紅黑樹中
  • 如果只是普通的鏈表,就在鏈表中查找,找到key相同的結點就跳出,到最後去更新值;到鏈表尾也沒有找到就在尾部插入一個新結點。接著判斷此時鏈表長度若大於8的話,還需要將鏈表轉為紅黑樹(註意在要將鏈表轉為紅黑樹之前,再進行一次判斷,若數組容量小於64,則用resize擴容,放棄轉為紅黑樹)

get方法

get方法由getNode方法實現:

  • 如果在數組下標的鏈表頭就找到key相同的,那麽返回鏈表頭的值
  • 否則如果數組下標處的類型是TreeNode,就在紅黑樹中查找
  • 否則就是在普通鏈表中查找了
  • 都找不到就返回null

remove方法的流程大致和get方法類似。

HashMap的擴容,resize()過程?

newCap = oldCap << 1

resize方法中有這麽一句,說明是擴容後數組大小是原數組的兩倍。

Node<K,V>[] newTab = (Node<K,V>[])new Node[newCap];
        table = newTab;
        if (oldTab != null) {
            for (int j = 0; j < oldCap; ++j) {
                Node<K,V> e;
                if ((e = oldTab[j]) != null) {
                    oldTab[j] = null;
                    // 如果數組中只有一個元素,即只有一個頭結點,重新哈希到新數組的某個下標
                    if (e.next == null)
                        newTab[e.hash & (newCap - 1)] = e;
                    else if (e instanceof TreeNode)
                        ((TreeNode<K,V>)e).split(this, newTab, j, oldCap);
                    else { // preserve order
                        // 數組下標處的鏈表長度大於1,非紅黑樹的情況
                        Node<K,V> loHead = null, loTail = null;
                        Node<K,V> hiHead = null, hiTail = null;
                        Node<K,V> next;
                        do {
                            next = e.next;
                            // oldCap是2的次冪,最高位是1,其余為是0,哈希值和其相與,根據哈希值的最高位是1還是0,鏈表被拆分成兩條,哈希值最高位是0分到loHead。
                            if ((e.hash & oldCap) == 0) {
                                if (loTail == null)
                                    loHead = e;
                                else
                                    loTail.next = e;
                                loTail = e;
                            }
                            // 哈希值最高位是1分到hiHead
                            else {
                                if (hiTail == null)
                                    hiHead = e;
                                else
                                    hiTail.next = e;
                                hiTail = e;
                            }
                        } while ((e = next) != null);
                        if (loTail != null) {
                            loTail.next = null;
                            // loHead掛到新數組[原下標]處;
                            newTab[j] = loHead;
                        }
                        if (hiTail != null) {
                            hiTail.next = null;
                            // hiHead掛到新數組中[原下標+oldCap]處
                            newTab[j + oldCap] = hiHead;
                        }
                    }
                }
            }
        }
        return newTab;

舉個例子,比如oldCap是16,二進制表示是10000,hash值的後五位和oldCap相與,因為oldCap的最高位(從右往左數的第5位)是1其余位是0,因此hash值的該位是0的所有元素被分到一條鏈表,掛到新數組中原下標處,hash值該位為1的被分到另外一條鏈表,掛到新數組中原下標+oldCap處。舉個例子:桶0中的元素其hash值後五位是0XXXX的就被分到桶0種,其hash值後五位是1XXXX就被分到桶4中。

ConcurrentHashMap

JDK 7中使用的是分段鎖,內部分成了16個Segment即分段,每個分段可以看作是一個小型的HashMap,每次put只會鎖定一個分段,降低了鎖的粒度:

  • 首先根據key計算出一個hash值,找到對應的Segment
  • 調用Segment的lock方法(Segment繼承了重入鎖),鎖住該段內的數據,所以並沒有鎖住ConcurrentHashMap的全部數據
  • 根據key計算出hash值,找到Segment中數組中對應下標的鏈表,並將該數據放置到該鏈表中
  • 判斷當前Segment包含元素的數量大於閾值,則Segment進行擴容(Segment的個數是不能擴容的,但是單個Segment裏面的數組是可以擴容的)

多線程put的時候,只要被加入的鍵值不屬於 同一個分段,就可以做到真正的並行put。對不同的Segment則無需考慮線程同步,對於同一個Segment的操作才需考慮。

JDK 8中使用了CAS+synchronized保證線程安全,也采取了數組+鏈表/紅黑樹的結構。

put時使用synchronized鎖住了桶中鏈表的頭結點。

數組的擴容,被問到了我在看吧.....我只知道多個線程可以協助數據的遷移。

有這麽一個問題,ConcurrentHashMap,有三個線程,A先put觸發了擴容,擴容時間很長,此時B也put會怎麽樣?此時C調用get方法會怎麽樣?C讀取到的元素是舊桶中的元素還是新桶中的

A先觸發擴容,ConcurrentHashMap遷移是在鎖定舊桶的前提下進行遷移的,並沒有去鎖定新桶。

  • 在某個桶的遷移過程中,別的線程想要對該桶進行put操作怎麽辦?一旦某個桶在遷移過程中了,必然要獲取該桶的鎖,所以其他線程的put操作要被阻塞。因此B被阻塞
  • 某個桶已經遷移完成(其他桶還未完成),別的線程想要對該桶進行put操作怎麽辦?該線程會首先檢查是否還有未分配的遷移任務,如果有則先去執行遷移任務,如果沒有即全部任務已經分發出去了,那麽此時該線程可以直接對新的桶進行插入操作(映射到的新桶必然已經完成了遷移,所以可以放心執行操作)

ConcurrentHashMap的get操作沒有加鎖,所以可以讀取到值,不過是舊桶中的值。

if (finishing) {
    nextTable = null;
    table = nextTab;
    sizeCtl = (n << 1) - (n >>> 1);
    return;
}

從table = nextTable可以看出,當所有數據遷移完成時,才將用nextTab新數組去覆蓋舊數組table。所以在A擴容過程中,C讀取到的是舊數組中的元素

擴容

  • 垂直擴容(垂直擴展):提升單機處理能力(提高硬盤、內存、CPU)
  • 水平擴容(水平擴展):增加系統成員數(增加服務器個數)

Java高並發 -- 並發擴展