1. 程式人生 > >JDK併發工具類原始碼學習系列——ConcurrentSkipListMap(續)

JDK併發工具類原始碼學習系列——ConcurrentSkipListMap(續)

上一篇介紹了ConcurrentSkipListMap的原理以及對put()方法進行了解析,本篇繼續接著上一篇往下看。
上一篇說到put()的最後一句:insertIndex(z, level);這一句是將一個新節點插入到跳錶結構中,下面看看是如何實現的。

private void insertIndex(Node<K,V> z, int level) {
 HeadIndex<K,V> h = head;// 讀取跳錶的頭
    int max = h.level;// 跳錶的最高層級

    if (level <= max) {// level <= max則需要將自最底層到level層的每一層插入該節點
// 此處從最底層開始往上建立跳錶節點,每個節點的down指向下一層的節點, // 最後的idx為level層的節點,通過down形成一個連結串列結構 Index<K,V> idx = null; for (int i = 1; i <= level; ++i) idx = new Index<K,V>(z, idx, null); // 將該節點插入跳錶 addIndex(idx, h, level); } else { // Add a new level(新增加一層)
/* * To reduce interference by other threads checking for * empty levels in tryReduceLevel, new levels are added * with initialized right pointers. Which in turn requires * keeping levels in an array to access them while * creating new head index nodes from the opposite * direction. */
level = max + 1;// 最大值+1 Index<K,V>[] idxs = (Index<K,V>[])new Index[level+1];//一個數組 Index<K,V> idx = null;// 一個新的跳錶頭結點 // 該陣列是一個從level層開始一直向下引用的垂直鏈表,用於方便獲取某層的節點 // 用處在下面if (level <= oldLevel)出可以看出 for (int i = 1; i <= level; ++i) idxs[i] = idx = new Index<K,V>(z, idx, null); HeadIndex<K,V> oldh;//舊頭結點 int k; for (;;) { oldh = head; int oldLevel = oldh.level; // 此處如果發生level <= oldLevel說明其他執行緒已經增加了這一層,那麼當前執行緒要插入的層已經不>跳錶的最大層了 // 那直接從陣列中渠道level層的節點插入到跳錶即可,插入方式同上面 if (level <= oldLevel) { // lost race to add level k = level; break; } HeadIndex<K,V> newh = oldh; Node<K,V> oldbase = oldh.node; for (int j = oldLevel+1; j <= level; ++j) // 迴圈建立頭結點,直到level層,其實從level = max + 1;這裡可以看出也就最多增加一層 // 新的頭結點node和下一層的node一樣,down引用下一層,right是我們新建的節點(通過idxs陣列直接訪問第N層的節點) newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j); // 用新的頭結點取代舊的頭結點,使用CAS避免併發安全問題 if (casHead(oldh, newh)) { // 此處k = oldLevel,直接導致下面插入節點的時候從oldLevel開始插入, // 因為oldLevel之上是新插入的層級,每層都是一個接節點,所以已經是OK的了 k = oldLevel; break; } } // 插入節點 addIndex(idxs[k], oldh, k); } } private void addIndex(Index<K,V> idx, HeadIndex<K,V> h, int indexLevel) { // Track next level to insert in case of retries int insertionLevel = indexLevel; Comparable<? super K> key = comparable(idx.node.key); if (key == null) throw new NullPointerException(); // Similar to findPredecessor, but adding index nodes along // path to key. // 此處死迴圈是在發生不一致時進行重試 for (;;) { // 此處從跳錶頭部開始找到該插入該節點的位置,類似findPredecessor int j = h.level; Index<K,V> q = h; Index<K,V> r = q.right; Index<K,V> t = idx; for (;;) { // 在當前層級查詢適合插入節點的位置 if (r != null) { Node<K,V> n = r.node; // compare before deletion check avoids needing recheck int c = key.compareTo(n.key); if (n.value == null) {// 如果r已被刪除,則移除r if (!q.unlink(r)) break; r = q.right;//r指向下一層節點,繼續 continue; } if (c > 0) {//如果當前訪問的節點key<需插入節點的key,則繼續右移 q = r; r = r.right; continue; } } // 到達此處有兩種情況: // 1.已訪問到該層連結串列的最右邊 // 2.當前訪問的節點(r)的key>=需插入節點的key,則繼續右移 if (j == insertionLevel) {// 如果找到的層級==要插入的層級,那麼就在該層級插入該節點 // Don't insert index if node already deleted if (t.indexesDeletedNode()) {// 判斷要插入的節點是否已經被刪除,此處考慮的是多執行緒情況下可能發生的併發問題 findNode(key); // cleans up return; } // 將t插入到q的右邊,替代之前的r,通過CAS確保併發安全 if (!q.link(r, t)) break; // restart if (--insertionLevel == 0) {// 插入下一層,直到0為止 // need final deletion check before return if (t.indexesDeletedNode()) findNode(key); return; } } // 將j-1有兩個目的: // 1.如果在-1之前j == insertionLevel,即查詢已經定位到要插入節點的那一層,則insertionLevel會-1,所以j也要跟著-1 // 2.在-1之前j != insertionLevel,說明還未查詢到要插入的那一層,需要繼續往下查詢 //此處需要同時滿足兩個條件才可向下一層查詢: // 1.-1之後的j>=insertionLevel(此處可能是還未找到要插入的層,也可能已經找到了) // 2.j < indexLevel,針對第一點還未找到要插入的層,如果j>=indeLevel說明該層不需要插入節點 if (--j >= insertionLevel && j < indexLevel) // t=t.down即將t往下移動一層,此處的t是在每一層要插入的節點,其實也就是要插入節點的下一層 t = t.down; // 到這裡存在兩種情況: // 1.上層已經插入了 // 2.還未找到要插入的層(第一個要插入的層,也就是新插入節點被分配到的層級) // 不管上面如何處理,到這裡都需要往下遍歷了 q = q.down; r = q.right; } } }

真的挺複雜的邏輯,本身跳錶的插入就有點複雜,現在還需要考慮併發問題,就更多複雜難懂了。程式碼中加入了大量的註釋,希望能夠說明白吧,其實自己看著也挺蒙,明白原理就行,哈哈。
put()總算說完了,下面看看get()吧。

get(Object)
private V doGet(Object okey) {
Comparable<? super K> key = comparable(okey);
    Node<K,V> bound = null;
    Index<K,V> q = head;
    Index<K,V> r = q.right;
    Node<K,V> n;
    K k;
    int c;
    for (;;) {
        Index<K,V> d;
        // Traverse rights
        if (r != null && (n = r.node) != bound && (k = n.key) != null) {
            if ((c = key.compareTo(k)) > 0) {
                q = r;
                r = r.right;
                continue;
            } else if (c == 0) {
                Object v = n.value;
                return (v != null)? (V)v : getUsingFindNode(key);
            } else
                // 這裡當前查詢的節點(n)key>指定key說明已經跑過頭了,此時需要往下找了
                // bound在這裡的作用是在下一層進行查詢時如果找到了這一層的bound節點則說明又需要往下找了
                // 這裡如果理解了跳錶的結構就好理解了,因為有的節點並不是每層都有
                bound = n;
        }

        // Traverse down
        if ((d = q.down) != null) {
            q = d;
            r = d.right;
        } else
            break;
    }
    // 如果執行到這裡,說明已經找到最底層了,但是依舊沒找到該節點,所以要找的節點如果存在那麼就是在最底層的靠右端
    // 所以還需要往右查詢
    // Traverse nexts
    for (n = q.node.next;  n != null; n = n.next) {
        if ((k = n.key) != null) {
            if ((c = key.compareTo(k)) == 0) {
                Object v = n.value;
                return (v != null)? (V)v : getUsingFindNode(key);
            } else if (c < 0)
                break;
        }
    }
    return null;
}

private V getUsingFindNode(Comparable<? super K> key) {
    /*
     * Loop needed here and elsewhere in case value field goes
     * null just as it is about to be returned, in which case we
     * lost a race with a deletion, so must retry.
     */
    // 該方法迴圈查詢key的節點,如果找不到key對應的節點,則直接返回null
    // 如果找到對應的節點,但是value==null,則重試(原因尚不清楚,註釋也不是太明白)
    for (;;) {
        // findNode返回的n如果不為null,那麼就已經判斷過n的value是否為null了,
        // 下面再次判斷可能是為了避免其他執行緒在這段時間內刪掉了這個節點
        // 不過既然被其他執行緒刪掉了為什麼還要去找呢,直接返回null不就好了,是在不明白,難道是為了再去刪掉這個節點?
        Node<K,V> n = findNode(key);
        if (n == null)
            return null;
        Object v = n.value;
        if (v != null)
            return (V)v;
    }
}

/**
* @By Vicky:此方法用於根據key查詢對應的節點
 */
private Node<K,V> findNode(Comparable<? super K> key) {
    for (;;) {
        // findPredecessor找到key的左邊的節點,之前有介紹
        Node<K,V> b = findPredecessor(key);
        // n是b的next節點,正確情況下n的key==key
        Node<K,V> n = b.next;
        for (;;) {
            if (n == null)  // n==null說明b是最右邊的節點,所以其實未找到key對應的節點
                return null;
            Node<K,V> f = n.next;
            if (n != b.next)                // inconsistent read
                break;
            Object v = n.value;
            // n被刪除了,helpDelete掉
            if (v == null) {                // n is deleted
                n.helpDelete(b, f);
                break;
            }
            // b被打上了delete標記,等待其他執行緒將其刪除
            if (v == n || b.value == null)  // b is deleted
                break;
            int c = key.compareTo(n.key);
            if (c == 0)// 即n就是要查詢的節點
                return n;
            if (c < 0)// b的key小於指定key,但是b的下一個節點的key大於指定key,說明沒指定key對應的節點
                return null;
            b = n;// 繼續往右走
            n = f;
        }
    }
}

get()過程涉及三個方法,都是查詢,相對put()來說,get簡單多了,就是按跳錶的結構查詢,只是需要判斷節點是否被刪除,同時還需要幫忙將被刪除的節點移除跳錶結構。程式碼中的註釋應該比較清楚了,就不再細述了,下面看remove()方法。

remove(Object)
public V remove(Object key) {
    return doRemove(key, null);
}

/**
* @By Vicky:刪除的邏輯和查詢類似,將找到的節點的value置為null,同時將其打上刪除標記 
 */
final V doRemove(Object okey, Object value) {
    Comparable<? super K> key = comparable(okey);
    for (;;) {
        // findPredecessor找到最底層的前一個節點(這裡的節點是連結串列的節點,而非跳錶的節點)
        Node<K,V> b = findPredecessor(key);
        Node<K,V> n = b.next;
        for (;;) {
            // b.next==null說明未找到響應的節點,直接返回null
            if (n == null)
                return null;
            Node<K,V> f = n.next;
            if (n != b.next)                    // inconsistent read
                break;
            Object v = n.value;
            // 下面分別判斷n和b是否已被刪除
            if (v == null) {                    // n is deleted
                n.helpDelete(b, f);
                break;
            }
            if (v == n || b.value == null)      // b is deleted
                break;
            int c = key.compareTo(n.key);
            // c<0說明findPredecessor找到的節點的下一個節點的key<要刪除的key
            // 正確情況下(無併發情況下)c應該是>=0才對
            if (c < 0)
                return null;
            // c>0繼續查詢
            if (c > 0) {
                b = n;
                n = f;
                continue;
            }
            // value!=null則說明需要待刪除的節點的值和指定的值一樣才可刪除
            if (value != null && !value.equals(v))
                return null;
            // 將節點的value置為null即表示刪除
            if (!n.casValue(v, null))
                break;
            // 同時為n新增刪除標記
            if (!n.appendMarker(f) || !b.casNext(n, f))
                // 這一步其實就是為了觸發刪除節點
                findNode(key);                  // Retry via findNode
            else {
                // 觸發刪除節點,同時如果最高層的節點被刪完了則降一層
                findPredecessor(key);           // Clean index
                if (head.right == null)
                    tryReduceLevel();
            }
            return (V)v;
        }
    }
}

remove()和get()差不多,畢竟刪除節點的第一步就是找到這個節點,程式碼中添加了註釋,這裡不再細述。

看完了三個重要方法,說實話雖然每個方法中的每行程式碼都可以理解,但是想要理解為什麼這麼做還是很難的,畢竟程式碼是死的。就比如說remove和get,remove中也要查詢節點,但是remove是呼叫findPredecessor()來直接查詢最底層連結串列的前繼節點,而get是自己按照跳錶的結構進行查詢,為什麼呢?還有一點,所有的跳錶節點中引用的node(連結串列節點)都是一樣的,即如果第2層跳錶引用一個node(key=4)和第1層引用的node(key=4)其實是同一個引用,而且同最底層的連結串列的node也是同一個,所以如果找到最上層的那個節點,不就是相當於找到最底層的那個節點了嗎?為什麼非得找最底層呢?
這裡可以給出一個答案,那就是findPredecessor()會將已經被刪除的跳錶的節點從跳錶結構中移除,因為刪除節點只是將value置為null,順便加個刪除標記,但是這些節點還掛在跳錶中,所以需要人為的觸發findPredecessor()去一個個刪除,也許這就是目的吧。

這三個方法都沒有加鎖,使用了CAS進行更新,同時修改結構的時候都是控制在一個CAS操作中完成,例如put()方法在將一個節點插入到連結串列時,是先建立一個新的節點,這個節點的next指向要插入的位置的下一個節點,這步操作不會影響整體的結構,所以是安全的,然後將要插入位置的前一個節點的next原子性的更新成新的節點,所以也不會影響到其他執行緒,而remove則只是將節點的value值修改為null,以及原子的更新節點的next,所以CAS+volatile就是JDKconcurrent包下的併發控制的實現原理。

使用場景

ConcurrentSkipListMap使用跳錶結構來儲存連結串列的順序,解決了在有序連結串列中快速查詢的問題,所以ConcurrentSkipListMap適合在需要連結串列的高效更新效率(刪除/插入)以及還要保證一定的隨機訪問效率(查詢/更新)的場景下使用。

參考文章

以上就是本篇的全部內容,如有錯誤之處,還望大家指出,謝謝~~~