J.U.C 之 ConcurrentSkipListMap
到目前為止,我們在Java世界裡看到了兩種實現key-value的資料結構:Hash、TreeMap,這兩種資料結構各自都有著優缺點。
- Hash表:插入、查詢最快,為O(1);如使用連結串列實現則可實現無鎖;資料有序化需要顯式的排序操作。
- 紅黑樹:插入、查詢為O(logn),但常數項較小;無鎖實現的複雜性很高,一般需要加鎖;資料天然有序。 然而,這次介紹第三種實現key-value的資料結構:SkipList。SkipList有著不低於紅黑樹的效率,但是其原理和實現的複雜度要比紅黑樹簡單多了。
SkipList
什麼是SkipList?Skip List ,稱之為跳錶,它是一種可以替代平衡樹的資料結構,其資料元素預設按照key值升序,天然有序。Skip list讓已排序的資料分佈在多層連結串列中,以0-1隨機數決定一個資料的向上攀升與否,通過“空間來換取時間”的一個演演算法,在每個節點中增加了向前的指標,在插入、刪除、查詢時可以忽略一些不可能涉及到的結點,從而提高了效率。
我們先看一個簡單的連結串列,如下:
如果我們需要查詢9、21、30,則需要比較次數為3 + 6 + 8 = 17 次,那麼有沒有優化方案呢?有!我們將該連結串列中的某些元素提煉出來作為一個比較“索引”,如下:
我們先與這些索引進行比較來決定下一個元素是往右還是下走,由於存在“索引”的緣故,導致在檢索的時候會大大減少比較的次數。當然元素不是很多,很難體現出優勢,當元素足夠多的時候,這種索引結構就會大顯身手。
SkipList的特性
SkipList具備如下特性:
- 由很多層結構組成,level是通過一定的概率隨機產生的
- 每一層都是一個有序的連結串列,預設是升序,也可以根據建立對映時所提供的Comparator進行排序,具體取決於使用的構造方法
- 最底層(Level 1)的連結串列包含所有元素
- 如果一個元素出現在Level i 的連結串列中,則它在Level i 之下的連結串列也都會出現
- 每個節點包含兩個指標,一個指向同一連結串列中的下一個元素,一個指向下面一層的元素
我們將上圖再做一些擴充套件就可以變成一個典型的SkipList結構了
SkipList的查詢
SkipListd的查詢演演算法較為簡單,對於上面我們我們要查詢元素21,其過程如下:
- 比較3,大於3,往後找(9),
- 比9大,繼續往後找(25),但是比25小,則從9的下一層開始找(16)
- 16的後面節點依然為25,則繼續從16的下一層找
- 找到21
紅色虛線代表路徑。
SkipList的插入
SkipList的插入操作主要包括:
- 查詢合適的位置。這裡需要明確一點就是在確認新節點要佔據的層次K時,採用丟硬幣的方式,完全隨機。如果佔據的層次K大於連結串列的層次,則重新申請新的層,否則插入指定層次
- 申請新的節點
- 調整指標
假定我們要插入的元素為23,經過查詢可以確認她是位於25前,9、16、21後。當然需要考慮申請的層次K。
如果層次K > 3
需要申請新層次(Level 4)
如果層次 K = 2
直接在Level 2 層插入即可
這裡會涉及到以個演演算法:通過丟硬幣決定層次K,該演演算法我們通過後面ConcurrentSkipListMap原始碼來分析。還有一個需要注意的地方就是,在K層插入元素後,需要確保所有小於K層的層次都應該出現新節點。
SkipList的刪除
刪除節點和插入節點思路基本一致:找到節點,刪除節點,調整指標。
比如刪除節點9,如下:
ConcurrentSkipListMap
通過上面我們知道SkipList採用空間換時間的演演算法,其插入和查詢的效率O(logn),其效率不低於紅黑樹,但是其原理和實現的複雜度要比紅黑樹簡單多了。一般來說會操作連結串列List,就會對SkipList毫無壓力。
ConcurrentSkipListMap其內部採用SkipLis資料結構實現。為了實現SkipList,ConcurrentSkipListMap提供了三個內部類來構建這樣的連結串列結構:Node、Index、HeadIndex。其中Node表示最底層的單連結串列有序節點、Index表示為基於Node的索引層,HeadIndex用來維護索引層次。到這裡我們可以這樣說ConcurrentSkipListMap是通過HeadIndex維護索引層次,通過Index從最上層開始往下層查詢,一步一步縮小查詢範圍,最後到達最底層Node時,就只需要比較很小一部分資料了。在JDK中的關係如下圖:
Node
static final class Node<K,V> {
final K key;
volatile Object value;
volatile ConcurrentSkipListMap.Node<K,V> next;
/** 省略些許程式碼 */
}
複製程式碼
Node的結構和一般的單連結串列毫無區別,key-value和一個指向下一個節點的next。
Index
static class Index<K,V> {
final ConcurrentSkipListMap.Node<K,V> node;
final ConcurrentSkipListMap.Index<K,V> down;
volatile ConcurrentSkipListMap.Index<K,V> right;
/** 省略些許程式碼 */
}
複製程式碼
Index提供了一個基於Node節點的索引Node,一個指向下一個Index的right,一個指向下層的down節點。
HeadIndex
static final class HeadIndex<K,V> extends Index<K,V> {
final int level; //索引層,從1開始,Node單連結串列層為0
HeadIndex(Node<K,V> node,Index<K,V> down,V> right,int level) {
super(node,down,right);
this.level = level;
}
}
複製程式碼
HeadIndex內部就一個level來定義層級。
ConcurrentSkipListMap提供了四個建構函式,每個建構函式都會呼叫initialize()方法進行初始化工作。
final void initialize() {
keySet = null;
entrySet = null;
values = null;
descendingMap = null;
randomSeed = seedGenerator.nextInt() | 0x0100; // ensure nonzero
head = new ConcurrentSkipListMap.HeadIndex<K,V>(new ConcurrentSkipListMap.Node<K,V>(null,BASE_HEADER,null),null,1);
}
複製程式碼
注意,initialize()方法不僅僅只在建構函式中被呼叫,如clone,clear、readObject時都會呼叫該方法進行初始化步驟。這裡需要注意randomSeed的初始化。
private transient int randomSeed;
randomSeed = seedGenerator.nextInt() | 0x0100; // ensure nonzero
複製程式碼
randomSeed一個簡單的隨機數生成器(在後面介紹)。
put操作
CoucurrentSkipListMap提供了put()方法用於將指定值與此對映中的指定鍵關聯。原始碼如下:
public V put(K key,V value) {
if (value == null)
throw new NullPointerException();
return doPut(key,value,false);
}
複製程式碼
首先判斷value如果為null,則丟擲NullPointerException,否則呼叫doPut方法,其實如果各位看過JDK的原始碼的話,應該對這樣的操作很熟悉了,JDK原始碼裡面很多方法都是先做一些必要性的驗證後,然後通過呼叫do**()方法進行真正的操作。
doPut()方法內容較多,我們分步分析。
private V doPut(K key,V value,boolean onlyIfAbsent) {
Node<K,V> z; // added node
if (key == null)
throw new NullPointerException();
// 比較器
Comparator<? super K> cmp = comparator;
outer: for (;;) {
for (Node<K,V> b = findPredecessor(key,cmp),n = b.next; ; ) {
/** 省略程式碼 */
複製程式碼
doPut()方法有三個引數,除了key,value外還有一個boolean型別的onlyIfAbsent,該引數作用與如果存在當前key時,該做何動作。當onlyIfAbsent為false時,替換value,為true時,則返回該value。用程式碼解釋為:
if (!map.containsKey(key))
return map.put(key,value);
else
return map.get(key);
複製程式碼
首先判斷key是否為null,如果為null,則丟擲NullPointerException,從這裡我們可以確認ConcurrentSkipList是不支援key或者value為null的。然後呼叫findPredecessor()方法,傳入key來確認位置。findPredecessor()方法其實就是確認key要插入的位置。
private Node<K,V> findPredecessor(Object key,Comparator<? super K> cmp) {
if (key == null)
throw new NullPointerException(); // don't postpone errors
for (;;) {
// 從head節點開始,head是level最高階別的headIndex
for (Index<K,V> q = head,r = q.right,d;;) {
// r != null,表示該節點右邊還有節點,需要比較
if (r != null) {
Node<K,V> n = r.node;
K k = n.key;
// value == null,表示該節點已經被刪除了
// 通過unlink()方法過濾掉該節點
if (n.value == null) {
//刪掉r節點
if (!q.unlink(r))
break; // restart
r = q.right; // reread r
continue;
}
// value != null,節點存在
// 如果key 大於r節點的key 則往前進一步
if (cpr(cmp,key,k) > 0) {
q = r;
r = r.right;
continue;
}
}
// 到達最右邊,如果dowm == null,表示指標已經達到最下層了,直接返回該節點
if ((d = q.down) == null)
return q.node;
q = d;
r = d.right;
}
}
}
複製程式碼
findPredecessor()方法意思非常明確:尋找前輩。從最高層的headIndex開始向右一步一步比較,直到right為null或者右邊節點的Node的key大於當前key為止,然後再向下尋找,依次重複該過程,直到down為null為止,即找到了前輩,看返回的結果注意是Node,不是Item,所以插入的位置應該是最底層的Node連結串列。
在這個過程中ConcurrentSkipListMap賦予了該方法一個其他的功能,就是通過判斷節點的value是否為null,如果為null,表示該節點已經被刪除了,通過呼叫unlink()方法刪除該節點。
final boolean unlink(Index<K,V> succ) {
return node.value != null && casRight(succ,succ.right);
}
複製程式碼
刪除節點過程非常簡單,更改下right指標即可。
通過findPredecessor()找到前輩節點後,做什麼呢?看下面:
for (Node<K,n = b.next;;) {
// 前輩節點的next != null
if (n != null) {
Object v; int c;
Node<K,V> f = n.next;
// 不一致讀,主要原因是併發,有節點捷足先登
if (n != b.next) // inconsistent read
break;
// n.value == null,該節點已經被刪除了
if ((v = n.value) == null) { // n is deleted
n.helpDelete(b,f);
break;
}
// 前輩節點b已經被刪除
if (b.value == null || v == n) // b is deleted
break;
// 節點大於,往前移
if ((c = cpr(cmp,n.key)) > 0) {
b = n;
n = f;
continue;
}
// c == 0 表示,找到一個key相等的節點,根據onlyIfAbsent引數來做判斷
// onlyIfAbsent ==false,則通過casValue,替換value
// onlyIfAbsent == true,返回該value
if (c == 0) {
if (onlyIfAbsent || n.casValue(v,value)) {
@SuppressWarnings("unchecked") V vv = (V)v;
return vv;
}
break; // restart if lost race to replace value
}
// else c < 0; fall through
}
// 將key-value包裝成一個node,插入
z = new Node<K,V>(key,n);
if (!b.casNext(n,z))
break; // restart if lost race to append to b
break outer;
}
複製程式碼
找到合適的位置後,就是在該位置插入節點咯。插入節點的過程比較簡單,就是將key-value包裝成一個Node,然後通過casNext()方法加入到連結串列當中。當然是插入之前需要進行一系列的校驗工作。
在最下層插入節點後,下一步工作是什麼?新建索引。前面博主提過,在插入節點的時候,會根據採用拋硬幣的方式來決定新節點所插入的層次,由於存在併發的可能,ConcurrentSkipListMap採用ThreadLocalRandom來生成隨機數。如下:
int rnd = ThreadLocalRandom.nextSecondarySeed();
複製程式碼
拋硬幣決定層次的思想很簡單,就是通過拋硬幣如果硬幣為正面則層次level + 1 ,否則停止,如下:
// 拋硬幣決定層次
while (((rnd >>>= 1) & 1) != 0)
++level;
複製程式碼
在闡述SkipList插入節點的時候說明瞭,決定的層次level會分為兩種情況進行處理,一是如果層次level大於最大的層次話則需要新增一層,否則就在相應層次以及小於該level的層次進行節點新增處理。
level <= headIndex.level
// 如果決定的層次level比最高層次head.level小,直接生成最高層次的index
// 由於需要確認每一層次的down,所以需要從最下層依次往上生成
if (level <= (max = h.level)) {
for (int i = 1; i <= level; ++i)
idx = new ConcurrentSkipListMap.Index<K,V>(z,idx,null);
}
複製程式碼
從底層開始,小於level的每一層都初始化一個index,每次的node都指向新加入的node,down指向下一層的item,右側next全部為null。整個處理過程非常簡單:為小於level的每一層初始化一個index,然後加入到原來的index鏈條中去。
level > headIndex.level
// leve > head.level 則新增一層
else { // try to grow by one level
// 新增一層
level = max + 1;
// 初始化 level個item節點
@SuppressWarnings("unchecked")
ConcurrentSkipListMap.Index<K,V>[] idxs =
(ConcurrentSkipListMap.Index<K,V>[])new ConcurrentSkipListMap.Index<?,?>[level+1];
for (int i = 1; i <= level; ++i)
idxs[i] = idx = new ConcurrentSkipListMap.Index<K,null);
//
for (;;) {
h = head;
int oldLevel = h.level;
// 層次擴大了,需要重新開始(有新執行緒節點加入)
if (level <= oldLevel) // lost race to add level
break;
// 新的頭結點HeadIndex
ConcurrentSkipListMap.HeadIndex<K,V> newh = h;
ConcurrentSkipListMap.Node<K,V> oldbase = h.node;
// 生成新的HeadIndex節點,該HeadIndex指向新增層次
for (int j = oldLevel+1; j <= level; ++j)
newh = new ConcurrentSkipListMap.HeadIndex<K,V>(oldbase,newh,idxs[j],j);
// HeadIndex CAS替換
if (casHead(h,newh)) {
h = newh;
idx = idxs[level = oldLevel];
break;
}
}
複製程式碼
當拋硬幣決定的level大於最大層次level時,需要新增一層進行處理。處理邏輯如下:
- 初始化一個對應的index陣列,大小為level + 1,然後為每個單位都建立一個index,箇中引數為:Node為新增的Z,down為下一層index,right為null
- 通過for迴圈來進行擴容操作。從最高層進行處理,新增一個HeadIndex,箇中引數:節點Node,down都為最高層的Node和HeadIndex,right為剛剛建立的對應層次的index,level為相對應的層次level。最後通過CAS把當前的head與新加入層的head進行替換。 通過上面步驟我們發現,儘管已經找到了前輩節點,也將node插入了,也確定確定了層次並生成了相應的Index,但是並沒有將這些Index插入到相應的層次當中,所以下面的程式碼就是將index插入到相對應的層當中。
// 從插入的層次level開始
splice: for (int insertionLevel = level;;) {
int j = h.level;
// 從headIndex開始
for (ConcurrentSkipListMap.Index<K,V> q = h,t = idx;;) {
if (q == null || t == null)
break splice;
// r != null;這裡是找到相應層次的插入節點位置,注意這裡只橫向找
if (r != null) {
ConcurrentSkipListMap.Node<K,V> n = r.node;
int c = cpr(cmp,n.key);
// n.value == null ,解除關係,r右移
if (n.value == null) {
if (!q.unlink(r))
break;
r = q.right;
continue;
}
// key > n.key 右移
if (c > 0) {
q = r;
r = r.right;
continue;
}
}
// 上面找到節點要插入的位置,這裡就插入
// 當前層是最頂層
if (j == insertionLevel) {
// 建立聯絡
if (!q.link(r,t))
break; // restart
if (t.node.value == null) {
findNode(key);
break splice;
}
// 標誌的插入層 -- ,如果== 0 ,表示已經到底了,插入完畢,退出迴圈
if (--insertionLevel == 0)
break splice;
}
// 上面節點已經插入完畢了,插入下一個節點
if (--j >= insertionLevel && j < level)
t = t.down;
q = q.down;
r = q.right;
}
}
複製程式碼
這段程式碼分為兩部分看,一部分是找到相應層次的該節點插入的位置,第二部分在該位置插入,然後下移。
至此,ConcurrentSkipListMap的put操作到此就結束了。程式碼量有點兒多,這裡總結下:
- 首先通過findPredecessor()方法找到前輩節點Node
- 根據返回的前輩節點以及key-value,新建Node節點,同時通過CAS設定next
- 設定節點Node,再設定索引節點。採取拋硬幣方式決定層次,如果所決定的層次大於現存的最大層次,則新增一層,然後新建一個Item連結串列。
- 最後,將新建的Item連結串列插入到SkipList結構中。
get操作
相比於put操作 ,get操作會簡單很多,其過程其實就只相當於put操作的第一步:
private V doGet(Object key) {
if (key == null)
throw new NullPointerException();
Comparator<? super K> cmp = comparator;
outer: for (;;) {
for (ConcurrentSkipListMap.Node<K,n = b.next;;) {
Object v; int c;
if (n == null)
break outer;
ConcurrentSkipListMap.Node<K,V> f = n.next;
if (n != b.next) // inconsistent read
break;
if ((v = n.value) == null) { // n is deleted
n.helpDelete(b,f);
break;
}
if (b.value == null || v == n) // b is deleted
break;
if ((c = cpr(cmp,n.key)) == 0) {
@SuppressWarnings("unchecked") V vv = (V)v;
return vv;
}
if (c < 0)
break outer;
b = n;
n = f;
}
}
return null;
}
複製程式碼
與put操作第一步相似,首先呼叫findPredecessor()方法找到前輩節點,然後順著right一直往右找即可,同時在這個過程中同樣承擔了一個刪除value為null的節點的職責。
remove操作
remove操作為刪除指定key節點,如下:
public V remove(Object key) {
return doRemove(key,null);
}
複製程式碼
直接呼叫doRemove()方法,這裡remove有兩個引數,一個是key,另外一個是value,所以doRemove方法即提供remove key,也提供同時滿足key-value。
final V doRemove(Object key,Object value) {
if (key == null)
throw new NullPointerException();
Comparator<? super K> cmp = comparator;
outer: for (;;) {
for (ConcurrentSkipListMap.Node<K,n = b.next;;) {
Object v; int c;
if (n == null)
break outer;
ConcurrentSkipListMap.Node<K,V> f = n.next;
// 不一致讀,重新開始
if (n != b.next) // inconsistent read
break;
// n節點已刪除
if ((v = n.value) == null) { // n is deleted
n.helpDelete(b,f);
break;
}
// b節點已刪除
if (b.value == null || v == n) // b is deleted
break;
if ((c = cpr(cmp,n.key)) < 0)
break outer;
// 右移
if (c > 0) {
b = n;
n = f;
continue;
}
/*
* 找到節點
*/
// value != null 表示需要同時校驗key-value值
if (value != null && !value.equals(v))
break outer;
// CAS替換value
if (!n.casValue(v,null))
break;
if (!n.appendMarker(f) || !b.casNext(n,f))
findNode(key); // retry via findNode
else {
// 清理節點
findPredecessor(key,cmp); // clean index
// head.right == null表示該層已經沒有節點,刪掉該層
if (head.right == null)
tryReduceLevel();
}
@SuppressWarnings("unchecked") V vv = (V)v;
return vv;
}
}
return null;
}
複製程式碼
呼叫findPredecessor()方法找到前輩節點,然後通過右移,然後比較,找到後利用CAS把value替換為null,然後判斷該節點是不是這層唯一的index,如果是的話,呼叫tryReduceLevel()方法把這層幹掉,完成刪除。
其實從這裡可以看出,remove方法僅僅是把Node的value設定null,並沒有真正刪除該節點Node,其實從上面的put操作、get操作我們可以看出,他們在尋找節點的時候都會判斷節點的value是否為null,如果為null,則呼叫unLink()方法取消關聯關係,如下:
if (n.value == null) {
if (!q.unlink(r))
break; // restart
r = q.right; // reread r
continue;
}
複製程式碼
size操作
ConcurrentSkipListMap的size()操作和ConcurrentHashMap不同,它並沒有維護一個全域性變數來統計元素的個數,所以每次呼叫該方法的時候都需要去遍歷。
public int size() {
long count = 0;
for (Node<K,V> n = findFirst(); n != null; n = n.next) {
if (n.getValidValue() != null)
++count;
}
return (count >= Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int) count;
}
複製程式碼
呼叫findFirst()方法找到第一個Node,然後利用node的next去統計。最後返回統計資料,最多能返回Integer.MAX_VALUE。注意這裡在執行緒併發下是安全的。