1. 程式人生 > >高併發學習筆記(九)

高併發學習筆記(九)

今年企業對Java開發的市場需求,你看懂了嗎? >>>   

一、ThreadLocal原始碼分析

1.什麼是ThreadLocal

    ThreadLocal類是Java提供的一個執行緒私有的讀寫變數,可以理解為在Java的堆空間上專門劃出一小塊空間用於存放執行緒私有的資料或物件,執行緒之間是訪問不到對方的ThreadLocal變數。下面看個用法示例:

/**
* ThreadLocal的用法示例
* Created by bzhang on 2019/3/21.
*/
public class TestThreadLocal {
      private ThreadLocal<String> local = new ThreadLocal<>();    //直接new,即可建立
      public String get(){
            return local.get();     //獲取ThreadLocal中的資料
      }
      public void put(String data){
            local.set(data);  //往ThreadLocal中存放資料
      }

      public void remove(){
            local.remove();   //刪除ThreadLocal中的資料
      }

      public static void main(String[] args) {
            TestThreadLocal test = new TestThreadLocal();
            //在新建執行緒中存放資料
            new Thread(new Runnable() {
                  @Override
                  public void run() {
                        test.put("gun");
                        System.out.println(Thread.currentThread().getName()+":"+test.get());
                        try {
                              TimeUnit.MILLISECONDS.sleep(400);
                        } catch (InterruptedException e) {
                              e.printStackTrace();
                        }
                        test.remove();
                        System.out.println(Thread.currentThread().getName()+":"+test.get());
                  }
            }).start();

            try {
                  TimeUnit.MILLISECONDS.sleep(200);
            } catch (InterruptedException e) {
                  e.printStackTrace();
            }
            //在主執行緒中獲取local中的資料
            System.out.println(Thread.currentThread().getName()+":"+test.get());
      }
}

//結果:
Thread-0:gun
main:null
Thread-0:null

    ThreadLocal的用法十分簡單,就像一個容器一樣,可以存放資料(set),返回資料(get),可以刪除資料(remove),唯一不太一樣的地方就是這個ThreadLocal與執行緒掛鉤,在不同執行緒中得到的結果是不一樣的。

    在分析原始碼之前,先看看ThreadLocal的的結構及引用關係,大致如下圖:

    其中ThreadLocalMap是threadLocal的一個內部類,而Entry又是 ThreadLocalMap的一個內部類,Entry用於儲存一個ThreadLocal對應的資料(同一執行緒下),從這裡我們就可以看出ThreadLocalMap和hashmap十分類似,ThreadLocalMap也是一個Map容器,存放著以threadLocal為key的鍵值對(hashmap的key可以自定義,而ThreadLocalMap的key只能是ThreadLocal),並且ThreadLocalMap的底層資料結構是用陣列實現的(hashmap則是用陣列+連結串列)。

    下面通過原始碼來看看ThreadLocalMap的set是如何實現的:

//ThreadLocal的構造器,可以看出,啥也沒做
public ThreadLocal() {
}

//往ThreadLocal中設定值
public void set(T value) {
    Thread t = Thread.currentThread();    //獲取當前執行緒的引用
    ThreadLocalMap map = getMap(t);    //獲取t的對應ThreadLocalMap
    if (map != null)
        //若是map已經存在,則直接新增鍵值對,後面再講
        map.set(this, value);
    else
        createMap(t, value);    //若原先沒有map,則以t和value新建對應的Map容器
}

//返回t執行緒對應的threadLocals,初始threadLocals為null
ThreadLocalMap getMap(Thread t) {
    return t.threadLocals;
}

//新建t執行緒對應的ThreadLocalMap
void createMap(Thread t, T firstValue) {
    t.threadLocals = new ThreadLocalMap(this, firstValue);
}

    這裡使用到了一個ThreadLocal的內部類,createMap時新建了一個ThreadLocalMap物件。

