轉:ThreadLocal系列(三)-TransmittableThreadLocal的使用及原理解析
一、基本使用
首先,TTL是用來解決ITL解決不了的問題而誕生的,所以TTL一定是支援父執行緒的本地變數傳遞給子執行緒這種基本操作的,ITL也可以做到,但是前面有講過,ITL線上程池的模式下,就沒辦法再正確傳遞了,所以TTL做出的改進就是即便是線上程池模式下,也可以很好的將父執行緒本地變數傳遞下去,先來看個例子:
// 需要注意的是,使用TTL的時候,要想傳遞的值不出問題,執行緒池必須得用TTL加一層代理(下面會講這樣做的目的) private static ExecutorService executorService = TtlExecutors.getTtlExecutorService(Executors.newFixedThreadPool(2));private static ThreadLocal tl = new TransmittableThreadLocal<>(); //這裡採用TTL的實現 public static void main(String[] args) { new Thread(() -> { String mainThreadName = "main_01"; tl.set(1); executorService.execute(() -> { sleep(1L); System.out.println(String.format("本地變數改變之前(1), 父執行緒名稱-%s, 子執行緒名稱-%s, 變數值=%s", mainThreadName, Thread.currentThread().getName(), tl.get())); }); executorService.execute(() -> { sleep(1L); System.out.println(String.format("本地變數改變之前(1), 父執行緒名稱-%s, 子執行緒名稱-%s, 變數值=%s", mainThreadName, Thread.currentThread().getName(), tl.get())); }); executorService.execute(() -> { sleep(1L); System.out.println(String.format("本地變數改變之前(1), 父執行緒名稱-%s, 子執行緒名稱-%s, 變數值=%s", mainThreadName, Thread.currentThread().getName(), tl.get())); }); sleep(1L); //確保上面的會在tl.set執行之前執行 tl.set(2); // 等上面的執行緒池第一次啟用完了,父執行緒再給自己賦值 executorService.execute(() -> { sleep(1L); System.out.println(String.format("本地變數改變之後(2), 父執行緒名稱-%s, 子執行緒名稱-%s, 變數值=%s", mainThreadName, Thread.currentThread().getName(), tl.get())); }); executorService.execute(() -> { sleep(1L); System.out.println(String.format("本地變數改變之後(2), 父執行緒名稱-%s, 子執行緒名稱-%s, 變數值=%s", mainThreadName, Thread.currentThread().getName(), tl.get())); }); executorService.execute(() -> { sleep(1L); System.out.println(String.format("本地變數改變之後(2), 父執行緒名稱-%s, 子執行緒名稱-%s, 變數值=%s", mainThreadName, Thread.currentThread().getName(), tl.get())); }); System.out.println(String.format("執行緒名稱-%s, 變數值=%s", Thread.currentThread().getName(), tl.get())); }).start(); new Thread(() -> { String mainThreadName = "main_02"; tl.set(3); executorService.execute(() -> { sleep(1L); System.out.println(String.format("本地變數改變之前(3), 父執行緒名稱-%s, 子執行緒名稱-%s, 變數值=%s", mainThreadName, Thread.currentThread().getName(), tl.get())); }); executorService.execute(() -> { sleep(1L); System.out.println(String.format("本地變數改變之前(3), 父執行緒名稱-%s, 子執行緒名稱-%s, 變數值=%s", mainThreadName, Thread.currentThread().getName(), tl.get())); }); executorService.execute(() -> { sleep(1L); System.out.println(String.format("本地變數改變之前(3), 父執行緒名稱-%s, 子執行緒名稱-%s, 變數值=%s", mainThreadName, Thread.currentThread().getName(), tl.get())); }); sleep(1L); //確保上面的會在tl.set執行之前執行 tl.set(4); // 等上面的執行緒池第一次啟用完了,父執行緒再給自己賦值 executorService.execute(() -> { sleep(1L); System.out.println(String.format("本地變數改變之後(4), 父執行緒名稱-%s, 子執行緒名稱-%s, 變數值=%s", mainThreadName, Thread.currentThread().getName(), tl.get())); }); executorService.execute(() -> { sleep(1L); System.out.println(String.format("本地變數改變之後(4), 父執行緒名稱-%s, 子執行緒名稱-%s, 變數值=%s", mainThreadName, Thread.currentThread().getName(), tl.get())); }); executorService.execute(() -> { sleep(1L); System.out.println(String.format("本地變數改變之後(4), 父執行緒名稱-%s, 子執行緒名稱-%s, 變數值=%s", mainThreadName, Thread.currentThread().getName(), tl.get())); }); System.out.println(String.format("執行緒名稱-%s, 變數值=%s", Thread.currentThread().getName(), tl.get())); }).start(); } private static void sleep(long time) { try { Thread.sleep(time); } catch (InterruptedException e) { e.printStackTrace(); } }
執行結果:
執行緒名稱-Thread-2, 變數值=4 本地變數改變之前(3), 父執行緒名稱-main_02, 子執行緒名稱-pool-1-thread-1, 變數值=3 執行緒名稱-Thread-1, 變數值=2 本地變數改變之前(1), 父執行緒名稱-main_01, 子執行緒名稱-pool-1-thread-2, 變數值=1 本地變數改變之前(1), 父執行緒名稱-main_01, 子執行緒名稱-pool-1-thread-1, 變數值=1 本地變數改變之前(3), 父執行緒名稱-main_02, 子執行緒名稱-pool-1-thread-2, 變數值=3 本地變數改變之前(3), 父執行緒名稱-main_02, 子執行緒名稱-pool-1-thread-2, 變數值=3 本地變數改變之前(1), 父執行緒名稱-main_01, 子執行緒名稱-pool-1-thread-1, 變數值=1 本地變數改變之後(2), 父執行緒名稱-main_01, 子執行緒名稱-pool-1-thread-2, 變數值=2 本地變數改變之後(4), 父執行緒名稱-main_02, 子執行緒名稱-pool-1-thread-1, 變數值=4 本地變數改變之後(4), 父執行緒名稱-main_02, 子執行緒名稱-pool-1-thread-1, 變數值=4 本地變數改變之後(4), 父執行緒名稱-main_02, 子執行緒名稱-pool-1-thread-2, 變數值=4 本地變數改變之後(2), 父執行緒名稱-main_01, 子執行緒名稱-pool-1-thread-1, 變數值=2 本地變數改變之後(2), 父執行緒名稱-main_01, 子執行緒名稱-pool-1-thread-2, 變數值=2
程式有些囉嗦,為了說明問題,加了很多說明,但至少通過上面的例子,不難發現,兩個主執行緒裡都使用執行緒池非同步,而且值在主執行緒裡還發生過改變,測試結果展示一切正常,由此可以知道TTL在使用執行緒池的情況下,也可以很好的完成傳遞,而且不會發生錯亂。
那麼是不是對普通執行緒非同步也有這麼好的支撐呢?
改造下上面的測試程式碼:
private static ThreadLocal tl = new TransmittableThreadLocal<>(); public static void main(String[] args) { new Thread(() -> { String mainThreadName = "main_01"; tl.set(1); new Thread(() -> { sleep(1L); System.out.println(String.format("本地變數改變之前(1), 父執行緒名稱-%s, 子執行緒名稱-%s, 變數值=%s", mainThreadName, Thread.currentThread().getName(), tl.get())); }).start(); new Thread(() -> { sleep(1L); System.out.println(String.format("本地變數改變之前(1), 父執行緒名稱-%s, 子執行緒名稱-%s, 變數值=%s", mainThreadName, Thread.currentThread().getName(), tl.get())); }).start(); new Thread(() -> { sleep(1L); System.out.println(String.format("本地變數改變之前(1), 父執行緒名稱-%s, 子執行緒名稱-%s, 變數值=%s", mainThreadName, Thread.currentThread().getName(), tl.get())); }).start(); sleep(1L); //確保上面的會在tl.set執行之前執行 tl.set(2); // 等上面的執行緒池第一次啟用完了,父執行緒再給自己賦值 new Thread(() -> { sleep(1L); System.out.println(String.format("本地變數改變之後(2), 父執行緒名稱-%s, 子執行緒名稱-%s, 變數值=%s", mainThreadName, Thread.currentThread().getName(), tl.get())); }).start(); new Thread(() -> { sleep(1L); System.out.println(String.format("本地變數改變之後(2), 父執行緒名稱-%s, 子執行緒名稱-%s, 變數值=%s", mainThreadName, Thread.currentThread().getName(), tl.get())); }).start(); new Thread(() -> { sleep(1L); System.out.println(String.format("本地變數改變之後(2), 父執行緒名稱-%s, 子執行緒名稱-%s, 變數值=%s", mainThreadName, Thread.currentThread().getName(), tl.get())); }).start(); System.out.println(String.format("執行緒名稱-%s, 變數值=%s", Thread.currentThread().getName(), tl.get())); }).start(); new Thread(() -> { String mainThreadName = "main_02"; tl.set(3); new Thread(() -> { sleep(1L); System.out.println(String.format("本地變數改變之前(3), 父執行緒名稱-%s, 子執行緒名稱-%s, 變數值=%s", mainThreadName, Thread.currentThread().getName(), tl.get())); }).start(); new Thread(() -> { sleep(1L); System.out.println(String.format("本地變數改變之前(3), 父執行緒名稱-%s, 子執行緒名稱-%s, 變數值=%s", mainThreadName, Thread.currentThread().getName(), tl.get())); }).start(); new Thread(() -> { sleep(1L); System.out.println(String.format("本地變數改變之前(3), 父執行緒名稱-%s, 子執行緒名稱-%s, 變數值=%s", mainThreadName, Thread.currentThread().getName(), tl.get())); }).start(); sleep(1L); //確保上面的會在tl.set執行之前執行 tl.set(4); // 等上面的執行緒池第一次啟用完了,父執行緒再給自己賦值 new Thread(() -> { sleep(1L); System.out.println(String.format("本地變數改變之後(4), 父執行緒名稱-%s, 子執行緒名稱-%s, 變數值=%s", mainThreadName, Thread.currentThread().getName(), tl.get())); }).start(); new Thread(() -> { sleep(1L); System.out.println(String.format("本地變數改變之後(4), 父執行緒名稱-%s, 子執行緒名稱-%s, 變數值=%s", mainThreadName, Thread.currentThread().getName(), tl.get())); }).start(); new Thread(() -> { sleep(1L); System.out.println(String.format("本地變數改變之後(4), 父執行緒名稱-%s, 子執行緒名稱-%s, 變數值=%s", mainThreadName, Thread.currentThread().getName(), tl.get())); }).start(); System.out.println(String.format("執行緒名稱-%s, 變數值=%s", Thread.currentThread().getName(), tl.get())); }).start(); }
相比第一段測試程式碼,這一段的非同步全都是普通非同步,未採用執行緒池的方式進行非同步,看下執行結果:
本地變數改變之後(4), 父執行緒名稱-main_02, 子執行緒名稱-Thread-14, 變數值=4 本地變數改變之前(1), 父執行緒名稱-main_01, 子執行緒名稱-Thread-5, 變數值=1 執行緒名稱-Thread-1, 變數值=2 本地變數改變之前(1), 父執行緒名稱-main_01, 子執行緒名稱-Thread-3, 變數值=1 本地變數改變之後(2), 父執行緒名稱-main_01, 子執行緒名稱-Thread-11, 變數值=2 本地變數改變之前(3), 父執行緒名稱-main_02, 子執行緒名稱-Thread-6, 變數值=3 本地變數改變之後(4), 父執行緒名稱-main_02, 子執行緒名稱-Thread-12, 變數值=4 本地變數改變之後(4), 父執行緒名稱-main_02, 子執行緒名稱-Thread-10, 變數值=4 本地變數改變之前(3), 父執行緒名稱-main_02, 子執行緒名稱-Thread-8, 變數值=3 本地變數改變之前(3), 父執行緒名稱-main_02, 子執行緒名稱-Thread-4, 變數值=3 本地變數改變之前(1), 父執行緒名稱-main_01, 子執行緒名稱-Thread-7, 變數值=1 執行緒名稱-Thread-2, 變數值=4 本地變數改變之後(2), 父執行緒名稱-main_01, 子執行緒名稱-Thread-9, 變數值=2 本地變數改變之後(2), 父執行緒名稱-main_01, 子執行緒名稱-Thread-13, 變數值=2
ok,可以看到,達到了跟第一個測試一致的結果。
到這裡,通過上述兩個例子,TTL的基本使用,以及其解決的問題,我們已經有了初步的瞭解,下面我們來解析一下其內部原理,看看TTL是怎麼完成對ITL的優化的。
二、原理分析
先來看TTL裡面的幾個重要屬性及方法
TTL定義:
public class TransmittableThreadLocal extends InheritableThreadLocal
可以看到,TTL繼承了ITL,意味著TTL首先具備ITL的功能。
再來看看一個重要屬性holder:
/** * 這是一個ITL型別的物件,持有一個全域性的WeakMap(weakMap的key是弱引用,同TL一樣,也是為了解決記憶體洩漏的問題),裡面存放了TTL物件 * 並且重寫了initialValue和childValue方法,尤其是childValue,可以看到在即將非同步時父執行緒的屬性是直接作為初始化值賦值給子執行緒的本地變數物件(TTL)的 */ private static InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>> holder = new InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>>() { @Override protected Map<TransmittableThreadLocal<?>, ?> initialValue() { return new WeakHashMap<TransmittableThreadLocal<?>, Object>(); } @Override protected Map<TransmittableThreadLocal<?>, ?> childValue(Map<TransmittableThreadLocal<?>, ?> parentValue) { return new WeakHashMap<TransmittableThreadLocal<?>, Object>(parentValue); } };
再來看下set和get:
//下面的方法均屬於TTL類 @Override public final void set(T value) { super.set(value); if (null == value) removeValue(); else addValue(); } @Override public final T get() { T value = super.get(); if (null != value) addValue(); return value; } private void removeValue() { holder.get().remove(this); //從holder持有的map物件中移除 } private void addValue() { if (!holder.get().containsKey(this)) { holder.get().put(this, null); //從holder持有的map物件中新增 } }
TTL裡先了解上述的幾個方法及物件,可以看出,單純的使用TTL是達不到支援執行緒池本地變數的傳遞的,通過第一部分的例子,可以發現,除了要啟用TTL,還需要通過TtlExecutors.getTtlExecutorService包裝一下執行緒池才可以,那麼,下面就來看看在程式即將通過執行緒池非同步的時候,TTL幫我們做了哪些操作(這一部分是TTL支援執行緒池傳遞的核心部分):
首先開啟包裝類,看下execute方法在執行時做了些什麼。
// 此方法屬於執行緒池包裝類ExecutorTtlWrapper @Override public void execute(@Nonnull Runnable command) { executor.execute(TtlRunnable.get(command)); //這裡會把Rannable包裝一層,這是關鍵,有些邏輯處理,需要在run之前執行 } // 對應上面的get方法,返回一個TtlRunnable物件,屬於TtLRannable包裝類 @Nullable public static TtlRunnable get(@Nullable Runnable runnable) { return get(runnable, false, false); } // 對應上面的get方法 @Nullable public static TtlRunnable get(@Nullable Runnable runnable, boolean releaseTtlValueReferenceAfterRun, boolean idempotent) { if (null == runnable) return null; if (runnable instanceof TtlEnhanced) { // 若發現已經是目標型別了(說明已經被包裝過了)直接返回 // avoid redundant decoration, and ensure idempotency if (idempotent) return (TtlRunnable) runnable; else throw new IllegalStateException("Already TtlRunnable!"); } return new TtlRunnable(runnable, releaseTtlValueReferenceAfterRun); //最終初始化 } // 對應上面的TtlRunnable方法 private TtlRunnable(@Nonnull Runnable runnable, boolean releaseTtlValueReferenceAfterRun) { this.capturedRef = new AtomicReference<Object>(capture()); //這裡將捕獲後的父執行緒本地變數儲存在當前物件的capturedRef裡 this.runnable = runnable; this.releaseTtlValueReferenceAfterRun = releaseTtlValueReferenceAfterRun; } // 對應上面的capture方法,用於捕獲當前執行緒(父執行緒)裡的本地變數,此方法屬於TTL的靜態內部類Transmitter @Nonnull public static Object capture() { Map<TransmittableThreadLocal<?>, Object> captured = new HashMap<TransmittableThreadLocal<?>, Object>(); for (TransmittableThreadLocal<?> threadLocal : holder.get().keySet()) { // holder裡目前存放的k-v裡的key,就是需要傳給子執行緒的TTL物件 captured.put(threadLocal, threadLocal.copyValue()); } return captured; // 這裡返回的這個物件,就是當前將要使用執行緒池異步出來的子執行緒,所繼承的本地變數合集 } // 對應上面的copyValue,簡單的將TTL物件裡的值返回(結合之前的原始碼可以知道get方法其實就是獲取當前執行緒(父執行緒)裡的值,呼叫super.get方法) private T copyValue() { return copy(get()); } protected T copy(T parentValue) { return parentValue; }
結合上述程式碼,大致知道了線上程池非同步之前需要做的事情,其實就是把當前父執行緒裡的本地變數取出來,然後賦值給Rannable包裝類裡的capturedRef屬性,到此為止,下面會發生什麼,我們大致上可以猜出來了,接下來大概率會在run方法裡,將這些捕獲到的值賦給子執行緒的holder賦對應的TTL值,那麼我們繼續往下看Rannable包裝類裡的run方法是怎麼實現的:
//run方法屬於Rannable的包裝類TtlRunnable @Override public void run() { Object captured = capturedRef.get(); // 獲取由之前捕獲到的父執行緒變數集 if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null)) { throw new IllegalStateException("TTL value reference is released after run!"); } /** * 重點方法replay,此方法用來給當前子執行緒賦本地變數,返回的backup是此子執行緒原來就有的本地變數值(原生本地變數,下面會詳細講), * backup用於恢復資料(如果任務執行完畢,意味著該子執行緒會歸還執行緒池,那麼需要將其原生本地變數屬性恢復) */ Object backup = replay(captured); try { runnable.run(); // 執行非同步邏輯 } finally { restore(backup); // 結合上面對於replay的解釋,不難理解,這個方法就是用來恢復原有值的 } }
根據上述程式碼,我們看到了TTL在非同步任務執行前,會先進行賦值操作(就是拿著非同步發生時捕獲到的父執行緒的本地變數,賦給自己),當任務執行完,就恢復原生的自己本身的執行緒變數值。
下面來具體看這倆方法:
//下面的方法均屬於TTL的靜態內部類Transmittable @Nonnull public static Object replay(@Nonnull Object captured) { @SuppressWarnings("unchecked") Map<TransmittableThreadLocal<?>, Object> capturedMap = (Map<TransmittableThreadLocal<?>, Object>) captured; //使用此執行緒非同步時捕獲到的父執行緒裡的本地變數值 Map<TransmittableThreadLocal<?>, Object> backup = new HashMap<TransmittableThreadLocal<?>, Object>(); //當前執行緒原生的本地變數,用於使用完執行緒後恢復用 //注意:這裡迴圈的是當前子執行緒原生的本地變數集合,與本方法相反,restore方法裡迴圈這個holder是指:該執行緒執行期間產生的變數+父執行緒繼承來的變數 for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator(); iterator.hasNext(); ) { Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next(); TransmittableThreadLocal<?> threadLocal = next.getKey(); backup.put(threadLocal, threadLocal.get()); // 所有原生的本地變數都暫時儲存在backup裡,用於之後恢復用 /** * 檢查,如果捕獲到的執行緒變數裡,不包含當前原生變數值,則從當前原生變數裡清除掉,對應的執行緒本地變數也清掉 * 這就是為什麼會將原生變數儲存在backup裡的原因,為了恢復原生值使用 * 那麼,為什麼這裡要清除掉呢?因為從使用這個子執行緒做非同步那裡,捕獲到的本地變數並不包含原生的變數,當前執行緒 * 在做任務時的首要目標,是將父執行緒裡的變數完全傳遞給任務,如果不清除這個子執行緒原生的本地變數, * 意味著很可能會影響到任務裡取值的準確性。 * * 打個比方,有ttl物件tl,這個tl線上程池的某個子執行緒裡存在對應的值2,當某個主執行緒使用該子執行緒做非同步任務時 * tl這個物件在當前主執行緒裡沒有值,那麼如果不進行下面這一步的操作,那麼在使用該子執行緒做的任務裡就可以通過 * 該tl物件取到值2,不符合預期 */ if (!capturedMap.containsKey(threadLocal)) { iterator.remove(); threadLocal.superRemove(); } } // 這一步就是直接把父執行緒本地變數賦值給當前執行緒了(這一步起就重新整理了holder裡的值了,具體往下看該方法,在非同步執行緒執行期間,還可能產生別的本地變數,比如在真正的run方法內的業務程式碼,再用一個tl物件設定一個值) setTtlValuesTo(capturedMap); // 這個方法屬於擴充套件方法,ttl本身支援重寫非同步任務執行前後的操作,這裡不再具體贅述 doExecuteCallback(true); return backup; } // 結合之前Rannable包裝類的run方法來看,這個方法就是使用上面replay記錄下的原生執行緒變數做恢復用的 public static void restore(@Nonnull Object backup) { @SuppressWarnings("unchecked") Map<TransmittableThreadLocal<?>, Object> backupMap = (Map<TransmittableThreadLocal<?>, Object>) backup; // call afterExecute callback doExecuteCallback(false); // 注意,這裡的holder取出來的,實際上是replay方法設定進去的關於父執行緒裡的所有變數(結合上面來看,就是:該執行緒執行期間產生的變數+父執行緒繼承來的變數) for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator(); iterator.hasNext(); ) { Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next(); TransmittableThreadLocal<?> threadLocal = next.getKey(); /** * 同樣的,如果子執行緒原生變數不包含某個父執行緒傳來的物件,那麼就刪除,可以思考下,這裡的清除跟上面replay裡的有什麼不同? * 這裡會把不屬於原生變數的物件給刪除掉(這裡被刪除掉的可能是父執行緒繼承下來的,也可能是非同步任務在執行時產生的新值) */ if (!backupMap.containsKey(threadLocal)) { iterator.remove(); threadLocal.superRemove(); } } // 同樣呼叫這個方法,進行值的恢復 setTtlValuesTo(backupMap); } // 真正給當前子執行緒賦值的方法,對應上面的setTtlValuesTo方法 private static void setTtlValuesTo(@Nonnull Map<TransmittableThreadLocal<?>, Object> ttlValues) { for (Map.Entry<TransmittableThreadLocal<?>, Object> entry : ttlValues.entrySet()) { @SuppressWarnings("unchecked") TransmittableThreadLocal<Object> threadLocal = (TransmittableThreadLocal<Object>) entry.getKey(); threadLocal.set(entry.getValue()); //賦值,注意,從這裡開始,子執行緒的holder裡的值會被重新賦值重新整理,可以參照上面ttl的set方法的實現 } }
ok,到這裡基本上把TTL比較核心的程式碼看完了,下面整理下整個流程,這是官方給出的時序圖:
圖2-1
上圖第一行指的是類名稱,下面的流程指的是類所做的事情,根據上面羅列出來的原始碼,結合這個時序圖,可以比較直觀一些的理解整個流程。
三、TTL中執行緒池子執行緒原生變數的產生
這一節是為了驗證上面replay和restore,現在通過一個例子來驗證下,先把原始碼down下來,在原始碼的restore和replay上分別加上輸出語句,遍歷holder:
//replay前後列印holder裡面的值 public static Object replay(@Nonnull Object captured) { @SuppressWarnings("unchecked") Map<TransmittableThreadLocal<?>, Object> capturedMap = (Map<TransmittableThreadLocal<?>, Object>) captured; Map<TransmittableThreadLocal<?>, Object> backup = new HashMap<TransmittableThreadLocal<?>, Object>(); System.out.println("--------------------replay前置,當前拿到的holder裡的TTL列表"); for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator(); iterator.hasNext(); ) { Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next(); TransmittableThreadLocal<?> threadLocal = next.getKey(); System.out.println(String.format("replay前置裡拿到原生的ttl_k=%s, ttl_value=%s", threadLocal.hashCode(), threadLocal.get())); } for...//程式碼省略,具體看上面 setTtlValuesTo(capturedMap); doExecuteCallback(true); System.out.println("--------------------reply後置,當前拿到的holder裡的TTL列表"); for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator(); iterator.hasNext(); ) { Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next(); TransmittableThreadLocal<?> threadLocal = next.getKey(); System.out.println(String.format("replay後置裡拿到原生的ttl_k=%s, ttl_value=%s", threadLocal.hashCode(), threadLocal.get())); } return backup; } //restore前後列印holder裡面的值 public static void restore(@Nonnull Object backup) { @SuppressWarnings("unchecked") Map<TransmittableThreadLocal<?>, Object> backupMap = (Map<TransmittableThreadLocal<?>, Object>) backup; // call afterExecute callback doExecuteCallback(false); System.out.println("--------------------restore前置,當前拿到的holder裡的TTL列表"); for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator(); iterator.hasNext(); ) { Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next(); TransmittableThreadLocal<?> threadLocal = next.getKey(); System.out.println(String.format("restore前置裡拿到當前執行緒內變數,ttl_k=%s, ttl_value=%s", threadLocal.hashCode(), threadLocal.get())); } for...//省略程式碼,具體具體看上面 setTtlValuesTo(backupMap); System.out.println("--------------------restore後置,當前拿到的holder裡的TTL列表"); for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator(); iterator.hasNext(); ) { Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next(); TransmittableThreadLocal<?> threadLocal = next.getKey(); System.out.println(String.format("restore後置裡拿到當前執行緒內變數,ttl_k=%s, ttl_value=%s", threadLocal.hashCode(), threadLocal.get())); } }
程式碼這樣做的目的,就是要說明執行緒池所謂的原生本地變數是怎麼產生的,以及replay和restore是怎麼設定和恢復的,下面來看個簡單的例子:
private static ExecutorService executorService = TtlExecutors.getTtlExecutorService(Executors.newFixedThreadPool(1)); private static ThreadLocal tl = new TransmittableThreadLocal(); private static ThreadLocal tl2 = new TransmittableThreadLocal(); public static void main(String[] args) throws InterruptedException { tl.set(1); tl2.set(2); executorService.execute(new Runnable() { @Override public void run() { try { Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } } }); }
執行結果如下:
--------------------replay前置,當前拿到的holder裡的TTL列表 replay前置裡拿到原生的ttl_k=1259475182, ttl_value=2 replay前置裡拿到原生的ttl_k=929338653, ttl_value=1 --------------------reply後置,當前拿到的holder裡的TTL列表 replay後置裡拿到原生的ttl_k=1259475182, ttl_value=2 replay後置裡拿到原生的ttl_k=929338653, ttl_value=1 --------------------restore前置,當前拿到的holder裡的TTL列表 restore前置裡拿到當前執行緒內變數,ttl_k=1259475182, ttl_value=2 restore前置裡拿到當前執行緒內變數,ttl_k=929338653, ttl_value=1 --------------------restore後置,當前拿到的holder裡的TTL列表 restore後置裡拿到當前執行緒內變數,ttl_k=1259475182, ttl_value=2 restore後置裡拿到當前執行緒內變數,ttl_k=929338653, ttl_value=1
我們會發現,原生值產生了,從非同步開始,就確定了執行緒池裡的執行緒具備了1和2的值,那麼,再來改動下上面的測試程式碼:
public static void main(String[] args) throws InterruptedException { tl.set(1); executorService.execute(new Runnable() { @Override public void run() { try { Thread.sleep(100L); } catch (InterruptedException e) { e.printStackTrace(); } } }); Thread.sleep(1000L); tl2.set(2);//較第一次換下位置,換到第一次使用執行緒池後執行(這意味著下面這次非同步不會再觸發Thread的init方法了) System.out.println("---------------------------------------------------------------------------------"); executorService.execute(new Runnable() { @Override public void run() { try { Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } } }); }
執行結果為:
--------------------replay前置,當前拿到的holder裡的TTL列表 replay前置裡拿到原生的ttl_k=929338653, ttl_value=1 --------------------reply後置,當前拿到的holder裡的TTL列表 replay後置裡拿到原生的ttl_k=929338653, ttl_value=1 --------------------restore前置,當前拿到的holder裡的TTL列表 restore前置裡拿到當前執行緒內變數,ttl_k=929338653, ttl_value=1 --------------------restore後置,當前拿到的holder裡的TTL列表 restore後置裡拿到當前執行緒內變數,ttl_k=929338653, ttl_value=1 --------------------------------------------------------------------------------- --------------------replay前置,當前拿到的holder裡的TTL列表 replay前置裡拿到原生的ttl_k=929338653, ttl_value=1 --------------------reply後置,當前拿到的holder裡的TTL列表 replay後置裡拿到原生的ttl_k=1020371697, ttl_value=2 replay後置裡拿到原生的ttl_k=929338653, ttl_value=1 --------------------restore前置,當前拿到的holder裡的TTL列表 restore前置裡拿到當前執行緒內變數,ttl_k=1020371697, ttl_value=2 restore前置裡拿到當前執行緒內變數,ttl_k=929338653, ttl_value=1 --------------------restore後置,當前拿到的holder裡的TTL列表 restore後置裡拿到當前執行緒內變數,ttl_k=929338653, ttl_value=1
可以發現,第一次非同步時,只有一個值被傳遞了下去,然後第二次非同步,新加了一個tl2的值,但是看第二次非同步的列印,會發現,restore恢復後,仍然是第一次非同步發生時放進去的那個tl的值。
通過上面的例子,基本可以確認,所謂執行緒池內執行緒的本地原生變數,其實是第一次使用執行緒時被傳遞進去的值,我們之前有說過TTL是繼承至ITL的,之前的文章也說過,執行緒池第一次啟用時是會觸發Thread的init方法的,也就是說,在第一次非同步時拿到的主執行緒的變數會被傳遞給子執行緒,作為子執行緒的原生本地變數儲存起來,後續是replay操作和restore操作也是圍繞著這個原生變數(即原生holder裡的值)來進行設定、恢復的,設定的是當前父執行緒捕獲到的本地變數,恢復的是子執行緒原生本地變數。
holder裡持有的可以理解就是當前執行緒內的所有本地變數,當子執行緒將非同步任務執行完畢後,會執行restore進行恢復原生本地變數,具體參照上面的程式碼和測試程式碼。
四、總結
到這裡基本上確認了TTL是如何進行執行緒池傳值的,以及被包裝的run方法執行非同步任務之前,會使用replay進行設定父執行緒裡的本地變數給當前子執行緒,任務執行完畢,會呼叫restore恢復該子執行緒原生的本地變數(目前原生本地變數的產生,就只碰到上述測試程式碼中的這一種情況,即執行緒第一次使用時通過ITL屬性以及Thread的init方法傳給子執行緒,還不太清楚有沒有其他方式設定)。
其實,正常程式裡想要完成執行緒池上下文傳遞,使用TL就足夠了,我們可以效仿TTL包裝執行緒池物件的原理,進行值傳遞,非同步任務結束後,再remove,以此類推來完成執行緒池值傳遞,不過這種方式過於單純,且要求上下文為只讀物件,否則子執行緒存在寫操作,就會發生上下文汙染。
TTL專案地址(可以詳細瞭解下它的其他特性和用法):https://github.com/alibaba/transmittable-thread-local