1. 程式人生 > >原始碼分析:Exchanger之資料交換器

原始碼分析:Exchanger之資料交換器

## 簡介 [Exchanger](https://jinglingwang.cn)是Java5 開始引入的一個類,它允許兩個執行緒之間交換持有的資料。當[Exchanger](https://jinglingwang.cn)在一個執行緒中呼叫exchange方法之後,會阻塞等待另一個執行緒呼叫同樣的exchange方法,然後以執行緒安全的方式交換資料,之後執行緒繼續執行。 ### 官方示例 在JDK的原始碼註釋中,提供了一個簡單的示例demo,稍加修改後就可以執行 ```java public class FillAndEmpty { Exchanger exchanger = new Exchanger(); Integer initialEmptyBuffer = 1; Integer initialFullBuffer = 2; class FillingLoop implements Runnable { public void run() { Integer currentBuffer = initialEmptyBuffer; try { while (currentBuffer != 2) { currentBuffer = exchanger.exchange(currentBuffer); } System.out.println("FillingLoop:"+currentBuffer); } catch (InterruptedException ex) { } } } class EmptyingLoop implements Runnable { public void run() { Integer currentBuffer = initialFullBuffer; try { while (currentBuffer != 1) { currentBuffer = exchanger.exchange(currentBuffer); } System.out.println("EmptyingLoop:"+currentBuffer); } catch (InterruptedException ex) { } } } void start() { new Thread(new FillingLoop()).start(); new Thread(new EmptyingLoop()).start(); } public static void main(String[] args){ FillAndEmpty f = new FillAndEmpty(); f.start(); } } ``` ## 原始碼分析 ### 內部類 Exchanger 中定義了兩個內部類:Node、Participant ```java // 使用 @sun.misc.Contended 註解避免出現偽共享 @sun.misc.Contended static final class Node { int index; // Arena 中的索引 int bound; // Exchanger.bound的最後記錄值 int collides; // 當前 bound 的CAS 失敗數 int hash; // Pseudo-random for spins Object item; // 執行緒的當前資料項 volatile Object match; // 由釋放執行緒提供的專案 volatile Thread parked; // 當阻塞(parked)時,設定此執行緒,否則為null } ``` ```java /** 繼承了ThreadLocal,並初始化了Node物件 */ static final class Participant extends ThreadLocal { public Node initialValue() { return new Node(); } } ``` ### 重要的屬性 ```java /** 每個執行緒的狀態 */ private final Participant participant; /** 消除陣列;在啟用(在slotExchange中)之前為空。元素訪問使用volatile get和CAS */ private volatile Node[] arena; /** 在檢測到爭用之前一直使用的插槽,可以理解為先到的執行緒的資料項 */ private volatile Node slot; /** 每次更新時,將最大有效競技場位置的索引與高位SEQ號進行“或”運算。 */ private volatile int bound; ``` ### exchange()方法 等待另一個執行緒到達交換點(除非當前執行緒被中斷),然後將給定的物件傳遞給它,作為回報接收另一個的物件。 ```java public V exchange(V x) throws InterruptedException { // 交換後的物件v Object v; // item 為交換出去的物件,如果為null則換成NULL_ITEM物件 Object item = (x == null) ? NULL_ITEM : x; // translate null args // 1.1構造方法沒有初始化arena,所以第一個進來的執行緒看見的arena肯定為null // 1.2第一個進來的執行緒繼續呼叫slotExchange(item, false, 0L)方法 if ((arena != null || (v = slotExchange(item, false, 0L)) == null) && // 2.1 Thread.interrupted(): 檢測執行緒是否有被中斷 // 2.2 arenaExchange(item, false, 0L):slotExchange方法 返回了null時會進入到這個方法 ((Thread.interrupted() || (v = arenaExchange(item, false, 0L)) == null))) throw new InterruptedException(); return (v == NULL_ITEM) ? null : (V)v; } ``` arenaExchange()方法總結: 1. 呼叫exchange方法的執行緒等待另一個執行緒到達交換點完成交換資料 2. 如果交換的資料為null,會被轉換成一個`NULL_ITEM` 的Object物件作為轉換的資料項 3. 構造方法未初始化`arena`物件,所以會先呼叫`slotExchange`方法借用slot插槽來交換物件 4. 如果`slotExchange`方法成功返回了另一個交換到的物件,則直接返回交換到的資料項 5. 如果`slotExchange`方法成功返回了null,會繼續呼叫`arenaExchange`方法完成資料交換並返回 ### slotExchange()方法 ```java /** * item:要交換的專案 * timed:是否有設定超時 * ns: 設定的超時時間 * return: 返回另一個執行緒的資料項;如果啟用arena或執行緒在完成之前被中斷,則為null;如果超時,則為TIMED_OUT */ private final Object slotExchange(Object item, boolean timed, long ns) { // 獲取當前執行緒node節點物件 Node p = participant.get(); Thread t = Thread.currentThread(); // 當前執行緒 if (t.isInterrupted()) // preserve interrupt status so caller can recheck return null; // 自旋 for (Node q;;) { if ((q = slot) != null) { // 兩個執行緒先到的執行緒,slot肯定為null,一般後到的執行緒會進入到這個if分支 // 如果在當前執行緒之前已經有執行緒呼叫了exchange方法,slot就肯定不為null,條件成立 if (U.compareAndSwapObject(this, SLOT, q, null)) {// 後來的執行緒會呼叫CAS吧slot再置為null // q.item 是較早的執行緒的資料項 Object v = q.item; // item 是當前執行緒的資料項;by: https://jinglingwang.cn q.match = item; // 之前阻塞(park)的執行緒 Thread w = q.parked; if (w != null) //可能另一個執行緒還在自旋,沒有阻塞,所以這裡可能會為null // 喚醒之前被阻塞的執行緒 U.unpark(w); // 返回之前的執行緒的資料項 return v; } // create arena on contention, but continue until slot null // 上面CAS修改slot失敗後,會進入到這裡;https://jinglingwang.cn // SEQ = MMASK + 1 = 256 if (NCPU > 1 && bound == 0 && U.compareAndSwapInt(this, BOUND, 0, SEQ)) // if條件成立,初始化arena陣列 // 我8核的CPU,計算的length是 (4+2) << 7 == 768 arena = new Node[(FULL + 2) << ASHIFT]; } else if (arena != null) // 如果上面的if條件成立並且初始化了arena陣列,會進入到arenaExchange方法 return null; // caller must reroute to arenaExchange else { p.item = item; // p節點的item設定為當前項item if (U.compareAndSwapObject(this, SLOT, null, p)) // CAS 修改slot的值,修改成功退出自旋 break; p.item = null; //CAS 修改失敗沒有退出自旋,重置p節點的item為null } } // 理論上第一個先到的執行緒會進入到下面,會阻塞自己,等待另一個執行緒的資料項到來 // await release int h = p.hash; long end = timed ? System.nanoTime() + ns : 0L; // 超時時間 // 根據CPU的核數確定自旋的次數1024 or 1 int spins = (NCPU >
1) ? SPINS : 1; Object v; while ((v = p.match) == null) { // 先到的執行緒 p.match 可能會為null,下面開始自旋等待另一個執行緒交換的資料設定到match if (spins > 0) { **// 至少先自旋 1024 次,等待match資料項,自旋後才阻塞自己** h ^= h << 1; h ^= h >>> 3; h ^= h << 10; if (h == 0) h = SPINS | (int)t.getId(); // 重新計算hash else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0) // 減少自旋次數 Thread.yield(); // 讓出CPU的使用權 } else if (slot != p) // 上面自旋次數已經減到0了,並且slot != p,沒有衝突的話理論上slot 應該是等於 p 的 spins = SPINS; // 重置自旋次數 else if (!t.isInterrupted() && arena == null && (!timed || (ns = end - System.nanoTime()) > 0L)) { U.putObject(t, BLOCKER, this); p.parked = t; if (slot == p) U.park(false, ns); // 呼叫底層阻塞最早的執行緒 // 執行緒被喚醒了,回到上面再次判斷while自旋,p.match理論上不會是null了,p.match是後到的執行緒的資料項,是需要返回給當前執行緒的項 p.parked = null; U.putObject(t, BLOCKER, null); } else if (U.compareAndSwapObject(this, SLOT, p, null)) { // 如果執行緒阻塞超時了,還是沒等待要交換的資料項,會進入到這裡,返回一個TIMED_OUT 物件或null v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null; break; } } // 將 當前執行緒p 的 match 屬性設定成 null U.putOrderedObject(p, MATCH, null); p.item = null; p.hash = h; // 返回匹配後的資料項v return v; } ``` slotExchange()方法總結: 1. 執行緒進入該方法後,會先拿到`[Exchanger](https://jinglingwang.cn)`的`Participant`,也就是`Node`資料節點`p`; 2. 檢查執行緒的狀態,是否有被中斷,如果是返回null,會進入到下面的`arenaExchange`方法邏輯 3. 先呼叫slotExchange()方法的執行緒會使用CAS的方式執行緒安全的佔用`slot`插槽 4. 然後會自旋至少1024次並不斷讓出CPU使用權,期間如果成功等待到了另外的執行緒的資料項(`p.match != null`),則直接返回交換到的資料(`v = p.match`) 5. 如果自旋後沒有等到交換的資料項,呼叫`U.park`阻塞當前執行緒,等待另一個執行緒的到來將其喚醒或者超時 6. 另一個執行緒進入slotExchange()方法後,發現slot插槽已經被佔用(已經有執行緒在等它交換資料了),取出slot插槽中的item資料(第一個執行緒的資料),並設定自己的資料到插槽的match項,然後喚醒另一個執行緒,成功換反交換到的資料。 7. 被喚醒的執行緒成功獲得match資料,並返回交換後的match資料 `slotExchange`方法返回null的2種情況: 1. 執行緒被中斷,會返回null 2. 設定了超時時間,並且時間超時,會返回`TIMED_OUT` 3. 第一個執行緒超時了,把slot從p置為null的同事第二個執行緒剛好呼叫CAS也在把slot從q修改為null,這時候第二個執行緒會修改失敗,然後就會去初始化`arena`陣列,然後第二個執行緒就可能返回null ### [arenaExchange()方法](https://jinglingwang.cn) 從`exchange()`方法實現中可以看到,只有當`slotExchange()`方法返回null之後才會執行到`arenaExchange()`方法,而執行緒中斷的情況是不會進入到該方法的,所以只有另一種情況,但是要進入的機率太小了,斷點除錯的話難以構造這種情況。 ```java private final Object arenaExchange(Object item, boolean timed, long ns) { // 實質上就是個Node陣列 Node[] a = arena; // 獲取當前執行緒node節點物件 Node p = participant.get(); // p.index 訪問插槽的索引位置,初始值為0 for (int i = p.index;;) { // access slot at i // j是原始陣列偏移量 https://jinglingwang.cn int b, m, c; long j; // j is raw array offset // ABASE:返回Node陣列中第一個元素的偏移地址+128; i << ASHIFT : i<<7 // getObjectVolatile:獲取obj物件中offset偏移地址對應的object型field的值,支援volatile load語義 // q節點就是通過CAS獲取arena陣列偏移(i + 1) * 128個地址位上的node Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE); // 如果獲取到的節點不為空,並且再次吧j位置的q元素置為null if (q != null && U.compareAndSwapObject(a, j, q, null)) { // 整個條件成立,代表執行緒獲得了交換的資料 Object v = q.item; // release q.match = item; Thread w = q.parked; if (w != null) // 有阻塞的執行緒就喚醒 U.unpark(w); return v; // 返回交換的資料 } else if (i <= (m = (b = bound) & MMASK) && q == null) { // i 沒有越界,並且q==null // 把當前執行緒的資料賦予給p節點的item p.item = item; // offer if (U.compareAndSwapObject(a, j, null, p)) { // 再使用CAS的方式把p節點安全的放入到陣列的j位置上 // CAS 修改成功 // 計算超時時間 long end = (timed && m == 0) ? System.nanoTime() + ns : 0L; Thread t = Thread.currentThread(); // wait 當前執行緒 // 自旋 1024 for (int h = p.hash, spins = SPINS;;) { Object v = p.match; //交換的資料 if (v != null) { // 交換的資料不為null,說明有其他執行緒把交換的資料送進來了 U.putOrderedObject(p, MATCH, null); // 將match和item置為null p.item = null; // clear for next use p.hash = h; return v;// 返回資料 } else if (spins >
0) { // 異或移位 h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift if (h == 0) // initialize hash 初始化hash h = SPINS | (int)t.getId(); else if (h < 0 && // approx 50% true (--spins & ((SPINS >>> 1) - 1)) == 0) // 減少自旋次數 Thread.yield(); // two yields per wait 讓出CPU使用權 } else if (U.getObjectVolatile(a, j) != p) // 和slotExchange方法中的類似 // 重置自旋次數 spins = SPINS; // releaser hasn't set match yet else if (!t.isInterrupted() && m == 0 && (!timed || // 超時時間設定 (ns = end - System.nanoTime()) >
0L)) { U.putObject(t, BLOCKER, this); // emulate LockSupport p.parked = t; // minimize window if (U.getObjectVolatile(a, j) == p) U.park(false, ns); // 阻塞當前執行緒,等待被喚醒 p.parked = null; // 執行緒被喚醒了 U.putObject(t, BLOCKER, null); } else if (U.getObjectVolatile(a, j) == p && U.compareAndSwapObject(a, j, p, null)) { // m會跟著bound變化,初始會是0 if (m != 0) // try to shrink U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1); // 修改b p.item = null; p.hash = h; // i = p.index無符號右移1位 i = p.index >>>= 1; // descend if (Thread.interrupted()) //執行緒被中斷 return null; if (timed && m == 0 && ns <= 0L) // 超時,返回TIME_OUT return TIMED_OUT; break; // expired; restart } } } else // 使用CAS的方式把p節點安全的放入到陣列的j位置上失敗(可能有其他執行緒已經捷足先登),重置p節點的item p.item = null; // clear offer } else { // 上面兩個if條件都沒成立:比如q!=null,compareAndSwapObject失敗,陣列未越界 if (p.bound != b) { // stale; reset p.bound = b; // b變化了,重置bond p.collides = 0; // 當前 bound 的CAS 失敗數 i = (i != m || m == 0) ? m : m - 1; // 確定索引i } else if ((c = p.collides) < m || m == FULL || !U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) { p.collides = c + 1; // bound 的CAS 失敗數+1 // 確定迴圈遍歷i,繼續回到上面最初的地方自旋 i = (i == 0) ? m : i - 1; // cyclically traverse } else // 此時表示bound值增加了SEQ+1 i = m + 1; // grow p.index = i; // 設定下標,繼續自旋 } } } ``` ## Exchanger總結: 1. Exchanger 可以以執行緒安全的方式完成兩個執行緒之間資料的交換工作 2. By: [http://jinglingwang.cn](http://jinglingwang.cn) 3. Exchanger 主要是使用了自旋和CAS來保證資料的原子性 4. 一般情況下,slotExchange()方法即可完成資料交換的工作 5. JDK8 版本的Exchanger 使用了 `@sun.misc.Contended`註解來避免偽共享 6. 資料交換過程可以總結為:A、B執行緒交換資料 ,A發現slot為空就把自己的資料放入到slot插槽中的item項,自旋或阻塞等待B執行緒的資料,B執行緒進來發現A執行緒的資料後取走資料並設定自己的資料到match,然後再喚醒A執行緒取走B執行緒的match資料。多個執行緒交換時,需要用到slot