//ThreadLocalMap的建構函式,建立了容量為16的Entry型別的table陣列
    //將執行緒要存放的資料以鍵值對的形式存放在table陣列中,其中鍵為ThreadLocal物件本身,值為要存放的資料
    ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
        table = new Entry[INITIAL_CAPACITY];
        //確定鍵值對在陣列中的位置,通過雜湊確定在table中位置
        int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);    
        table[i] = new Entry(firstKey, firstValue);
        size = 1;    //資料個數+1
        setThreshold(INITIAL_CAPACITY);    //設定陣列擴容的臨界值
    }


    //Java中將引用分為強,軟,弱,虛,Entry繼承了WeakReference類
    //表示Entry物件都將是弱引用物件,而被弱引用關聯的物件只能生存到下一次垃圾收集之前,
    //即當垃圾收集器工作時,無論當前記憶體是否足夠都會回收掉只被弱引用關聯的物件
        //Entry是個鍵值對儲存物件,value用於存放值,k則是ThreadLocal本身
    static class Entry extends WeakReference<ThreadLocal<?>> {
        
        Object value;    //存放值

        Entry(ThreadLocal<?> k, Object v) {
            super(k);
            value = v;
        }
    }

    //table陣列的初始大小
    private static final int INITIAL_CAPACITY = 16;

        //Entry陣列
    private Entry[] table;

        //table中資料的個數
    private int size = 0;


    //table陣列下一次擴容的臨界值,預設為0
    private int threshold; // Default to 0

        //設定table陣列需要擴容的臨界值,當陣列使用了threshold的容量,就開始擴容
    private void setThreshold(int len) {
        threshold = len * 2 / 3;
    }

//用於生成ThreadLocal的hashcode
private final int threadLocalHashCode = nextHashCode();


//生成下一個hashcode的方法
private static int nextHashCode() {
    return nextHashCode.getAndAdd(HASH_INCREMENT);
}


//下一個hashcode的自增量
private static final int HASH_INCREMENT = 0x61c88647;


//原子型別,用於生成下一個ThreadLocal的hashcode
private static AtomicInteger nextHashCode =
    new AtomicInteger();

    瞭解了set的過程,在來看看get的過程:

public T get() {
    Thread t = Thread.currentThread();    //獲取當前執行緒
    ThreadLocalMap map = getMap(t);    //獲取執行緒對應的threadLocals
    //判斷map是否為null,即是否設定過threadLocals
    if (map != null) {
        ThreadLocalMap.Entry e = map.getEntry(this);
        //判斷e是否為null,即table陣列中是否存在ThreadLocalMap對應的entry
        if (e != null) {
            @SuppressWarnings("unchecked")
            T result = (T)e.value;    //存在返回值
            return result;
        }
    }
    return setInitialValue();    //還未初始化ThreadLocalMap,執行setInitialValue方法
}

//從table陣列中取出對應的Entry
private Entry getEntry(ThreadLocal<?> key) {
    int i = key.threadLocalHashCode & (table.length - 1);    //計算對應的在table陣列中的位置
    Entry e = table[i];
    //判斷table陣列中i是否存在資料,且是不是同一個ThreadLocal
    if (e != null && e.get() == key)
        return e;
    else
        return getEntryAfterMiss(key, i, e);    //未找到對應的Entry物件時呼叫該方法
}

//遍歷table陣列,查詢與key對應的entry
private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
    Entry[] tab = table;
    int len = tab.length;

    while (e != null) {
        ThreadLocal<?> k = e.get();    //獲取e中對應的ThreadLocal物件
        if (k == key)    //key與e中的key對應時,說明找到了對應的entry,直接返回
            return e;
        if (k == null)    //當e的鍵為null,說明這個entry已經失效了,則需要清除
            expungeStaleEntry(i);
        else        //e的鍵不為null,但又不是key,則查詢陣列下個索引
            i = nextIndex(i, len);
        e = tab[i];
    }
    return null;    不存在對應的entry,返回null
}

