高併發學習筆記(九)
一、ThreadLocal原始碼分析
1.什麼是ThreadLocal
ThreadLocal類是Java提供的一個執行緒私有的讀寫變數,可以理解為在Java的堆空間上專門劃出一小塊空間用於存放執行緒私有的資料或物件,執行緒之間是訪問不到對方的ThreadLocal變數。下面看個用法示例:
/** * ThreadLocal的用法示例 * Created by bzhang on 2019/3/21. */ public class TestThreadLocal { private ThreadLocal<String> local = new ThreadLocal<>(); //直接new,即可建立 public String get(){ return local.get(); //獲取ThreadLocal中的資料 } public void put(String data){ local.set(data); //往ThreadLocal中存放資料 } public void remove(){ local.remove(); //刪除ThreadLocal中的資料 } public static void main(String[] args) { TestThreadLocal test = new TestThreadLocal(); //在新建執行緒中存放資料 new Thread(new Runnable() { @Override public void run() { test.put("gun"); System.out.println(Thread.currentThread().getName()+":"+test.get()); try { TimeUnit.MILLISECONDS.sleep(400); } catch (InterruptedException e) { e.printStackTrace(); } test.remove(); System.out.println(Thread.currentThread().getName()+":"+test.get()); } }).start(); try { TimeUnit.MILLISECONDS.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } //在主執行緒中獲取local中的資料 System.out.println(Thread.currentThread().getName()+":"+test.get()); } } //結果: Thread-0:gun main:null Thread-0:null
ThreadLocal的用法十分簡單,就像一個容器一樣,可以存放資料(set),返回資料(get),可以刪除資料(remove),唯一不太一樣的地方就是這個ThreadLocal與執行緒掛鉤,在不同執行緒中得到的結果是不一樣的。
在分析原始碼之前,先看看ThreadLocal的的結構及引用關係,大致如下圖:
其中ThreadLocalMap是threadLocal的一個內部類,而Entry又是 ThreadLocalMap的一個內部類,Entry用於儲存一個ThreadLocal對應的資料(同一執行緒下),從這裡我們就可以看出ThreadLocalMap和hashmap十分類似,ThreadLocalMap也是一個Map容器,存放著以threadLocal為key的鍵值對(hashmap的key可以自定義,而ThreadLocalMap的key只能是ThreadLocal),並且ThreadLocalMap的底層資料結構是用陣列實現的(hashmap則是用陣列+連結串列)。
下面通過原始碼來看看ThreadLocalMap的set是如何實現的:
//ThreadLocal的構造器,可以看出,啥也沒做 public ThreadLocal() { } //往ThreadLocal中設定值 public void set(T value) { Thread t = Thread.currentThread(); //獲取當前執行緒的引用 ThreadLocalMap map = getMap(t); //獲取t的對應ThreadLocalMap if (map != null) //若是map已經存在,則直接新增鍵值對,後面再講 map.set(this, value); else createMap(t, value); //若原先沒有map,則以t和value新建對應的Map容器 } //返回t執行緒對應的threadLocals,初始threadLocals為null ThreadLocalMap getMap(Thread t) { return t.threadLocals; } //新建t執行緒對應的ThreadLocalMap void createMap(Thread t, T firstValue) { t.threadLocals = new ThreadLocalMap(this, firstValue); }
這裡使用到了一個ThreadLocal的內部類,createMap時新建了一個ThreadLocalMap物件。
//ThreadLocalMap的建構函式,建立了容量為16的Entry型別的table陣列
//將執行緒要存放的資料以鍵值對的形式存放在table陣列中,其中鍵為ThreadLocal物件本身,值為要存放的資料
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
table = new Entry[INITIAL_CAPACITY];
//確定鍵值對在陣列中的位置,通過雜湊確定在table中位置
int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
table[i] = new Entry(firstKey, firstValue);
size = 1; //資料個數+1
setThreshold(INITIAL_CAPACITY); //設定陣列擴容的臨界值
}
//Java中將引用分為強,軟,弱,虛,Entry繼承了WeakReference類
//表示Entry物件都將是弱引用物件,而被弱引用關聯的物件只能生存到下一次垃圾收集之前,
//即當垃圾收集器工作時,無論當前記憶體是否足夠都會回收掉只被弱引用關聯的物件
//Entry是個鍵值對儲存物件,value用於存放值,k則是ThreadLocal本身
static class Entry extends WeakReference<ThreadLocal<?>> {
Object value; //存放值
Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}
//table陣列的初始大小
private static final int INITIAL_CAPACITY = 16;
//Entry陣列
private Entry[] table;
//table中資料的個數
private int size = 0;
//table陣列下一次擴容的臨界值,預設為0
private int threshold; // Default to 0
//設定table陣列需要擴容的臨界值,當陣列使用了threshold的容量,就開始擴容
private void setThreshold(int len) {
threshold = len * 2 / 3;
}
//用於生成ThreadLocal的hashcode
private final int threadLocalHashCode = nextHashCode();
//生成下一個hashcode的方法
private static int nextHashCode() {
return nextHashCode.getAndAdd(HASH_INCREMENT);
}
//下一個hashcode的自增量
private static final int HASH_INCREMENT = 0x61c88647;
//原子型別,用於生成下一個ThreadLocal的hashcode
private static AtomicInteger nextHashCode =
new AtomicInteger();
瞭解了set的過程,在來看看get的過程:
public T get() {
Thread t = Thread.currentThread(); //獲取當前執行緒
ThreadLocalMap map = getMap(t); //獲取執行緒對應的threadLocals
//判斷map是否為null,即是否設定過threadLocals
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
//判斷e是否為null,即table陣列中是否存在ThreadLocalMap對應的entry
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value; //存在返回值
return result;
}
}
return setInitialValue(); //還未初始化ThreadLocalMap,執行setInitialValue方法
}
//從table陣列中取出對應的Entry
private Entry getEntry(ThreadLocal<?> key) {
int i = key.threadLocalHashCode & (table.length - 1); //計算對應的在table陣列中的位置
Entry e = table[i];
//判斷table陣列中i是否存在資料,且是不是同一個ThreadLocal
if (e != null && e.get() == key)
return e;
else
return getEntryAfterMiss(key, i, e); //未找到對應的Entry物件時呼叫該方法
}
//遍歷table陣列,查詢與key對應的entry
private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
Entry[] tab = table;
int len = tab.length;
while (e != null) {
ThreadLocal<?> k = e.get(); //獲取e中對應的ThreadLocal物件
if (k == key) //key與e中的key對應時,說明找到了對應的entry,直接返回
return e;
if (k == null) //當e的鍵為null,說明這個entry已經失效了,則需要清除
expungeStaleEntry(i);
else //e的鍵不為null,但又不是key,則查詢陣列下個索引
i = nextIndex(i, len);
e = tab[i];
}
return null; 不存在對應的entry,返回null
}
//清除失效的entry中的資料,並更新table陣列,且將table陣列中無效的entry對應的索引位置賦為null
private int expungeStaleEntry(int staleSlot) {
Entry[] tab = table;
int len = tab.length;
tab[staleSlot].value = null; //清除value
tab[staleSlot] = null; //清除陣列中的entry
size--; //數量-1
// Rehash until we encounter null
Entry e;
int i;
//迴圈遍歷table陣列,清除已失效資料,更新未失效資料再陣列中的位置
for (i = nextIndex(staleSlot, len);(e = tab[i]) != null;i = nextIndex(i, len)) {
ThreadLocal<?> k = e.get();
if (k == null) {
e.value = null;
tab[i] = null;
size--;
} else {
int h = k.threadLocalHashCode & (len - 1); //計算未失效資料的心索引
if (h != i) { //判斷未失效資料的索引是否改變,改變就更新索引,未改變不處理
tab[i] = null;
while (tab[h] != null) //新索引中有資料,就往後移動一位,知道找到索引中沒有資料的位置
h = nextIndex(h, len);
tab[h] = e;
}
}
}
return i;
}
//判斷下一個陣列索引是否越界,越界就返回陣列的0索引
private static int nextIndex(int i, int len) {
return ((i + 1 < len) ? i + 1 : 0);
}
//ThreadLocalMap尚未初始化就呼叫ThreadLocal中get方法,就觸發呼叫該方法
//該方法初始化一個ThreadLocalMap,ThreadLocalMap中僅有一個以當前ThreadLocal為鍵,值為null的Entry資料
private T setInitialValue() {
T value = initialValue(); //獲取初始預設值,預設為null
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t); //獲取當前執行緒對應的ThreadLocalMap
if (map != null)
map.set(this, value);
else
createMap(t, value);
return value;
}
//預設get不到值,返回null,可重寫該方法
protected T initialValue() {
return null;
}
知道了get,再回看set方法中的map.set方法:
//執行緒已有對應的ThreadLocalMap,則更新其value值
private void set(ThreadLocal<?> key, Object value) {
Entry[] tab = table;
int len = tab.length;
int i = key.threadLocalHashCode & (len-1); //獲取在table陣列中的索引值
//當i位置的entry不為null時迴圈遍歷table陣列,
//即存在hash衝突,那麼就要往後移動1位去在嘗試插入,若還是衝突,繼續後移,直到找到一個空位置
//若i位置的entry==null,表示該threadlocal可以直接往table陣列中插入(沒有hash衝突)
for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) {
ThreadLocal<?> k = e.get();
//找到對應的entry,更新value即可
//這裡表示要插入的key已經存在,直接更新value就行了
if (k == key) {
e.value = value;
return;
}
//查詢到的entry中k為null,說明該Entry關聯的ThreadLocal被回收(key是弱引用,很可能失效)
if (k == null) {
replaceStaleEntry(key, value, i); //整理table陣列
return;
}
}
//建立要插入table陣列的新Entry
tab[i] = new Entry(key, value);
int sz = ++size; //數量+1
//新增資料後,若陣列中的資料個數達到擴容臨界值,
//則要進行陣列擴容,且所有資料重新進行hash雜湊計算索引位置
if (!cleanSomeSlots(i, sz) && sz >= threshold)
rehash();
}
//整理table
private void replaceStaleEntry(ThreadLocal<?> key, Object value,
int staleSlot) {
Entry[] tab = table;
int len = tab.length;
Entry e;
int slotToExpunge = staleSlot;
//查詢table中的一個索引,該索引具有如下特點:
//該索引的前一個索引位置上沒有entry(entry==null),且該索引對應的entry的key為null
//往前查詢失效的Entry,找到的話就用slotToExpunge記錄
for (int i = prevIndex(staleSlot, len); (e = tab[i]) != null; i = prevIndex(i, len))
if (e.get() == null)
slotToExpunge = i;
//往後鍵為key的Entry
for (int i = nextIndex(staleSlot, len); (e = tab[i]) != null; i = nextIndex(i, len)) {
ThreadLocal<?> k = e.get(); //獲取entry的鍵值
if (k == key) { //若與要找的key相同
e.value = value; //更新value值
//交換staleSlot(key對應的原索引位置)和i(查詢到key現在所在的索引位置),減少下次查詢路勁長度
tab[i] = tab[staleSlot];
tab[staleSlot] = e;
//判斷失效的entry對應的索引位置slotToExpunge和staleSlot是否相等,若相等就令staleSlot=i
//判斷清理工作從哪個索引開始
if (slotToExpunge == staleSlot)
slotToExpunge = i;
//清理table陣列中對應entry的key為null的索引
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
return;
}
//若entry已失效,記錄索引
if (k == null && slotToExpunge == staleSlot)
slotToExpunge = i;
}
//如果在前面的查詢並整理table中沒有找到 我們要設定資料的 ThreadLocal,那麼就需要構造一個新的Entry
tab[staleSlot].value = null;
tab[staleSlot] = new Entry(key, value);
if (slotToExpunge != staleSlot)
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
}
//獲取前一個索引,0的前一個為陣列的最後一個索引
private static int prevIndex(int i, int len) {
return ((i - 1 >= 0) ? i - 1 : len - 1);
}
//清理Entry
private boolean cleanSomeSlots(int i, int n) {
boolean removed = false; //是否移除的標誌位
Entry[] tab = table;
int len = tab.length;
do {
i = nextIndex(i, len); //獲取下一個索引
Entry e = tab[i];
//判斷e是否為null,且e是否有鍵值
//當e不為null,且e的鍵為null,說明有要清除的entry
if (e != null && e.get() == null) {
n = len;
removed = true;
i = expungeStaleEntry(i);
}
} while ( (n >>>= 1) != 0); //n減小一半
return removed;
}
最後再來看看remove方法:
//remove方法比較簡單,就是查詢ThreadLocal在ThreadLocalMap的table陣列中是否存在
//若是存在就將對應的entry的key置為null,然後清理table陣列即可
public void remove() {
ThreadLocalMap m = getMap(Thread.currentThread());
if (m != null)
m.remove(this);
}
private void remove(ThreadLocal<?> key) {
Entry[] tab = table;
int len = tab.length;
int i = key.threadLocalHashCode & (len-1);
//查詢陣列中是否有鍵為key的entry
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
if (e.get() == key) {
e.clear(); //將查詢到的entry的key置為null
expungeStaleEntry(i); //清理table陣列
return;
}
}
}
public void clear() {
this.referent = null;
}
OK,ThreadLocal的原始碼就分析到這,接下來講一下ThreadLocal在高併發情形下的注意點:
在使用ThreadLocal時,一定要線上程結束時執行remove方法回收資源,否則會有記憶體洩漏的風險。因為在多執行緒環境下,區分執行緒是否相同,只能通過判斷執行緒的pid/cid。一個執行緒在結束後,若不回收ThreadLocal中的資源,作業系統在啟動新的執行緒任務時可能會複用之前的執行緒(使用一些執行緒池時就是如此),導致該執行緒的ThreadLocal中的資源沒有被回收,而出現記憶體洩漏。因此用完一定記得