1. 程式人生 > >java.util.Concurrent.Exchanger 原始碼

java.util.Concurrent.Exchanger 原始碼

類圖

 

    Exchanger 是一個用於執行緒間協作的工具類,Exchanger用於進行執行緒間的資料交換,它提供一個同步點,在這個同步點,兩個執行緒可以交換彼此的資料。這兩個執行緒通過exchange 方法交換資料,如果第一個執行緒先執行exchange 方法,它會一直等待第二個執行緒也執行exchange 方法,當兩個執行緒都到達同步點時,這兩個執行緒就可以交換資料。

 

原始碼

package java.util.concurrent;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;


public class Exchanger<V> {

    private static final int ASHIFT = 7;

    private static final int MMASK = 0xff;

    private static final int SEQ = MMASK + 1;

    private static final int NCPU = Runtime.getRuntime().availableProcessors();

    static final int FULL = (NCPU >= (MMASK << 1)) ? MMASK : NCPU >>> 1;

    private static final int SPINS = 1 << 10;// 自旋次數

    private static final Object NULL_ITEM = new Object();//如果交換的資料為null,則用NULL_ITEM代替

    private static final Object TIMED_OUT = new Object();

    private final Participant participant;//每個執行緒的資料,ThreadLocal 子類

    private volatile Node[] arena;

    private volatile Node slot;// 用於交換資料的槽位

    private volatile int bound;


    @sun.misc.Contended static final class Node {
       int index;              //arena的下標,多個槽位的時候利用
       int bound;              // 上一次記錄的Exchanger.bound;
       int collides;           // 在當前bound下CAS失敗的次數;
       int hash;               // 用於自旋;
       Object item;            // 這個執行緒的當前項,也就是需要交換的資料;
       volatile Object match;  // 交換的資料
       volatile Thread parked; // 執行緒
    }


    static final class Participant extends ThreadLocal<Node> {
        // 初始值返回Node
        public Node initialValue() {
           return new Node();
        }
    }