//清除失效的entry中的資料,並更新table陣列,且將table陣列中無效的entry對應的索引位置賦為null
private int expungeStaleEntry(int staleSlot) {
    Entry[] tab = table;
    int len = tab.length;

    tab[staleSlot].value = null;    //清除value
    tab[staleSlot] = null;    //清除陣列中的entry
    size--;    //數量-1

    // Rehash until we encounter null
    Entry e;
    int i;
    //迴圈遍歷table陣列,清除已失效資料,更新未失效資料再陣列中的位置
    for (i = nextIndex(staleSlot, len);(e = tab[i]) != null;i = nextIndex(i, len)) {
        ThreadLocal<?> k = e.get();
        if (k == null) {
            e.value = null;
            tab[i] = null;
            size--;
        } else {
            int h = k.threadLocalHashCode & (len - 1);    //計算未失效資料的心索引
            if (h != i) {    //判斷未失效資料的索引是否改變,改變就更新索引,未改變不處理
                tab[i] = null;
                while (tab[h] != null)    //新索引中有資料,就往後移動一位,知道找到索引中沒有資料的位置
                    h = nextIndex(h, len);
                tab[h] = e;
            }
        }
    }
    return i;
}

//判斷下一個陣列索引是否越界,越界就返回陣列的0索引
private static int nextIndex(int i, int len) {
    return ((i + 1 < len) ? i + 1 : 0);
}

//ThreadLocalMap尚未初始化就呼叫ThreadLocal中get方法,就觸發呼叫該方法
//該方法初始化一個ThreadLocalMap,ThreadLocalMap中僅有一個以當前ThreadLocal為鍵,值為null的Entry資料
private T setInitialValue() {
    T value = initialValue();    //獲取初始預設值,預設為null
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);    //獲取當前執行緒對應的ThreadLocalMap
    if (map != null)
        map.set(this, value);
    else
        createMap(t, value);
    return value;
}

//預設get不到值,返回null,可重寫該方法
protected T initialValue() {
    return null;
}

    知道了get,再回看set方法中的map.set方法:

//執行緒已有對應的ThreadLocalMap,則更新其value值
private void set(ThreadLocal<?> key, Object value) {

    Entry[] tab = table;
    int len = tab.length;
    int i = key.threadLocalHashCode & (len-1);    //獲取在table陣列中的索引值

    //當i位置的entry不為null時迴圈遍歷table陣列,
    //即存在hash衝突,那麼就要往後移動1位去在嘗試插入,若還是衝突,繼續後移,直到找到一個空位置
    //若i位置的entry==null,表示該threadlocal可以直接往table陣列中插入(沒有hash衝突)
    for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) {
        ThreadLocal<?> k = e.get();
        
        //找到對應的entry,更新value即可
        //這裡表示要插入的key已經存在,直接更新value就行了
        if (k == key) {
            e.value = value;
            return;
        }
        //查詢到的entry中k為null,說明該Entry關聯的ThreadLocal被回收(key是弱引用,很可能失效)
        if (k == null) {
            replaceStaleEntry(key, value, i);    //整理table陣列
            return;
        }
    }
    
    //建立要插入table陣列的新Entry
    tab[i] = new Entry(key, value);
    int sz = ++size;    //數量+1
    //新增資料後,若陣列中的資料個數達到擴容臨界值,
    //則要進行陣列擴容,且所有資料重新進行hash雜湊計算索引位置
    if (!cleanSomeSlots(i, sz) && sz >= threshold)
        rehash();
}

