1. 程式人生 > >使用CompletableFuture構建非同步應用(二)

使用CompletableFuture構建非同步應用(二)

原文地址:https://blog.csdn.net/wangdong5678999/article/details/81842451

本文主要介紹Java 8 中的非同步處理的方式,主要是 CompletableFuture類的一些特性。 
為了展示CompletableFuture的強大特性,我們會建立一個名為“最佳價格查詢器” (best-price-finder)的應用,它會查詢多個線上商店,依據給定的產品或服務找出最低的價格。這個過程中,你會學到幾個重要的技能。

  • 首先,你會學到如何為你的客戶提供非同步API。(如果你擁有一間線上商店的話,這是非常有幫助的)。
  • 其次,你會掌握如何讓你使用了同步API的程式碼變為非阻塞程式碼。你會了解如何使用流水線將兩個接續的非同步操作合併為一個非同步計算操作。這種情況肯定會出現,比如,線上 商店返回了你想要購買商品的原始價格,並附帶著一個折扣程式碼——最終,要計算出該 商品的實際價格,你不得不訪問第二個遠端折扣服務,查詢該折扣程式碼對應的折扣比率。
  • 你還會學到如何以響應式的方式處理非同步操作的完成事件,以及隨著各個商店返回它的 商品價格,最佳價格查詢器如何持續地更新每種商品的最佳推薦,而不是等待所有的商店都返回他們各自的價格(這種方式存在著一定的風險,一旦某家商店的服務中斷,用 戶可能遭遇白屏)。

一、獲取商品價格的同步方法

import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

public class Shop {

    private final String name;
    private final Random random;

    public Shop(String name) {
        this.name = name;
        random = new Random(name.charAt(0) * name.charAt(1) * name.charAt(2));
    }

    /**
     * 獲取產品價格的同步方法
     * @param product 產品名稱
     * @return 產品價格
     */
    public double getPrice(String product) {
        return calculatePrice(product);
    }

    private double calculatePrice(String product) {
        //一個模擬的延遲方法
        delay();
        return random.nextDouble() * product.charAt(0) + product.charAt(1);
    }

    public static void delay() {
        int delay = 1000;
        //int delay = 500 + RANDOM.nextInt(2000);
        try {
            Thread.sleep(delay);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public String getName() {
        return name;
    }
}

很明顯,這個API的使用者(這個例子中為最佳價格查詢器)呼叫該方法時,它依舊會被阻塞。為等待同步事件完成而等待1秒鐘,這是無法接受的,尤其是考慮到最佳價格查詢器對 網路中的所有商店都要重複這種操作。在本文的下個小節中,你會了解如何以非同步方式使用同 步API解決這個問題。

二、將同步方法轉換為非同步方法

我們使用新的CompletableFuture類來將getPrice方法轉換為非同步的getPriceAsync方法。

/**
 * 非同步的獲取產品價格
 *
 * @param product 產品名
 * @return 最終價格
 */
 public Future<Double> getPriceAsync(String product) {
    //建立CompletableFuture 物件,它會包含計算的結果
    CompletableFuture<Double> futurePrice = new CompletableFuture<>();
    //在另一個執行緒中以非同步方式執行計算
    new Thread(() -> {
        double price = calculatePrice(product);
        //需長時間計算的任務結 束並得出結果時,設定 Future的返回值
        futurePrice.complete(price);
    }).start();
    // 無需等待還沒結束的計算,直接返回Future物件
    return futurePrice;
}

在這段程式碼中,你建立了一個代表非同步計算的CompletableFuture物件例項,它在計算完 成時會包含計算的結果。 
使用這個API的客戶端,可以通過下面的這段 程式碼對其進行呼叫。

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class ShopMain {

  public static void main(String[] args) {
    Shop shop = new Shop("BestShop");
    long start = System.nanoTime();
    //查詢商店,試圖 取得商品的價格
    Future<Double> futurePrice = shop.getPriceAsync("my favorite product");
    long invocationTime = ((System.nanoTime() - start) / 1_000_000);
    System.out.println("Invocation returned after " + invocationTime 
                                                    + " msecs");
    // 執行更多工,比如查詢其他商店
    doSomethingElse();
    // 在計算商品價格的同時
    try {
        //從Future物件中讀 取價格,如果價格 未知,會發生阻塞
        double price = futurePrice.get();
        System.out.printf("Price is %.2f%n", price);
    } catch (ExecutionException | InterruptedException e) {
        throw new RuntimeException(e);
    }
    long retrievalTime = ((System.nanoTime() - start) / 1_000_000);
    System.out.println("Price returned after " + retrievalTime + " msecs");
  }

  private static void doSomethingElse() {
      System.out.println("Doing something else...");
  }
}
Output: 
Invocation returned after 43 msecs 
Price is 123.26 
Price returned after 1045 msecs

你會發現getPriceAsync方法的呼叫返回遠遠早於最終價格計算完成的時間。接下來我們看看如何正確地管理 非同步任務執行過程中可能出現的錯誤。

三、錯誤處理

如果沒有意外,我們目前開發的程式碼工作得很正常。但是,如果價格計算過程中產生了錯誤 會怎樣呢?非常不幸,這種情況下你會得到一個相當糟糕的結果:用於提示錯誤的異常會被限制 在試圖計算商品價格的當前執行緒的範圍內,最終會殺死該執行緒,而這會導致等待get方法返回結 果的客戶端永久地被阻塞。 
解決這種問題的方法有兩種:

  1. 客戶端可以使用過載版本的get方法,它使用一個超時引數來避免發生這樣的情況。
  2. 通過非同步處理中發生的異常,根據不同的異常型別來進行不同的處理。

為了讓客戶端能瞭解商店無法提供請求商品價格的原因,你需要使用 CompletableFuture的completeExceptionally方法將導致CompletableFuture內發生問 題的異常丟擲。程式碼如下所示:

/**
 * 丟擲CompletableFuture內的異常版本的getPriceAsyncForException方法
 *
 * @param product 產品名
 * @return 最終價格
 */
public Future<Double> getPriceAsyncForException(String product) {
	CompletableFuture<Double> futurePrice = new CompletableFuture<>();
	new Thread(() -> {
		try {
			double price = calculatePrice(product);
			//如果價格計算正常結束,完成Future操作並設定商品價格
			futurePrice.complete(price);
		} catch (Exception ex) {
			//否則就丟擲導致失敗的異常,完成這 次Future操作
			futurePrice.completeExceptionally(ex);
		}

	}).start();
	return futurePrice;
}

如果該方法丟擲了一個運 行時異常“product not available”,客戶端就會得到像下面這樣一段ExecutionException:

java.util.concurrent.ExecutionException: java.lang.RuntimeException: product not available
 at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2237)
 at lambdasinaction.chap11.AsyncShopClient.main(AsyncShopClient.java:14)
        ... 5 more
    Caused by: java.lang.RuntimeException: product not available
at lambdasinaction.chap11.AsyncShop.calculatePrice(AsyncShop.java:36)
at lambdasinaction.chap11.AsyncShop.lambda$getPrice$0(AsyncShop.java:23) at lambdasinaction.chap11.AsyncShop$$Lambda$1/24071475.run(Unknown Source) at java.lang.Thread.run(Thread.java:744)

目前為止我們已經瞭解瞭如何通過程式設計建立CompletableFuture物件以及如何獲取返回值了。