1. 程式人生 > >JUC——線程同步輔助工具類(Exchanger,CompletableFuture)

JUC——線程同步輔助工具類(Exchanger,CompletableFuture)

UC syn imp unit change new ace 負責 工具類

Exchanger交換空間

如果現在有兩個線程,一個線程負責生產數據,另外一個線程負責消費數據,那麽這個兩個線程之間一定會存在一個公共的區域,那麽這個區域的實現在JUC包之中稱為Exchanger

java.util.concurrent.Exchanger類表示一種兩個線程可以進行互相交換對象的匯合點。

技術分享圖片

Exchanger類中定義的方法如下:

  • 構造方法:
    pubilc Exchanger();  //創建一個對象
  • 設置與取得:
    public V exchange(V x) throws InterruptedException

範例:使用Exchanger實現交換處理

package
so.strong.mall.concurrent; import java.util.concurrent.Exchanger; import java.util.concurrent.TimeUnit; public class ExchanerDemo { public static void main(String[] args) { final Exchanger<String> exchanger = new Exchanger<>(); //準備一個交換空間 for (int i = 0; i < 3; i++) { //
3個消費者 new Thread(new Runnable() { @Override public void run() { while (true) { try { String data = exchanger.exchange(null); TimeUnit.SECONDS.sleep(2);
if (data != null) { System.out.println("[" + Thread.currentThread().getName() + "]取得數據:" + data); } } catch (Exception e) { e.printStackTrace(); } } } }, "消費者-" + i).start(); } for (int i = 0; i < 2; i++) { //2個生產者 final int temp = i; new Thread(new Runnable() { @Override public void run() { for (int j = 0; j < 2; j++) { String data = "iTermis-" + temp + "-" + j; try { TimeUnit.SECONDS.sleep(2); //讓生產者節奏放慢 exchanger.exchange(data); System.out.println("[" + Thread.currentThread().getName() + "]生產了數據:" + data); } catch (Exception e) { e.printStackTrace(); } } } }, "生產者-" + i).start(); } } }
[生產者-1]生產了數據:iTermis-1-0
[生產者-1]生產了數據:iTermis-1-1
[消費者-1]取得數據:iTermis-1-0
[生產者-0]生產了數據:iTermis-0-0
[生產者-0]生產了數據:iTermis-0-1
[消費者-2]取得數據:iTermis-0-1

  

CompletableFuture線程回調

現在設想一個場景,例如:使用炮兵轟炸某一目標

技術分享圖片

所有的執行線程在接收到命令之前都要進入到阻塞狀態之中,一直到接收到具體的命令之後才會執行下一步操作處理。

java.util.concurrent.CompletableFutureJava8中添加的一個類,該類的主要作用是提供了新的方式來完成異步處理,包括合成和組合事件的非阻塞方式。

技術分享圖片

CompletableFuture類中有如下的方法:

  • 構造方法:
    public CompletableFuture();
  • 獲取命令:
    public T get() throws InterruptedException,ExecutionException  

範例:使用CompletableFuture實現炮兵轟炸操作

package com.itermis.concurrent;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class CompletableFutureDemo { public static void main(String[] args) throws Exception { CompletableFuture<String> future = new CompletableFuture<>(); for (int i = 0; i < 4; i++) { new Thread(() -> { System.out.println("BEFORE[" + Thread.currentThread().getName() + "]進入炮兵陣地,等待命令,準備開火。"); try { String cmd = future.get(); //接收命令 if ("fire".equals(cmd)) { System.out.println("AFTER[" + Thread.currentThread().getName() + "]接收到命令,立刻開火,幹死那個死胖子。。"); } if ("cancel".equals(cmd)) { System.out.println("AFTER[" + Thread.currentThread().getName() + "]收到撤退命令,回家睡覺。。"); } } catch (Exception e) { e.printStackTrace(); } }, "炮兵-" + i).start(); } TimeUnit.SECONDS.sleep(3); //等待3秒鐘 future.complete("cancel"); //給出了執行命令 } }
BEFORE[炮兵-1]進入炮兵陣地,等待命令,準備開火。
BEFORE[炮兵-0]進入炮兵陣地,等待命令,準備開火。
BEFORE[炮兵-2]進入炮兵陣地,等待命令,準備開火。
BEFORE[炮兵-3]進入炮兵陣地,等待命令,準備開火。
//sleep 3 秒
AFTER[炮兵-1]收到撤退命令,回家睡覺。。
AFTER[炮兵-0]收到撤退命令,回家睡覺。。
AFTER[炮兵-2]收到撤退命令,回家睡覺。。
AFTER[炮兵-3]收到撤退命令,回家睡覺。。

該類的處理主要是建立在Future線程模型的基礎之上的實現操作。

對於本類而言,除了以上的使用方式之外還可以采用異步的線程執行方式處理。在創建CompletableFuture類對象的時候還可以使用這個類之中提供的一種靜態方法:

public static CompletableFuture<Void> runAsync(Runnable runnable)

範例:更換實現方式實現上述轟炸操作:

package com.itermis.concurrent;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class CompletableFutureDemoII {
    public static void main(String[] args) {
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            System.out.println("[FUTURE]將軍正在溫柔鄉裏美夢了,等著將軍睡醒開炮..");
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("[FUTURE]將軍醒了,開始幹活了..");
        });
        for (int i = 0; i < 4; i++) {
            new Thread(() -> {
                System.out.println("BEFORE[" + Thread.currentThread().getName() + "]進入炮兵陣地,等待命令,準備開火。");
                try {
                    System.out.println("AFTER[" + Thread.currentThread().getName() + "]接收到命令,立刻開火,幹死那個死胖子。。" + future.get());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }, "炮兵-" + i).start();
        }
    }
}
[FUTURE]將軍正在溫柔鄉裏美夢了,等著將軍睡醒開炮..
BEFORE[炮兵-1]進入炮兵陣地,等待命令,準備開火。
BEFORE[炮兵-0]進入炮兵陣地,等待命令,準備開火。
BEFORE[炮兵-2]進入炮兵陣地,等待命令,準備開火。
BEFORE[炮兵-3]進入炮兵陣地,等待命令,準備開火。
// sleep 3秒
[FUTURE]將軍醒了,開始幹活了..
AFTER[炮兵-2]接收到命令,立刻開火,幹死那個死胖子。。null
AFTER[炮兵-0]接收到命令,立刻開火,幹死那個死胖子。。null
AFTER[炮兵-3]接收到命令,立刻開火,幹死那個死胖子。。null
AFTER[炮兵-1]接收到命令,立刻開火,幹死那個死胖子。。null

CompletableFuture這個類最大的好處是提供有所有等待線程的執行觸發點。

JUC——線程同步輔助工具類(Exchanger,CompletableFuture)