1. 程式人生 > >Netty原始碼分析-- ThreadLocal分析(九)

Netty原始碼分析-- ThreadLocal分析(九)




        不知道畫成這樣大家能不能理解,大體的意思就是Thread中有一個成員變數是  ThreadLocal.ThreadLocalMap threadLocals = null ;  ThreadLocalMap  是 ThreadLocal的內部類,這個Map的結構就像HashMap,是一個Entry陣列,每個Entry的key是一個ThreadLocal。

        簡單說了一下,我們來看原始碼,就看下這個 ThreadLocalMap。


       這裡有一個很重要的點就是 這裡,構造的這個Entry 是繼承於 WeakReference ,  Entry的key(ThreadLocal例項)作為WeakReference的referent。

       ok,說到這可能有小夥伴對WeakReference 不是很熟悉,WeakReference 就是一個弱引用。沒有任何其他strong reference(強引用)指向的時候, 如果這時發生GC, 那麼這個物件就會被回收,不論當前的記憶體空間是否足夠,這個物件都會被回收。如果對GC不是很瞭解,可以去看我的這一篇 JVM記憶體模型與垃圾回收 。 如果想要更加形象的理解這個 WeakReference, 可以去看  關於Java中的WeakReference。



      註釋說明這個建構函式是一個懶載入,僅僅有需要的時候才會建立。 建立一個初始化16個長度的Entry陣列 。 然後計算第一個key的陣列下標。 建立一個Entry物件放入table。  設定大小為 1 

      擴容大小門檻 設定擴容大小為 16 * 2/3 = 10 ,當有10個元素時發生擴容。

      這裡主要是這個下標的計算,就是用ThreadLocal雜湊值 & (16-1) 得到下標,相當於 ThreadLocal雜湊值 % 16, 效能更好。





        這個東西叫 魔數:與斐波那契數列和黃金分割點有關。用於雜湊均勻,減少hash衝突

      那麼也就是說,後面的每一個ThreadLocal的雜湊值都是前一個ThreadLocal雜湊值 + 0x61c88647。


 public T get() {
        Thread t = Thread.currentThread(); // 獲取當前執行緒
        ThreadLocalMap map = getMap(t); // 獲取當前執行緒的成員變數 threadLocals 
        if (map != null) { 
            ThreadLocalMap.Entry e = map.getEntry(this); // 獲取當前ThreadLocal的Entry ,至於怎麼獲取的,我們一會再深入探討
            if (e != null) {
                T result = (T)e.value;
                return result;  // 取entry的value返回
        return setInitialValue(); // 初始化map, 先說這個
private T setInitialValue() {
    T value = initialValue(); // 為子類提供重寫方法
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null)
        map.set(this, value); // 如果map不是空,那麼將當前的這個ThreadLocal放入map
        createMap(t, value);  // 如果是空,那麼建立一個map, 這個就是建立一個ThreadLocalMap,前面已經說了。
    return value;


    public void set(T value) {
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null)
            map.set(this, value);
            createMap(t, value);


 1 private void set(ThreadLocal<?> key, Object value) {
 3             // We don't use a fast path as with get() because it is at
 4             // least as common to use set() to create new entries as
 5             // it is to replace existing ones, in which case, a fast
 6             // path would fail more often than not.
 8             Entry[] tab = table; // 當前Entry陣列
 9             int len = tab.length; // 長度
10             int i = key.threadLocalHashCode & (len-1); // 計算下標
11             // 下面就是解決hash衝突的方式,叫線性探測法。如果當前槽沒有元素,直接插入元素;如果當前槽有元素,則向後尋找第一個為null的槽,放置該元素
12             for (Entry e = tab[i];
13                  e != null;
14                  e = tab[i = nextIndex(i, len)]) { // 迴圈,等於null跳出,這個nextIndex方法就是找一個元素,也就是說,從當前ThreadLocal所在的下標開始往後迴圈。
15                 ThreadLocal<?> k = e.get(); // 獲取ThreadLocal
17                 if (k == key) { // 如果正好迴圈到了與傳入的ThreadLocal相等的那個例項,直接設定值,然後返回。
18                     e.value = value;
19                     return;
20                 }
22                 if (k == null) { // 如果遇到一個Entry的key是空的,那麼將執行replaceStaleEntry(可以理解為這個位置已經無用了,可以被替換了,這裡就是替換邏輯)。 為啥是空的呢?因為執行了remove()方法,remove()方法後面看。
23                     replaceStaleEntry(key, value, i);
24                     return;
25                 }
26             }
28             tab[i] = new Entry(key, value); // 遇到為null的Entry,則跳出到這裡,則建立一個Entry
29             int sz = ++size; // 大小 ++ 
30             if (!cleanSomeSlots(i, sz) && sz >= threshold) // 檢視是否需要擴容,先執行 cleanSomeSlots 清理過期資料,如果清理完成仍然達到threshold(閾值),則進行rehash擴容。
31                 rehash(); 
32       }

       接下來就是看一下這個 replaceStaleEntry 方法,這是我覺得最難理解的部分之一,我們儘可能的去模擬一個最複雜的過程, 我簡單畫個圖



   假設上面這個是初始結構圖, 我們按照上面的set方法中的線性探測法會找到E4,也就是下面這樣,也就是找到了一個已經無效的Entry,就是第23行程式碼,然後需要進行替換 replaceStaleEntry 




 1         private void replaceStaleEntry(ThreadLocal<?> key, Object value,
 2                                        int staleSlot) {
 3             Entry[] tab = table;
 4             int len = tab.length; // 陣列長度 按照我上面的初始化圖這裡是8
 5             Entry e;
 7             // Back up to check for prior stale entry in current run.
 8             // We clean out whole runs at a time to avoid continual
 9             // incremental rehashing due to garbage collector freeing
10             // up refs in bunches (i.e., whenever the collector runs).
11             int slotToExpunge = staleSlot; // slotToExpunge = staleSlot = 4
// 往前遍歷,找到第一個無效的Entry,將 slotToExpunge 設定為當前無效未被回收的索引, 走到這裡,我們就假定E2已經變成無效的Entry,如圖(三) 12 for (int i = prevIndex(staleSlot, len); 13 (e = tab[i]) != null; 14 i = prevIndex(i, len)) 15 if (e.get() == null) 16 slotToExpunge = i; 17 18 // Find either the key or trailing null slot of run, whichever 19 // occurs first 20 for (int i = nextIndex(staleSlot, len); // 向後遍歷,找到第一個無效或者相等的位置 21 (e = tab[i]) != null; 22 i = nextIndex(i, len)) { 23 ThreadLocal<?> k = e.get(); 24 25 // If we find key, then we need to swap it 26 // with the stale entry to maintain hash table order. 27 // The newly stale slot, or any other stale slot 28 // encountered above it, can then be sent to expungeStaleEntry 29 // to remove or rehash all of the other entries in run. 30 if (k == key) { // 如果當前的k 和 傳入的 key(ThreadLocal例項)是相等的 31 e.value = value; // 那麼直接覆蓋新值 32 33 tab[i] = tab[staleSlot]; // 然後將兩個位置的Entry互換, 如圖(三) 34 tab[staleSlot] = e; 35 36 // Start expunge at preceding stale entry if it exists 37 if (slotToExpunge == staleSlot) // 當前slotToExpunge = 2 staleSlot = 4 不會進入 如果待回收的槽索引就是當前的無效索引,則設定 slotToExpunge = i 38 slotToExpunge = i; 39 cleanSomeSlots(expungeStaleEntry(slotToExpunge), len); 40 return; 41 } 42 43 // If we didn't find stale entry on backward scan, the 44 // first stale entry seen while scanning for key is the 45 // first still present in the run. 46 if (k == null && slotToExpunge == staleSlot) // 如果 "k==null && 待回收的位置索引就是當前的無效索引",則設定 slotToExpunge = i, 也就是說往前遍歷並未找到一個無效的Entry 47 slotToExpunge = i; 48 } 49 50 // If key not found, put new entry in stale slot 51 tab[staleSlot].value = null; // 置空當前的這個entry的value 幫助GC 52 tab[staleSlot] = new Entry(key, value); // 建立一個新的Entry放在這個位置上 53 54 // If there are any other stale entries in run, expunge them 55 if (slotToExpunge != staleSlot) // 如果不相等,那麼就清理無效的entry 56 cleanSomeSlots(expungeStaleEntry(slotToExpunge), len); 57 }




 我們再假如 replaceStaleEntry 第 37 行程式碼是相等的,那麼將進入一個 cleanSomeSlots(expungeStaleEntry(slotToExpunge), len) 這段程式碼




 1         private int expungeStaleEntry(int staleSlot) {
 2             Entry[] tab = table;
 3             int len = tab.length;
 5             // expunge entry at staleSlot
 6             tab[staleSlot].value = null; // 將2的位置的entry的value設為null 幫助GC
 7             tab[staleSlot] = null; // 將2槽位設定為null
 8             size--;
10             // Rehash until we encounter null
11             Entry e;
12             int i;
13             for (i = nextIndex(staleSlot, len); // 從 3 開始往後遍歷,
14                  (e = tab[i]) != null;
15                  i = nextIndex(i, len)) {
16                 ThreadLocal<?> k = e.get();
17                 if (k == null) {   // 找到無效的entry 設定為 null
18                     e.value = null;
19                     tab[i] = null;
20                     size--;
21                 } else {
22                     int h = k.threadLocalHashCode & (len - 1); // 找到有有效的設定計算hash值,這裡我們如圖(四), 我們假設key3的hash 其實應該在 E2 ,之所以在E3是因為hash衝突。
23                     if (h != i) { // 那麼現在 h = 2 , i = 3 不相等
24                         tab[i] = null; // 將 3 的entry 設定為空
26                         // Unlike Knuth 6.4 Algorithm R, we must scan until
27                         // null because multiple entries could have been stale.
28                         while (tab[h] != null) // 不斷遍歷,直到找到一個entry = null 的位置
29                             h = nextIndex(h, len);
30                         tab[h] = e; // 將原本 3 位置的entry 設定到 新位置上面,執行完成後,變成如圖(五)
31                     }
32                 }
33             }
34             return i;
35         }




 1         private boolean cleanSomeSlots(int i, int n) {
 2             boolean removed = false;
 3             Entry[] tab = table;
 4             int len = tab.length;
 5             do {
 6                 i = nextIndex(i, len);
 7                 Entry e = tab[i];
 8                 if (e != null && e.get() == null) {
 9                     n = len;
10                     removed = true;
11                     i = expungeStaleEntry(i); // 嘗試回收或者rehash"i到i之後的第一個Entry為null的索引(即返回值)之間"的Entry資料, 就是執行幾次上面的那個過程,進行資料整理。
12                 }
13             } while ( (n >>>= 1) != 0); // 這裡是重點, 因為我的n=8,不斷右移 1位  ,直到成為0 , 也就是 8/2/2/2 移動三次  ,當第四次的時候 n = 0,就會結束迴圈
14             return removed;
15         }


1         private Entry getEntry(ThreadLocal<?> key) {
2             int i = key.threadLocalHashCode & (table.length - 1); // 獲取hash槽位
3             Entry e = table[i];
4             if (e != null && e.get() == key) // 判斷當前是否已經無效,如果有效並且key相等直接返回,沒有發生hash衝突
5                 return e;
6             else
7                 return getEntryAfterMiss(key, i, e); // 無效或者發生hash衝突進入這個方法
8         }
 1         private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
 2             Entry[] tab = table;
 3             int len = tab.length;
 5             while (e != null) { 
 6                 ThreadLocal<?> k = e.get();
 7                 if (k == key) // 線性探測, 如果key相等,表示找到了,直接返回
 8                     return e;
 9                 if (k == null) // key無效, 則整理資料
10                     expungeStaleEntry(i);
11                 else
12                     i = nextIndex(i, len); // 迴圈下一個
13                 e = tab[i];
14             }
15             return null; // 沒找到返回null
16         }


 1         private void remove(ThreadLocal<?> key) {
 2             Entry[] tab = table;
 3             int len = tab.length;
 4             int i = key.threadLocalHashCode & (len-1);
 5             for (Entry e = tab[i];
 6                  e != null;
 7                  e = tab[i = nextIndex(i, len)]) {
 8                 if (e.get() == key) {  // 線性探測 找到那個相等的key對應的位置
 9                     e.clear(); //ThreadLocal置為null除 ,也就是entry的key = null
10                     expungeStaleEntry(i); // 再次嘗試整理資料
11                     return;
12                 }
13             }
14         }



1         private void rehash() {
2             expungeStaleEntries(); // 回收所有無效的entry, 如果回收後達不到下面的條件,則無需擴容
4             // Use lower threshold for doubling to avoid hysteresis
5             if (size >= threshold - threshold / 4) // 擴容條件 len * 2/3 * 3/4 = len * 1/2 = 8
6                 resize();
7         }
 1         private void resize() {
 2             Entry[] oldTab = table;
 3             int oldLen = oldTab.length;
 4             int newLen = oldLen * 2; // 擴容到 2 倍大小
 5             Entry[] newTab = new Entry[newLen]; // 新new陣列
 6             int count = 0;
 8             for (int j = 0; j < oldLen; ++j) {
 9                 Entry e = oldTab[j];
10                 if (e != null) {
11                     ThreadLocal<?> k = e.get();
12                     if (k == null) {
13                         e.value = null; // Help the GC
14                     } else {
15                         int h = k.threadLocalHashCode & (newLen - 1); // 把老的資料重新根據新的長度進行計算下標
16                         while (newTab[h] != null)
17                             h = nextIndex(h, newLen);
18                         newTab[h] = e; // 賦值
19                         count++;
20                     }
21                 }
22             }
24             setThreshold(newLen); // 設定新的閾值
25             size = count;
26             table = newTab;
27         }