    private final Object arenaExchange(Object item, boolean timed, long ns) {
        // 槽位陣列
        Node[] a = arena;
        //代表當前執行緒的Node
        Node p = participant.get(); // p.index 初始值為 0
        for (int i = p.index; ; ) {                      // access slot at i
            int b, m, c;
            long j;                       // j is raw array offset
            //在槽位陣列中根據"索引" i 取出資料 j相當於是 "第一個"槽位
            Node q = (Node) U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE);
            // 該位置上有資料(即有執行緒在這裡等待交換資料)
            if (q != null && U.compareAndSwapObject(a, j, q, null)) {
                // 進行資料交換,這裡和單槽位的交換是一樣的
                Object v = q.item;                     // release
                q.match = item;
                Thread w = q.parked;
                if (w != null)
                    U.unpark(w);
                return v;
            }
            // bound 是最大的有效的 位置,和MMASK相與,得到真正的儲存資料的索引最大值
            else if (i <= (m = (b = bound) & MMASK) && q == null) {
                // i 在這個範圍內,該槽位也為空

                //將需要交換的資料 設定給p
                p.item = item;                         // offer
                //設定該槽位資料(在該槽位等待其它執行緒來交換資料)
                if (U.compareAndSwapObject(a, j, null, p)) {
                    long end = (timed && m == 0) ? System.nanoTime() + ns : 0L;
                    Thread t = Thread.currentThread(); // wait
                    // 進行一定時間的自旋
                    for (int h = p.hash, spins = SPINS; ; ) {
                        Object v = p.match;
                        //在自旋的過程中,有執行緒來和該執行緒交換資料
                        if (v != null) {
                            //交換資料後,清空部分設定,返回交換得到的資料,over
                            U.putOrderedObject(p, MATCH, null);
                            p.item = null;             // clear for next use
                            p.hash = h;
                            return v;
                        } else if (spins > 0) {
                            h ^= h << 1;
                            h ^= h >>> 3;
                            h ^= h << 10; // xorshift
                            if (h == 0)                // initialize hash
                                h = SPINS | (int) t.getId();
                            else if (h < 0 &&          // approx 50% true
                                    (--spins & ((SPINS >>> 1) - 1)) == 0)
                                Thread.yield();        // two yields per wait
                        }
                        // 交換資料的執行緒到來,但是還沒有設定好match,再稍等一會
                        else if (U.getObjectVolatile(a, j) != p)
                            spins = SPINS;
                            //符合條件,特別注意m==0 這個說明已經到達area 中最小的儲存資料槽位了
                            //沒有其他執行緒在槽位等待了,所有當前執行緒需要阻塞在這裡     
                        else if (!t.isInterrupted() && m == 0 &&
                                (!timed ||
                                        (ns = end - System.nanoTime()) > 0L)) {
                            U.putObject(t, BLOCKER, this); // emulate LockSupport
                            p.parked = t;              // minimize window
                            // 再次檢查槽位,看看在阻塞前,有沒有執行緒來交換資料
                            if (U.getObjectVolatile(a, j) == p)
                                U.park(false, ns); // 掛起
                            p.parked = null;
                            U.putObject(t, BLOCKER, null);
                        }
                        // 當前這個槽位一直沒有執行緒來交換資料,準備換個槽位試試
                        else if (U.getObjectVolatile(a, j) == p &&
                                U.compareAndSwapObject(a, j, p, null)) {
                            //更新bound
                            if (m != 0)                // try to shrink
                                U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1);
                            p.item = null;
                            p.hash = h;
                            // 減小索引值 往"第一個"槽位的方向挪動
                            i = p.index >>>= 1;        // descend
                            // 傳送中斷,返回null
                            if (Thread.interrupted())
                                return null;
                            // 超時
                            if (timed && m == 0 && ns <= 0L)
                                return TIMED_OUT;
                            break;                     // expired; restart 繼續主迴圈
                        }
                    }
                } else
                    //佔據槽位失敗,先清空item,防止成功交換資料後,p.item還引用著item
                    p.item = null;                     // clear offer
            } else { // i 不在有效範圍,或者被其它執行緒搶先了
                //更新p.bound
                if (p.bound != b) {                    // stale; reset
                    p.bound = b;
                    //新bound ,重置collides
                    p.collides = 0;
                    //i如果達到了最大,那麼就遞減
                    i = (i != m || m == 0) ? m : m - 1;
                } else if ((c = p.collides) < m || m == FULL ||
                        !U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) {
                    p.collides = c + 1; // 更新衝突
                    // i=0 那麼就從m開始,否則遞減i
                    i = (i == 0) ? m : i - 1;          // cyclically traverse
                } else
                    //遞增,往後挪動
                    i = m + 1;                         // grow
                // 更新index
                p.index = i;
            }
        }
    }


    private final Object slotExchange(Object item, boolean timed, long ns) {
       // 得到一個初試的Node
       Node p = participant.get();
       // 當前執行緒
       Thread t = Thread.currentThread();
       // 如果發生中斷,返回null,會重設中斷標誌位,並沒有直接拋異常
       if (t.isInterrupted()) // preserve interrupt status so caller can recheck
          return null;

       for (Node q;;) {
          // 槽位 solt不為null,則說明已經有執行緒在這裡等待交換資料了
          if ((q = slot) != null) {
             // 重置槽位
             if (U.compareAndSwapObject(this, SLOT, q, null)) {
                //獲取交換的資料
                Object v = q.item;
                //等待執行緒需要的資料
                q.match = item;
                //等待執行緒
                Thread w = q.parked;
                //喚醒等待的執行緒
                if (w != null)
                    U.unpark(w);
                return v; // 返回拿到的資料,交換完成
             }
             // create arena on contention, but continue until slot null
             //存在競爭,其它執行緒搶先了一步該執行緒,因此需要採用多槽位模式,這個後面再分析
             if (NCPU > 1 && bound == 0 &&
                U.compareAndSwapInt(this, BOUND, 0, SEQ))
                arena = new Node[(FULL + 2) << ASHIFT];
          }else if (arena != null) //多槽位不為空,需要執行多槽位交換
             return null; // caller must reroute to arenaExchange
          else { //還沒有其他執行緒來佔據槽位
             p.item = item;
             // 設定槽位為p(也就是槽位被當前執行緒佔據)
             if (U.compareAndSwapObject(this, SLOT, null, p))
                break; // 退出無限迴圈
             p.item = null; // 如果設定槽位失敗,則有可能其他執行緒搶先了,重置item,重新迴圈
          }
       }

       //當前執行緒佔據槽位,等待其它執行緒來交換資料
       int h = p.hash;
       long end = timed ? System.nanoTime() + ns : 0L;
       int spins = (NCPU > 1) ? SPINS : 1;
       Object v;
       // 直到成功交換到資料
       while ((v = p.match) == null) {
          if (spins > 0) { // 自旋
             h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
             if (h == 0)
                h = SPINS | (int)t.getId();
             else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
                // 主動讓出cpu,這樣可以提供cpu利用率(反正當前執行緒也自旋等待,還不如讓其它任務佔用cpu)
                Thread.yield(); 
          }
          else if (slot != p) //其它執行緒來交換資料了,修改了solt,但是還沒有設定match,再稍等一會
             spins = SPINS;
          //需要阻塞等待其它執行緒來交換資料
          //沒發生中斷,並且是單槽交換,沒有設定超時或者超時時間未到 則繼續執行
          else if (!t.isInterrupted() && arena == null &&
                 (!timed || (ns = end - System.nanoTime()) > 0L)) {
             // cas 設定BLOCKER,可以參考Thread 中的parkBlocker
             U.putObject(t, BLOCKER, this);
             // 需要掛起當前執行緒
             p.parked = t;
             if (slot == p)
                U.park(false, ns); // 阻塞當前執行緒
             // 被喚醒後    
             p.parked = null;
             // 清空 BLOCKER
             U.putObject(t, BLOCKER, null);
          }
          // 不滿足前面 else if 條件,交換失敗,需要重置solt
          else if (U.compareAndSwapObject(this, SLOT, p, null)) {
             v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
             break;
          }
       }
       //清空match
       U.putOrderedObject(p, MATCH, null);
       p.item = null;
       p.hash = h;
       // 返回交換得到的資料(失敗則為null)
       return v;
    }


    public Exchanger() {
        participant = new Participant();
    }

    public V exchange(V x) throws InterruptedException {
        Object v;
        Object item = (x == null) ? NULL_ITEM : x;
        if ((arena != null ||
             (v = slotExchange(item, false, 0L)) == null) &&
            ((Thread.interrupted() ||
              (v = arenaExchange(item, false, 0L)) == null)))
            throw new InterruptedException();
        return (v == NULL_ITEM) ? null : (V)v;
    }


    public V exchange(V x, long timeout, TimeUnit unit)
        throws InterruptedException, TimeoutException {
        Object v;
        Object item = (x == null) ? NULL_ITEM : x;
        long ns = unit.toNanos(timeout);
        if ((arena != null ||
             (v = slotExchange(item, true, ns)) == null) &&
            ((Thread.interrupted() ||
              (v = arenaExchange(item, true, ns)) == null)))
            throw new InterruptedException();
        if (v == TIMED_OUT)
            throw new TimeoutException();
        return (v == NULL_ITEM) ? null : (V)v;
    }

    
    private static final sun.misc.Unsafe U;
    private static final long BOUND;
    private static final long SLOT;
    private static final long MATCH;
    private static final long BLOCKER;
    private static final int ABASE;
    static {
        int s;
        try {
            U = sun.misc.Unsafe.getUnsafe();
            Class<?> ek = Exchanger.class;
            Class<?> nk = Node.class;
            Class<?> ak = Node[].class;
            Class<?> tk = Thread.class;
            BOUND = U.objectFieldOffset
                (ek.getDeclaredField("bound"));
            SLOT = U.objectFieldOffset
                (ek.getDeclaredField("slot"));
            MATCH = U.objectFieldOffset
                (nk.getDeclaredField("match"));
            BLOCKER = U.objectFieldOffset
                (tk.getDeclaredField("parkBlocker"));
            s = U.arrayIndexScale(ak);
            
            ABASE = U.arrayBaseOffset(ak) + (1 << ASHIFT);

        } catch (Exception e) {
            throw new Error(e);
        }
        if ((s & (s-1)) != 0 || s > (1 << ASHIFT))
            throw new Error("Unsupported array scale");
    }
}

