JDK容器與併發—Map—ConcurrentSkipListMap
概述
基於跳錶實現的ConcurrentNavigableMap。
1)containsKey、get、put、remove等操作的平均時間複雜度為log(n);size非固定時間操作,因非同步特性,需要遍歷所有節點才能確定size,且可能不是正確的值如果遍歷過程中有修改;批量操作:putAll、equals、toArray、containsValue、clear非原子性。
2)增刪改查操作併發執行緒安全;
3)迭代器是弱一致性的:map建立時或之後某個時間點的,不丟擲ConcurrentModificationException,可能與其他操作併發執行。升序檢視及迭代器比降序的要快;
資料結構
基於連結串列,有三種類型節點Node、Index、HeadIndex,底層為Node單鏈表有序層(升序),其他層為基於Node層的Index層即索引層,可能有多個索引層。
相對於單鏈表,跳錶查詢節點很快是在於:通過HeadIndex維護索引層次,通過Index從最上層開始往下查詢元素,一步步縮小查詢範圍,到了最底層Node單鏈表層,就只需要比較很少的元素就可以找到待查詢的元素節點。
// Node節點,擁有key-value對 // 單鏈表,有序,第一個節點為head.node標記節點,中間可能會穿插些刪除標記節點(即marker節點) static final class Node<K,V> { final K key; volatile Object value; // value為Object型別,便於區分刪除標記節點、head.node標記節點 volatile Node<K,V> next; /** * Creates a new regular node. */ Node(K key, Object value, Node<K,V> next) { this.key = key; this.value = value; this.next = next; } // 建立一個刪除標記節點 // 刪除標記節點的value為this,以區分其他Node節點; // key為null,其他場合會用到,但不能用作區分,因為head.node的key也為null Node(Node<K,V> next) { this.key = null; this.value = this; this.next = next; } // CAS value屬性 boolean casValue(Object cmp, Object val) { return UNSAFE.compareAndSwapObject(this, valueOffset, cmp, val); } // CAS next屬性 boolean casNext(Node<K,V> cmp, Node<K,V> val) { return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } // 是否為刪除標記節點 boolean isMarker() { return value == this; } // 是否為header node節點 boolean isBaseHeader() { return value == BASE_HEADER; } // 在當前Node節點後增加刪除標記節點(採用CAS next方式實現) boolean appendMarker(Node<K,V> f) { return casNext(f, new Node<K,V>(f)); } // 推進刪除Node節點 // 一般在遍歷過程中,如果遇到當前Node的value為null,會呼叫該方法 void helpDelete(Node<K,V> b, Node<K,V> f) { // 首先檢查當前的連結是否為b——>this——>f; // 再進行推進刪除,過程分兩步,每一步都採用CAS實現 // 每次只推進一步,以減小推進執行緒間的干擾 if (f == next && this == b.next) { // 檢測是否為b——>this——>f,其中b為前驅節點,f為後繼節點 if (f == null || f.value != f) // 待刪除節點未進行標記 appendMarker(f);// 連結刪除標記節點 else b.casNext(this, f.next); // 刪除當前節點及其刪除標記節點,完成刪除 } } // 獲取節點的有效value V getValidValue() { Object v = value; if (v == this || v == BASE_HEADER) // 若為刪除標記節點、header node節點,則返回null return null; return (V)v; } // 對有效鍵值對封裝到不可變的Entry AbstractMap.SimpleImmutableEntry<K,V> createSnapshot() { V v = getValidValue(); if (v == null) return null; return new AbstractMap.SimpleImmutableEntry<K,V>(key, v); } // UNSAFE mechanics private static final sun.misc.Unsafe UNSAFE; private static final long valueOffset; private static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class k = Node.class; valueOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("value")); nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } } // 索引節點,用於從最上層往下縮小查詢範圍 // 邏輯上的雙鏈表,非前後連結,而是上下連結 static class Index<K,V> { final Node<K,V> node; // 索引節點是基於Node節點的 final Index<K,V> down;// down為final的,簡化併發 volatile Index<K,V> right; /** * Creates index node with given values. */ Index(Node<K,V> node, Index<K,V> down, Index<K,V> right) { this.node = node; this.down = down; this.right = right; } // CAS right屬性 final boolean casRight(Index<K,V> cmp, Index<K,V> val) { return UNSAFE.compareAndSwapObject(this, rightOffset, cmp, val); } // 其Node節點是否刪除 final boolean indexesDeletedNode() { return node.value == null; } // 連結Index節點 // 如果連結過程中,其Node節點已刪除,則不連結,以減小與解連結的CAS競爭 /* * @param succ the expected current successor * @param newSucc the new successor * @return true if successful */ final boolean link(Index<K,V> succ, Index<K,V> newSucc) { Node<K,V> n = node; newSucc.right = succ; // 將newSucc連結進來 return n.value != null && casRight(succ, newSucc); // CAS right } // 解連結index節點,如果其Node已刪除,則解連結失敗 /** * @param succ the expected current successor * @return true if successful */ final boolean unlink(Index<K,V> succ) { return !indexesDeletedNode() && casRight(succ, succ.right);// CAS right } // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long rightOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class k = Index.class; rightOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("right")); } catch (Exception e) { throw new Error(e); } } } // HeadIndex節點,跟蹤索引層次 static final class HeadIndex<K,V> extends Index<K,V> { final int level;// 索引層,從1開始,Node單鏈表層為0 HeadIndex(Node<K,V> node, Index<K,V> down, Index<K,V> right, int level) { super(node, down, right); this.level = level; } }
JDK中一個例項結構圖:
WIKI上關於跳錶的解釋圖:
構造器
無參構造,空Map public ConcurrentSkipListMap() { this.comparator = null; // 用Key的Comparable介面排序 initialize(); } // 帶comparator引數構造 public ConcurrentSkipListMap(Comparator<? super K> comparator) { this.comparator = comparator; initialize(); } // 帶Map引數構造,採用Key的Comparable介面排序 public ConcurrentSkipListMap(Map<? extends K, ? extends V> m) { this.comparator = null; initialize(); putAll(m); } // 帶SortedMap引數構造,採用SortedMap的comparator排序 public ConcurrentSkipListMap(SortedMap<K, ? extends V> m) { this.comparator = m.comparator(); initialize(); buildFromSorted(m); }
增刪改查
初始化
final void initialize() {
keySet = null;
entrySet = null;
values = null;
descendingMap = null;
randomSeed = seedGenerator.nextInt() | 0x0100; // ensure nonzero
head = new HeadIndex<K,V>(new Node<K,V>(null, BASE_HEADER, null),
null, null, 1); // 初始化head節點,空Map
}
增、改
步驟:
1)查詢比key小的前驅節點,查詢過程中刪除待刪除Node節點的索引節點;
2)從前驅節點開始,遍歷底層Node單鏈表,若已存在相關的key-value對,則CAS替換新的value,返回舊value;若不存在,則確定插入位置;遍歷過程中,推進刪除待刪除的Node節點;
3)用key、value建立新的Node節點,用CAS next方式連結進來;
4)給新的Node節點隨機生成一個索引層次,若層次大於0,則給其增加索引節點,返回null。
public V put(K key, V value) {
if (value == null)
throw new NullPointerException();
return doPut(key, value, false);
}
private V doPut(K kkey, V value, boolean onlyIfAbsent) {
Comparable<? super K> key = comparable(kkey);
for (;;) {
Node<K,V> b = findPredecessor(key); // 找到前驅節點後,接下來就是在Node單鏈表層精確找到插入位置
Node<K,V> n = b.next;
for (;;) {
// 遍歷清除Node節點操作同findNode
if (n != null) {
Node<K,V> f = n.next;
if (n != b.next)
break;
Object v = n.value;
if (v == null) {
n.helpDelete(b, f);
break;
}
if (v == n || b.value == null)
break;
int c = key.compareTo(n.key);
if (c > 0) { // key大於n,繼續往後找
b = n;
n = f;
continue;
}
if (c == 0) { // 已有相關的key-value對
if (onlyIfAbsent || n.casValue(v, value))
return (V)v;
else
break; // CAS value失敗,則重新開始,失敗原因可能是n變成了待刪除節點或有其他修改執行緒修改過
}
// else c < 0; fall through:說明新增的key-value對需要插到b和n之間
}
Node<K,V> z = new Node<K,V>(kkey, value, n);
if (!b.casNext(n, z)) // 將新節點z插入b、n之間
break; // 失敗了,原因同n != b.next,重來
int level = randomLevel(); // 給新增的Node節點隨機生成一個索引層次
if (level > 0)
insertIndex(z, level); // 給z增加索引節點
return null;
}
}
}
// 查詢比key小的前驅節點,若沒有,則返回head.node
// 一些操作依賴該方法刪除索引節點
private Node<K,V> findPredecessor(Comparable<? super K> key) {
if (key == null)
throw new NullPointerException(); // don't postpone errors
for (;;) {
Index<K,V> q = head;
Index<K,V> r = q.right;
// 從索引層最上層開始,往右往下,
// 一直找到最下層索引層(即第一層),從而確定查詢範圍,以在底層Node單鏈表遍歷精確找到
for (;;) {
if (r != null) { // 在索引層,往右找
Node<K,V> n = r.node;
K k = n.key;
if (n.value == null) { // 遍歷到待刪除節點n的索引節點
if (!q.unlink(r))// 刪除其索引節點(採用CAS right屬性)
//刪除失敗原因:q被標記為待刪除節點或在q後增加新索引節點或已刪除了其right節點
break; // 重新開始
r = q.right;// 若刪除成功,則獲取新的right索引節點,繼續找
continue;
}
if (key.compareTo(k) > 0) { // 若key大,說明可能還有小於key的更大的,繼續找
q = r;
r = r.right;
continue;
}
}
Index<K,V> d = q.down;// 當層索引層沒有,則往下一層找,進一步縮小查詢範圍
if (d != null) {// 在下一層索引層,繼續找
q = d;
r = d.right;
} else
return q.node;// 確定前驅節點,如果沒有則為head.node標記節點
}
}
}
// 給新增的Node節點隨機生成一個索引層次
/**
* Returns a random level for inserting a new node.
* Hardwired to k=1, p=0.5, max 31 (see above and
* Pugh's "Skip List Cookbook", sec 3.4).
*
* This uses the simplest of the generators described in George
* Marsaglia's "Xorshift RNGs" paper. This is not a high-quality
* generator but is acceptable here.
*/
private int randomLevel() {
int x = randomSeed;
x ^= x << 13;
x ^= x >>> 17;
randomSeed = x ^= x << 5;
if ((x & 0x80000001) != 0) // test highest and lowest bits
return 0;
int level = 1;
while (((x >>>= 1) & 1) != 0) ++level;
return level;
}
// 為Node節點新增索引節點
/**
* @param z the node
* @param level the level of the index
*/
private void insertIndex(Node<K,V> z, int level) {
HeadIndex<K,V> h = head;
int max = h.level; // head的索引層次是最大的
if (level <= max) { // 待新增索引節點的索引層次在head索引層次內,建立索引節點新增進來即可
Index<K,V> idx = null;
for (int i = 1; i <= level; ++i)
idx = new Index<K,V>(z, idx, null);// 索引節點,從下往上鍊接
addIndex(idx, h, level); // 將索引節點連結進來
} else { // 增加一層索引層,需要new新層次的HeadIndex
/*
* To reduce interference by other threads checking for
* empty levels in tryReduceLevel, new levels are added
* with initialized right pointers. Which in turn requires
* keeping levels in an array to access them while
* creating new head index nodes from the opposite
* direction.
*/
level = max + 1;
Index<K,V>[] idxs = (Index<K,V>[])new Index[level+1];
Index<K,V> idx = null;
for (int i = 1; i <= level; ++i)
idxs[i] = idx = new Index<K,V>(z, idx, null);
HeadIndex<K,V> oldh;
int k;
for (;;) {
oldh = head;
int oldLevel = oldh.level;
if (level <= oldLevel) { // 其他執行緒增加過索引層
k = level;
break; // 同上面的level <= max情況處理
}
HeadIndex<K,V> newh = oldh;
Node<K,V> oldbase = oldh.node;
for (int j = oldLevel+1; j <= level; ++j) // 有可能其他執行緒刪除過索引層,所以從oldLevel至level增加HeadIndex
newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j); // 建立新層次的HeadIndex且將索引節點idxs相應層次連結進來
if (casHead(oldh, newh)) { // CAS head HeadIndex節點
k = oldLevel;
break;
}
}
addIndex(idxs[k], oldh, k); // 需要將idxs的舊oldLevel層次及下面的索引連結進來
}
}
/**
* 從第indexLevel層往下到第1層,將索引節點連結進來
* @param idx the topmost index node being inserted
* @param h the value of head to use to insert. This must be
* snapshotted by callers to provide correct insertion level
* @param indexLevel the level of the index
*/
private void addIndex(Index<K,V> idx, HeadIndex<K,V> h, int indexLevel) {
// Track next level to insert in case of retries
int insertionLevel = indexLevel;
Comparable<? super K> key = comparable(idx.node.key);
if (key == null) throw new NullPointerException();
// 過程與findPredecessor類似, 只是多了增加索引節點
for (;;) {
int j = h.level;
Index<K,V> q = h;
Index<K,V> r = q.right;
Index<K,V> t = idx;
for (;;) {
if (r != null) {// 在索引層,往右遍歷
Node<K,V> n = r.node;
// compare before deletion check avoids needing recheck
int c = key.compareTo(n.key);
if (n.value == null) {
if (!q.unlink(r))
break;
r = q.right;
continue;
}
if (c > 0) {
q = r;
r = r.right;
continue;
}
}
if (j == insertionLevel) { // 可以連結索引節點idx
if (t.indexesDeletedNode()) { // 索引節點的Node節點被標記為待刪除節點
findNode(key); // 推進刪除索引節點及其Node節點
return; // 不用增加索引節點了
}
if (!q.link(r, t)) // 將第insertionLevel層索引節點連結進來
//刪除失敗原因:同findPredecessor種的q.unlink(r)
break; // 連結失敗,重新開始
if (--insertionLevel == 0) { // 準備連結索引節點idx的下一層索引
if (t.indexesDeletedNode()) // 返回前再次檢查索引節點t是否被標記為待刪除節點,以進行清理工作
findNode(key);
return; // insertionLevel==0表明已經完成索引節點idx的連結
}
}
if (--j >= insertionLevel && j < indexLevel) // 已連結過索引節點idx的第insertionLevel+1層
t = t.down; // 準備連結索引節點idx的第insertionLevel層
q = q.down; // 準備連結索引節點idx的下一層索引
r = q.right;
}
}
}
// 查詢相關key的Node,沒有則返回null。
// 遍歷Node單鏈表中,清除待刪除節點
// 在doPut、doRemove、findNear等都包含這樣的遍歷清除操作
// 不能共享這樣的清除程式碼,因為增刪改查需要獲取Node連結串列順序的快照暫存到自身的區域性變數,用於併發
// 一些操作依賴此方法刪除Node節點
private Node<K,V> findNode(Comparable<? super K> key) {
for (;;) {
Node<K,V> b = findPredecessor(key); // 獲取key的前驅節點
Node<K,V> n = b.next;
for (;;) {
if (n == null)
return null;
Node<K,V> f = n.next;
// 不是連續的b——>n——>f快照,不能進行後續解連結待刪除節點
//變化情況:在b後增加了新節點或刪除了其next節點或增加了刪除標記節點以刪除b,
if (n != b.next)
break; // 重新開始
Object v = n.value;
if (v == null) { // n為待刪除節點
n.helpDelete(b, f); // 推進刪除節點n
break; // 重新開始
}
// 返回的前驅節點b為待刪除節點
// 這裡不能直接刪除b,因為不知道b的前驅節點,只能重新開始,呼叫findPredecessor返回更前的節點
if (v == n || b.value == null) // b is deleted
break; // 重新開始
int c = key.compareTo(n.key);
if (c == 0)
return n;
if (c < 0)
return null;
b = n;
n = f;
}
}
}
刪
步驟:
1)查詢比key小的前驅節點;
2)從前驅節點開始,遍歷底層Node單鏈表,若不存在相關的key-value對,則返回null;否則確定Node節點位置;假設確定刪除的節點為n,b為其前驅節點,f為其後繼節點:
3)用CAS方式將其value置為null;
作用:
a)其他增刪改查執行緒遍歷到該節點時,都知其為待刪除節點;
b)其他增刪改查執行緒可通過CAS修改n的next,推進n的刪除。
該步失敗,只需要重試即可。
4)在其後新增刪除標記節點marker;
作用:
a)新增節點不能插入到n的後面;
b)基於CAS方式的刪除,可以避免刪除上的錯誤。
5)刪除該節點及其刪除標記節點;
第4)5)步可能會失敗,原因在於其他操作執行緒在遍歷過程中知n的value為null後,會幫助推進刪除n,這些幫助操作可以保證沒有一個執行緒因為刪除執行緒的刪除操作而阻塞。
另外,刪除需要確保b——>n——>marker——>f連結關係才能進行。
6)用findPredecessor刪除其索引節點;
7)若最上層索引層無Node索引節點,則嘗試降低索引層次。
8)返回其舊value
public V remove(Object key) {
return doRemove(key, null);
}
// 主要的刪除Node節點及其索引節點方法
final V doRemove(Object okey, Object value) {
Comparable<? super K> key = comparable(okey);
for (;;) {
Node<K,V> b = findPredecessor(key);
Node<K,V> n = b.next;
for (;;) {
if (n == null)
return null;
Node<K,V> f = n.next;
if (n != b.next) // inconsistent read
break;
Object v = n.value;
if (v == null) { // n is deleted
n.helpDelete(b, f);
break;
}
if (v == n || b.value == null) // b is deleted
break;
int c = key.compareTo(n.key);
if (c < 0)
return null;
if (c > 0) {
b = n;
n = f;
continue;
}
if (value != null && !value.equals(v))
return null;
if (!n.casValue(v, null)) // 將value置為null
break;
if (!n.appendMarker(f) || !b.casNext(n, f)) // 新增刪除標記節點,刪除該節點與其刪除標記節點
findNode(key); // 失敗則用findNode繼續刪除
else {
findPredecessor(key); // 用findPredecessor刪除其索引節點
if (head.right == null)
tryReduceLevel();
}
return (V)v;
}
}
}
// 當最上三層索引層無Node索引節點,則將最上層索引層去掉。
// 採用CAS方式去掉後,如果其又擁有Node索引節點,則嘗試將其恢復。
private void tryReduceLevel() {
HeadIndex<K,V> h = head;
HeadIndex<K,V> d;
HeadIndex<K,V> e;
if (h.level > 3 &&
(d = (HeadIndex<K,V>)h.down) != null &&
(e = (HeadIndex<K,V>)d.down) != null &&
e.right == null &&
d.right == null &&
h.right == null &&
casHead(h, d) && // try to set
h.right != null) // recheck
casHead(d, h); // try to backout
}
查
步驟:
1)查詢比key小的前驅節點;
2)從前驅節點開始,遍歷底層Node單鏈表,若不存在相關的key-value對,則返回null;否則獲取Node節點;
3)判斷其value是否為null,如果不為null,則直接返回value;否則重試,原因是其被標記為待刪除節點。
public V get(Object key) {
return doGet(key);
}
private V doGet(Object okey) {
Comparable<? super K> key = comparable(okey);
/*
* Loop needed here and elsewhere in case value field goes
* null just as it is about to be returned, in which case we
* lost a race with a deletion, so must retry.
*/
for (;;) {
Node<K,V> n = findNode(key);
if (n == null)
return null;
Object v = n.value;
if (v != null)
return (V)v;
}
}
迭代器
基礎迭代器為Iter,從first節點開始遍歷Node單鏈表層:
abstract class Iter<T> implements Iterator<T> {
/** the last node returned by next() */
Node<K,V> lastReturned;
/** the next node to return from next(); */
Node<K,V> next;
/** Cache of next value field to maintain weak consistency */
V nextValue;
/** Initializes ascending iterator for entire range. */
Iter() {
for (;;) {
next = findFirst();
if (next == null)
break;
Object x = next.value;
if (x != null && x != next) {
nextValue = (V) x;
break;
}
}
}
public final boolean hasNext() {
return next != null;
}
/** Advances next to higher entry. */
final void advance() {
if (next == null)
throw new NoSuchElementException();
lastReturned = next;
for (;;) {
next = next.next; // next
if (next == null)
break;
Object x = next.value;
if (x != null && x != next) {
nextValue = (V) x;
break;
}
}
}
public void remove() {
Node<K,V> l = lastReturned;
if (l == null)
throw new IllegalStateException();
// It would not be worth all of the overhead to directly
// unlink from here. Using remove is fast enough.
ConcurrentSkipListMap.this.remove(l.key);
lastReturned = null;
}
}
特性
如何實現增刪改查併發執行緒安全?
1. 採用無鎖併發方式;
2.基於final、volatile、CAS方式助於併發;
3. 刪除Node節點方式:將該節點的value置為null,且在其後插入一個刪除標記節點,即:b——>n——>marker——>f(假設n為待刪除Node節點,marker為其刪除標記節點,b為n的前驅節點,f為n的刪除前的後繼節點)這種方式解決了4個問題:
1)與Node插入可併發進行,因為n後為marker標記節點,肯定不會在n後插入新的Node節點;
2)與Node修改可併發進行,因為n的value為null,修改執行緒對n的CAS修改肯定是失敗的;
3)與Node讀可併發進行,因為n的value為null,即使讀執行緒匹配到n,也返回的value為null,而在Map中返回null即代表key-value對不存在,n正在刪除,所以也表明不存在,儘管不是嚴格意義上的;
4)所有操作在遍歷Node單鏈表時,可根據以上鍊接關係,推進刪除n和marker。