java.util.Concurrent.Exchanger 原始碼
類圖
Exchanger 是一個用於執行緒間協作的工具類,Exchanger用於進行執行緒間的資料交換,它提供一個同步點,在這個同步點,兩個執行緒可以交換彼此的資料。這兩個執行緒通過exchange 方法交換資料,如果第一個執行緒先執行exchange 方法,它會一直等待第二個執行緒也執行exchange 方法,當兩個執行緒都到達同步點時,這兩個執行緒就可以交換資料。
原始碼:
package java.util.concurrent; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.LockSupport; public class Exchanger<V> { private static final int ASHIFT = 7; private static final int MMASK = 0xff; private static final int SEQ = MMASK + 1; private static final int NCPU = Runtime.getRuntime().availableProcessors(); static final int FULL = (NCPU >= (MMASK << 1)) ? MMASK : NCPU >>> 1; private static final int SPINS = 1 << 10;// 自旋次數 private static final Object NULL_ITEM = new Object();//如果交換的資料為null,則用NULL_ITEM代替 private static final Object TIMED_OUT = new Object(); private final Participant participant;//每個執行緒的資料,ThreadLocal 子類 private volatile Node[] arena; private volatile Node slot;// 用於交換資料的槽位 private volatile int bound; @sun.misc.Contended static final class Node { int index; //arena的下標,多個槽位的時候利用 int bound; // 上一次記錄的Exchanger.bound; int collides; // 在當前bound下CAS失敗的次數; int hash; // 用於自旋; Object item; // 這個執行緒的當前項,也就是需要交換的資料; volatile Object match; // 交換的資料 volatile Thread parked; // 執行緒 } static final class Participant extends ThreadLocal<Node> { // 初始值返回Node public Node initialValue() { return new Node(); } } private final Object arenaExchange(Object item, boolean timed, long ns) { // 槽位陣列 Node[] a = arena; //代表當前執行緒的Node Node p = participant.get(); // p.index 初始值為 0 for (int i = p.index; ; ) { // access slot at i int b, m, c; long j; // j is raw array offset //在槽位陣列中根據"索引" i 取出資料 j相當於是 "第一個"槽位 Node q = (Node) U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE); // 該位置上有資料(即有執行緒在這裡等待交換資料) if (q != null && U.compareAndSwapObject(a, j, q, null)) { // 進行資料交換,這裡和單槽位的交換是一樣的 Object v = q.item; // release q.match = item; Thread w = q.parked; if (w != null) U.unpark(w); return v; } // bound 是最大的有效的 位置,和MMASK相與,得到真正的儲存資料的索引最大值 else if (i <= (m = (b = bound) & MMASK) && q == null) { // i 在這個範圍內,該槽位也為空 //將需要交換的資料 設定給p p.item = item; // offer //設定該槽位資料(在該槽位等待其它執行緒來交換資料) if (U.compareAndSwapObject(a, j, null, p)) { long end = (timed && m == 0) ? System.nanoTime() + ns : 0L; Thread t = Thread.currentThread(); // wait // 進行一定時間的自旋 for (int h = p.hash, spins = SPINS; ; ) { Object v = p.match; //在自旋的過程中,有執行緒來和該執行緒交換資料 if (v != null) { //交換資料後,清空部分設定,返回交換得到的資料,over U.putOrderedObject(p, MATCH, null); p.item = null; // clear for next use p.hash = h; return v; } else if (spins > 0) { h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift if (h == 0) // initialize hash h = SPINS | (int) t.getId(); else if (h < 0 && // approx 50% true (--spins & ((SPINS >>> 1) - 1)) == 0) Thread.yield(); // two yields per wait } // 交換資料的執行緒到來,但是還沒有設定好match,再稍等一會 else if (U.getObjectVolatile(a, j) != p) spins = SPINS; //符合條件,特別注意m==0 這個說明已經到達area 中最小的儲存資料槽位了 //沒有其他執行緒在槽位等待了,所有當前執行緒需要阻塞在這裡 else if (!t.isInterrupted() && m == 0 && (!timed || (ns = end - System.nanoTime()) > 0L)) { U.putObject(t, BLOCKER, this); // emulate LockSupport p.parked = t; // minimize window // 再次檢查槽位,看看在阻塞前,有沒有執行緒來交換資料 if (U.getObjectVolatile(a, j) == p) U.park(false, ns); // 掛起 p.parked = null; U.putObject(t, BLOCKER, null); } // 當前這個槽位一直沒有執行緒來交換資料,準備換個槽位試試 else if (U.getObjectVolatile(a, j) == p && U.compareAndSwapObject(a, j, p, null)) { //更新bound if (m != 0) // try to shrink U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1); p.item = null; p.hash = h; // 減小索引值 往"第一個"槽位的方向挪動 i = p.index >>>= 1; // descend // 傳送中斷,返回null if (Thread.interrupted()) return null; // 超時 if (timed && m == 0 && ns <= 0L) return TIMED_OUT; break; // expired; restart 繼續主迴圈 } } } else //佔據槽位失敗,先清空item,防止成功交換資料後,p.item還引用著item p.item = null; // clear offer } else { // i 不在有效範圍,或者被其它執行緒搶先了 //更新p.bound if (p.bound != b) { // stale; reset p.bound = b; //新bound ,重置collides p.collides = 0; //i如果達到了最大,那麼就遞減 i = (i != m || m == 0) ? m : m - 1; } else if ((c = p.collides) < m || m == FULL || !U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) { p.collides = c + 1; // 更新衝突 // i=0 那麼就從m開始,否則遞減i i = (i == 0) ? m : i - 1; // cyclically traverse } else //遞增,往後挪動 i = m + 1; // grow // 更新index p.index = i; } } } private final Object slotExchange(Object item, boolean timed, long ns) { // 得到一個初試的Node Node p = participant.get(); // 當前執行緒 Thread t = Thread.currentThread(); // 如果發生中斷,返回null,會重設中斷標誌位,並沒有直接拋異常 if (t.isInterrupted()) // preserve interrupt status so caller can recheck return null; for (Node q;;) { // 槽位 solt不為null,則說明已經有執行緒在這裡等待交換資料了 if ((q = slot) != null) { // 重置槽位 if (U.compareAndSwapObject(this, SLOT, q, null)) { //獲取交換的資料 Object v = q.item; //等待執行緒需要的資料 q.match = item; //等待執行緒 Thread w = q.parked; //喚醒等待的執行緒 if (w != null) U.unpark(w); return v; // 返回拿到的資料,交換完成 } // create arena on contention, but continue until slot null //存在競爭,其它執行緒搶先了一步該執行緒,因此需要採用多槽位模式,這個後面再分析 if (NCPU > 1 && bound == 0 && U.compareAndSwapInt(this, BOUND, 0, SEQ)) arena = new Node[(FULL + 2) << ASHIFT]; }else if (arena != null) //多槽位不為空,需要執行多槽位交換 return null; // caller must reroute to arenaExchange else { //還沒有其他執行緒來佔據槽位 p.item = item; // 設定槽位為p(也就是槽位被當前執行緒佔據) if (U.compareAndSwapObject(this, SLOT, null, p)) break; // 退出無限迴圈 p.item = null; // 如果設定槽位失敗,則有可能其他執行緒搶先了,重置item,重新迴圈 } } //當前執行緒佔據槽位,等待其它執行緒來交換資料 int h = p.hash; long end = timed ? System.nanoTime() + ns : 0L; int spins = (NCPU > 1) ? SPINS : 1; Object v; // 直到成功交換到資料 while ((v = p.match) == null) { if (spins > 0) { // 自旋 h ^= h << 1; h ^= h >>> 3; h ^= h << 10; if (h == 0) h = SPINS | (int)t.getId(); else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0) // 主動讓出cpu,這樣可以提供cpu利用率(反正當前執行緒也自旋等待,還不如讓其它任務佔用cpu) Thread.yield(); } else if (slot != p) //其它執行緒來交換資料了,修改了solt,但是還沒有設定match,再稍等一會 spins = SPINS; //需要阻塞等待其它執行緒來交換資料 //沒發生中斷,並且是單槽交換,沒有設定超時或者超時時間未到 則繼續執行 else if (!t.isInterrupted() && arena == null && (!timed || (ns = end - System.nanoTime()) > 0L)) { // cas 設定BLOCKER,可以參考Thread 中的parkBlocker U.putObject(t, BLOCKER, this); // 需要掛起當前執行緒 p.parked = t; if (slot == p) U.park(false, ns); // 阻塞當前執行緒 // 被喚醒後 p.parked = null; // 清空 BLOCKER U.putObject(t, BLOCKER, null); } // 不滿足前面 else if 條件,交換失敗,需要重置solt else if (U.compareAndSwapObject(this, SLOT, p, null)) { v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null; break; } } //清空match U.putOrderedObject(p, MATCH, null); p.item = null; p.hash = h; // 返回交換得到的資料(失敗則為null) return v; } public Exchanger() { participant = new Participant(); } public V exchange(V x) throws InterruptedException { Object v; Object item = (x == null) ? NULL_ITEM : x; if ((arena != null || (v = slotExchange(item, false, 0L)) == null) && ((Thread.interrupted() || (v = arenaExchange(item, false, 0L)) == null))) throw new InterruptedException(); return (v == NULL_ITEM) ? null : (V)v; } public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException { Object v; Object item = (x == null) ? NULL_ITEM : x; long ns = unit.toNanos(timeout); if ((arena != null || (v = slotExchange(item, true, ns)) == null) && ((Thread.interrupted() || (v = arenaExchange(item, true, ns)) == null))) throw new InterruptedException(); if (v == TIMED_OUT) throw new TimeoutException(); return (v == NULL_ITEM) ? null : (V)v; } private static final sun.misc.Unsafe U; private static final long BOUND; private static final long SLOT; private static final long MATCH; private static final long BLOCKER; private static final int ABASE; static { int s; try { U = sun.misc.Unsafe.getUnsafe(); Class<?> ek = Exchanger.class; Class<?> nk = Node.class; Class<?> ak = Node[].class; Class<?> tk = Thread.class; BOUND = U.objectFieldOffset (ek.getDeclaredField("bound")); SLOT = U.objectFieldOffset (ek.getDeclaredField("slot")); MATCH = U.objectFieldOffset (nk.getDeclaredField("match")); BLOCKER = U.objectFieldOffset (tk.getDeclaredField("parkBlocker")); s = U.arrayIndexScale(ak); ABASE = U.arrayBaseOffset(ak) + (1 << ASHIFT); } catch (Exception e) { throw new Error(e); } if ((s & (s-1)) != 0 || s > (1 << ASHIFT)) throw new Error("Unsupported array scale"); } }
類 Exchanger<V>
型別引數:
V
- 可以交換的物件型別
每個執行緒將條目上的某個方法呈現給exchange()
方法,與夥伴執行緒進行匹配,並且在返回時接收其夥伴的物件。
用法示例:使用 Exchanger
線上程間交換緩衝區。在需要時,填充緩衝區的執行緒獲取一個新騰空的緩衝區,並將填滿的緩衝區傳遞給騰空緩衝區的執行緒。
class FillAndEmpty { Exchanger<DataBuffer> exchanger = new Exchanger<DataBuffer>(); DataBuffer initialEmptyBuffer = ... DataBuffer initialFullBuffer = ... class FillingLoop implements Runnable { public void run() { DataBuffer currentBuffer = initialEmptyBuffer; try { while (currentBuffer != null) { addToBuffer(currentBuffer); if (currentBuffer.isFull()) currentBuffer = exchanger.exchange(currentBuffer); } } catch (InterruptedException ex) { ... handle ... } } } class EmptyingLoop implements Runnable { public void run() { DataBuffer currentBuffer = initialFullBuffer; try { while (currentBuffer != null) { takeFromBuffer(currentBuffer); if (currentBuffer.isEmpty()) currentBuffer = exchanger.exchange(currentBuffer); } } catch (InterruptedException ex) { ... handle ...} } } void start() { new Thread(new FillingLoop()).start(); new Thread(new EmptyingLoop()).start(); } }
構造方法摘要
Exchanger() 建立一個新的 Exchanger。 |
方法摘要
V |
exchange(V x) 等待另一個執行緒到達此交換點(除非當前執行緒被中斷),然後將給定的物件傳送給該執行緒,並接收該執行緒的物件。 |
V |
exchange(V x, long timeout, TimeUnit unit) 等待另一個執行緒到達此交換點(除非當前執行緒被中斷,或者超出了指定的等待時間),然後將給定的物件傳送給該執行緒,同時接收該執行緒的物件。 |
Exchanger
public Exchanger()
建立一個新的 Exchanger。
exchange
public V exchange(V x) throws InterruptedException
等待另一個執行緒到達此交換點(除非當前執行緒被 中斷),然後將給定的物件傳送給該執行緒,並接收該執行緒的物件。
如果另一個執行緒已經在交換點等待,則出於執行緒排程目的,繼續執行此執行緒,並接收當前執行緒傳入的物件。當前執行緒立即返回,接收其他執行緒傳遞的交換物件。
如果還沒有其他執行緒在交換點等待,則出於排程目的,禁用當前執行緒,且在發生以下兩種情況之一前,該執行緒將一直處於休眠狀態:
- 其他某個執行緒進入交換點;或者
- 其他某個執行緒
中斷
當前執行緒。
如果當前執行緒:
- 在進入此方法時已經設定了該執行緒的中斷狀態;或者
- 在等待交換時被中斷,
則丟擲 InterruptedException
,並且清除當前執行緒的已中斷狀態。
引數:
x
- 要交換的物件
返回:
另一個執行緒提供的物件
丟擲:
InterruptedException
- 如果當前執行緒在等待時被中斷
exchange
public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
等待另一個執行緒到達此交換點(除非當前執行緒被 中斷,或者超出了指定的等待時間),然後將給定的物件傳送給該執行緒,同時接收該執行緒的物件。
如果另一個執行緒已經在交換點上等待,則出於執行緒排程目的,繼續執行此執行緒,並接收當前執行緒傳入的物件。當前執行緒立即返回,並接收其他執行緒傳遞的交換物件。
如果還沒有其他執行緒在交換點等待,則出於排程目的,禁用當前執行緒,且在發生以下三種情況之一前,該執行緒將一直處於休眠狀態:
- 其他某個執行緒進入交換點;或者
- 其他某個執行緒中斷當前執行緒;或者
- 已超出指定的等待時間。
如果當前執行緒:
- 在進入此方法時已經設定其中斷狀態;或者
- 在等待交換時被中斷,
則丟擲 InterruptedException
,並且清除當前執行緒的已中斷狀態。
如果超出指定的等待時間,則丟擲 TimeoutException
異常。如果該時間小於等於零,則此方法根本不會等待。
引數:
x
- 要交換的物件
timeout
- 要等待的最長時間
unit
- timeout 引數的時間單位
返回:
其他執行緒提供的物件
丟擲:
InterruptedException
- 如果當前執行緒在等待時被中斷
TimeoutException
- 如果在另一個執行緒進入交換點之前已經到達指定的等待時間
使用例項:
執行緒交換各自擁有的值:
package com.thread;
import java.util.concurrent.Exchanger;
public class ExchangerDemo extends Thread {
private Exchanger<String> exchanger;
private String name;
public ExchangerDemo(String name, Exchanger<String> exchanger) {
this.exchanger = exchanger;
this.name = name;
}
public void run() {
try {
System.out.println(Thread.currentThread().getName() + ": " + exchanger.exchange(this.name));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<>();
ExchangerDemo exchangerDemo1 = new ExchangerDemo("demo1", exchanger);
exchangerDemo1.setName("exchanger1");
ExchangerDemo exchangerDemo2 = new ExchangerDemo("demo2", exchanger);
exchangerDemo2.setName("exchanger2");
exchangerDemo1.start();
exchangerDemo2.start();
}
}
執行結果:
exchanger1: demo2
exchanger2: demo1
從輸出的值可以看到,兩個執行緒的值已經發生了交換。