類 Exchanger<V>

    型別引數:

    V - 可以交換的物件型別

    每個執行緒將條目上的某個方法呈現給exchange()方法,與夥伴執行緒進行匹配,並且在返回時接收其夥伴的物件。

    用法示例:使用 Exchanger 線上程間交換緩衝區。在需要時,填充緩衝區的執行緒獲取一個新騰空的緩衝區,並將填滿的緩衝區傳遞給騰空緩衝區的執行緒。

class FillAndEmpty {
   Exchanger<DataBuffer> exchanger = new Exchanger<DataBuffer>();

   DataBuffer initialEmptyBuffer = ...
   DataBuffer initialFullBuffer = ...

   class FillingLoop implements Runnable {
     public void run() {
       DataBuffer currentBuffer = initialEmptyBuffer;
       try {
         while (currentBuffer != null) {
           addToBuffer(currentBuffer);
           if (currentBuffer.isFull())
             currentBuffer = exchanger.exchange(currentBuffer);
         }
       } catch (InterruptedException ex) { ... handle ... }
     }
   }

   class EmptyingLoop implements Runnable {
     public void run() {
       DataBuffer currentBuffer = initialFullBuffer;
       try {
         while (currentBuffer != null) {
           takeFromBuffer(currentBuffer);
           if (currentBuffer.isEmpty())
             currentBuffer = exchanger.exchange(currentBuffer);
         }
       } catch (InterruptedException ex) { ... handle ...}
     }
   }