//整理table
private void replaceStaleEntry(ThreadLocal<?> key, Object value,
                               int staleSlot) {
    Entry[] tab = table;
    int len = tab.length;
    Entry e;
        
        int slotToExpunge = staleSlot;
    //查詢table中的一個索引,該索引具有如下特點:
    //該索引的前一個索引位置上沒有entry(entry==null),且該索引對應的entry的key為null
    //往前查詢失效的Entry,找到的話就用slotToExpunge記錄
    for (int i = prevIndex(staleSlot, len); (e = tab[i]) != null; i = prevIndex(i, len))
        if (e.get() == null)
            slotToExpunge = i;
    
    //往後鍵為key的Entry
    for (int i = nextIndex(staleSlot, len); (e = tab[i]) != null; i = nextIndex(i, len)) {
        ThreadLocal<?> k = e.get();    //獲取entry的鍵值
        if (k == key) {    //若與要找的key相同
            e.value = value;    //更新value值
            
            //交換staleSlot(key對應的原索引位置)和i(查詢到key現在所在的索引位置),減少下次查詢路勁長度
            tab[i] = tab[staleSlot];
            tab[staleSlot] = e;

            //判斷失效的entry對應的索引位置slotToExpunge和staleSlot是否相等,若相等就令staleSlot=i
            //判斷清理工作從哪個索引開始
            if (slotToExpunge == staleSlot)
                slotToExpunge = i;
            //清理table陣列中對應entry的key為null的索引
            cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
            return;
        }
                //若entry已失效,記錄索引
        if (k == null && slotToExpunge == staleSlot)
            slotToExpunge = i;
    }
    
    //如果在前面的查詢並整理table中沒有找到 我們要設定資料的 ThreadLocal,那麼就需要構造一個新的Entry
    tab[staleSlot].value = null;
    tab[staleSlot] = new Entry(key, value);

    if (slotToExpunge != staleSlot)
        cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
}

//獲取前一個索引,0的前一個為陣列的最後一個索引
private static int prevIndex(int i, int len) {
    return ((i - 1 >= 0) ? i - 1 : len - 1);
}

//清理Entry
private boolean cleanSomeSlots(int i, int n) {
    boolean removed = false;    //是否移除的標誌位
    Entry[] tab = table;
    int len = tab.length;
    do {
        i = nextIndex(i, len);    //獲取下一個索引
        Entry e = tab[i];
        //判斷e是否為null,且e是否有鍵值
        //當e不為null,且e的鍵為null,說明有要清除的entry
        if (e != null && e.get() == null) {
            n = len;
            removed = true;
            i = expungeStaleEntry(i);
        }
    } while ( (n >>>= 1) != 0);    //n減小一半
    return removed;
}

    最後再來看看remove方法:

//remove方法比較簡單,就是查詢ThreadLocal在ThreadLocalMap的table陣列中是否存在
//若是存在就將對應的entry的key置為null,然後清理table陣列即可
public void remove() {
    ThreadLocalMap m = getMap(Thread.currentThread());
    if (m != null)
        m.remove(this);
}

private void remove(ThreadLocal<?> key) {
    Entry[] tab = table;
    int len = tab.length;
    int i = key.threadLocalHashCode & (len-1);
    //查詢陣列中是否有鍵為key的entry
    for (Entry e = tab[i];
         e != null;
         e = tab[i = nextIndex(i, len)]) {
        if (e.get() == key) {
            e.clear();    //將查詢到的entry的key置為null
            expungeStaleEntry(i);    //清理table陣列
            return;
        }
    }
}

public void clear() {
    this.referent = null;
}

    OK,ThreadLocal的原始碼就分析到這,接下來講一下ThreadLocal在高併發情形下的注意點:

    在使用ThreadLocal時,一定要線上程結束時執行remove方法回收資源,否則會有記憶體洩漏的風險。因為在多執行緒環境下,區分執行緒是否相同,只能通過判斷執行緒的pid/cid。一個執行緒在結束後,若不回收ThreadLocal中的資源,作業系統在啟動新的執行緒任務時可能會複用之前的執行緒(使用一些執行緒池時就是如此),導致該執行緒的ThreadLocal中的資源沒有被回收,而出現記憶體洩漏。因此用完一定記得