   void start() {
     new Thread(new FillingLoop()).start();
     new Thread(new EmptyingLoop()).start();
   }
}

 

構造方法摘要

Exchanger() 
          建立一個新的 Exchanger。

 

方法摘要

 V exchange(V x) 
          等待另一個執行緒到達此交換點(除非當前執行緒被中斷),然後將給定的物件傳送給該執行緒,並接收該執行緒的物件。
 V exchange(V x, long timeout, TimeUnit unit) 
          等待另一個執行緒到達此交換點(除非當前執行緒被中斷,或者超出了指定的等待時間),然後將給定的物件傳送給該執行緒,同時接收該執行緒的物件。

  

Exchanger

public Exchanger()

建立一個新的 Exchanger。

 

exchange

public V exchange(V x) throws InterruptedException

    等待另一個執行緒到達此交換點(除非當前執行緒被 中斷),然後將給定的物件傳送給該執行緒,並接收該執行緒的物件。

    如果另一個執行緒已經在交換點等待,則出於執行緒排程目的,繼續執行此執行緒,並接收當前執行緒傳入的物件。當前執行緒立即返回,接收其他執行緒傳遞的交換物件。

    如果還沒有其他執行緒在交換點等待,則出於排程目的,禁用當前執行緒,且在發生以下兩種情況之一前,該執行緒將一直處於休眠狀態:

  • 其他某個執行緒進入交換點;或者
  • 其他某個執行緒中斷當前執行緒。

    如果當前執行緒:

  • 在進入此方法時已經設定了該執行緒的中斷狀態;或者
  • 在等待交換時被中斷

    則丟擲 InterruptedException,並且清除當前執行緒的已中斷狀態。

    引數:

    x - 要交換的物件

    返回:

        另一個執行緒提供的物件

    丟擲:

    InterruptedException - 如果當前執行緒在等待時被中斷

 

exchange

public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException

    等待另一個執行緒到達此交換點(除非當前執行緒被 中斷,或者超出了指定的等待時間),然後將給定的物件傳送給該執行緒,同時接收該執行緒的物件。

    如果另一個執行緒已經在交換點上等待,則出於執行緒排程目的,繼續執行此執行緒,並接收當前執行緒傳入的物件。當前執行緒立即返回,並接收其他執行緒傳遞的交換物件。

    如果還沒有其他執行緒在交換點等待,則出於排程目的,禁用當前執行緒,且在發生以下三種情況之一前,該執行緒將一直處於休眠狀態:

  • 其他某個執行緒進入交換點;或者
  • 其他某個執行緒中斷當前執行緒;或者
  • 已超出指定的等待時間。

    如果當前執行緒:

  • 在進入此方法時已經設定其中斷狀態;或者
  • 在等待交換時被中斷

    則丟擲 InterruptedException,並且清除當前執行緒的已中斷狀態。

    如果超出指定的等待時間,則丟擲 TimeoutException 異常。如果該時間小於等於零,則此方法根本不會等待。

    引數:

    x - 要交換的物件

    timeout - 要等待的最長時間

    unit - timeout 引數的時間單位

    返回:

        其他執行緒提供的物件

    丟擲:

    InterruptedException - 如果當前執行緒在等待時被中斷

    TimeoutException - 如果在另一個執行緒進入交換點之前已經到達指定的等待時間

使用例項:

    執行緒交換各自擁有的值:

package com.thread;

import java.util.concurrent.Exchanger;

public class ExchangerDemo extends Thread {
    private Exchanger<String> exchanger;
    private String name;

    public ExchangerDemo(String name, Exchanger<String> exchanger) {
        this.exchanger = exchanger;
        this.name = name;
    }

    public void run() {
        try {
            System.out.println(Thread.currentThread().getName() + ": " + exchanger.exchange(this.name));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        Exchanger<String> exchanger = new Exchanger<>();
        ExchangerDemo exchangerDemo1 = new ExchangerDemo("demo1", exchanger);
        exchangerDemo1.setName("exchanger1");
        ExchangerDemo exchangerDemo2 = new ExchangerDemo("demo2", exchanger);
        exchangerDemo2.setName("exchanger2");
        exchangerDemo1.start();
        exchangerDemo2.start();
    }
}

    執行結果:

exchanger1: demo2
exchanger2: demo1

    從輸出的值可以看到,兩個執行緒的值已經發生了交換。