CompletionService:批量執行非同步任務
技術標籤:Java併發性和多執行緒
我們思考下這個場景:從三個電商詢價,然後儲存在自己的資料庫裡。通過之前所學,我們可能這麼實現。
// 建立執行緒池 ExecutorService executor = Executors.newFixedThreadPool(3); // 異步向電商 S1 詢價 Future<Integer> f1 = executor.submit( ()->getPriceByS1()); // 異步向電商 S2 詢價 Future<Integer> f2 = executor.submit( ()->getPriceByS2()); // 異步向電商 S3 詢價 Future<Integer> f3 = executor.submit( ()->getPriceByS3()); // 獲取電商 S1 報價並儲存 r=f1.get(); executor.execute(()->save(r)); // 獲取電商 S2 報價並儲存 r=f2.get(); executor.execute(()->save(r)); // 獲取電商 S3 報價並儲存 r=f3.get(); executor.execute(()->save(r));
上面的這個方案本身沒有太大問題,但是有個地方的處理需要你注意,那就是如果獲取電商 S1 報價的耗時很長,那麼即便獲取電商 S2 報價的耗時很短,也無法讓儲存 S2 報價的操作先執行,因為這個主執行緒都阻塞在了f1.get()
,那我們如何解決了?
我們可以增加一個阻塞佇列,獲取到 S1、S2、S3 的報價都進入阻塞佇列,然後在主執行緒中消費阻塞佇列,這樣就能保證先獲取到的報價先儲存到資料庫了。下面的示例程式碼展示瞭如何利用阻塞佇列實現先獲取到的報價先儲存到資料庫。
// 建立阻塞佇列 BlockingQueue<Integer> bq = new LinkedBlockingQueue<>(); // 電商 S1 報價非同步進入阻塞佇列 executor.execute(()-> bq.put(f1.get())); // 電商 S2 報價非同步進入阻塞佇列 executor.execute(()-> bq.put(f2.get())); // 電商 S3 報價非同步進入阻塞佇列 executor.execute(()-> bq.put(f3.get())); // 非同步儲存所有報價 for (int i=0; i<3; i++) { Integer r = bq.take(); executor.execute(()->save(r)); }
利用 CompletionService 實現詢價系統
不過在實際專案中,並不建議你這樣做,因為 Java SDK 併發包裡已經提供了設計精良的 CompletionService。利用 CompletionService 能讓程式碼更簡練。
CompletionService 的實現原理也是內部維護了一個阻塞佇列,當任務執行結束就把任務的執行結果加入到阻塞佇列中,不同的是 CompletionService 是把任務執行結果的 Future 物件加入到阻塞佇列中,而上面的示例程式碼是把任務最終的執行結果放入了阻塞佇列中。
那到底該如何建立 CompletionService 呢?
CompletionService 介面的實現類是 ExecutorCompletionService,這個實現類的構造方法有兩個,分別是:
- ExecutorCompletionService(Executor executor)
- ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue)
這兩個構造方法都需要傳入一個執行緒池,如果不指定 completionQueue,那麼預設會使用無界的 LinkedBlockingQueue。任務執行結果的 Future 物件就是加入到 completionQueue 中。
下面的示例程式碼完整地展示瞭如何利用 CompletionService 來實現高效能的詢價系統。其中,我們沒有指定 completionQueue,之後通過 CompletionService 介面提供的 submit() 方法提交了三個詢價操作,這三個詢價操作將會被 CompletionService 非同步執行。最後,我們通過 CompletionService 介面提供的 take() 方法獲取一個 Future 物件,呼叫 Future 物件的 get() 方法就能返回詢價操作的執行結果了。
// 建立執行緒池
ExecutorService executor =
Executors.newFixedThreadPool(3);
// 建立 CompletionService
CompletionService<Integer> cs = new
ExecutorCompletionService<>(executor);
// 異步向電商 S1 詢價
cs.submit(()->getPriceByS1());
// 異步向電商 S2 詢價
cs.submit(()->getPriceByS2());
// 異步向電商 S3 詢價
cs.submit(()->getPriceByS3());
// 將詢價結果非同步儲存到資料庫
for (int i=0; i<3; i++) {
Integer r = cs.take().get();
executor.execute(()->save(r));
}
CompletionService 介面說明
下面我們詳細地介紹一下 CompletionService 介面提供的方法,CompletionService 介面提供的方法有 5 個,這 5 個方法的方法簽名如下所示。
Future<V> submit(Callable<V> task);
Future<V> submit(Runnable task, V result);
Future<V> take()
throws InterruptedException;
Future<V> poll();
Future<V> poll(long timeout, TimeUnit unit)
throws InterruptedException;
CompletionService 後3 個方法,都是和阻塞佇列相關的,take()、poll() 都是從阻塞佇列中獲取並移除一個元素;它們的區別在於如果阻塞佇列是空的,那麼呼叫 take() 方法的執行緒會被阻塞,而 poll() 方法會返回 null 值。
利用 CompletionService 實現 Dubbo 中的 Forking Cluster
Dubbo 中有一種叫做Forking 的叢集模式
,這種叢集模式下,支援並行地呼叫多個查詢服務,只要有一個成功返回結果,整個服務就可以返回了。例如你需要提供一個地址轉座標的服務,為了保證該服務的高可用和效能,你可以並行地呼叫 3 個地圖服務商的 API,然後只要有 1 個正確返回了結果 r,那麼地址轉座標這個服務就可以直接返回 r 了。這種叢集模式可以容忍 2 個地圖服務商服務異常,但缺點是消耗的資源偏多。
geocoder(addr) {
// 並行執行以下 3 個查詢服務,
r1=geocoderByS1(addr);
r2=geocoderByS2(addr);
r3=geocoderByS3(addr);
// 只要 r1,r2,r3 有一個返回
// 則返回
return r1|r2|r3;
}
利用 CompletionService 可以快速實現 Forking 這種叢集模式,比如下面的示例程式碼就展示了具體是如何實現的。首先我們建立了一個執行緒池 executor 、一個 CompletionService 物件 cs 和一個Future<Integer>
型別的列表 futures,每次通過呼叫 CompletionService 的 submit() 方法提交一個非同步任務,會返回一個 Future 物件,我們把這些 Future 物件儲存在列表 futures 中。通過呼叫cs.take().get()
,我們能夠拿到最快返回的任務執行結果,只要我們拿到一個正確返回的結果,就可以取消所有任務並且返回最終結果了。
// 建立執行緒池
ExecutorService executor =
Executors.newFixedThreadPool(3);
// 建立 CompletionService
CompletionService<Integer> cs =
new ExecutorCompletionService<>(executor);
// 用於儲存 Future 物件
List<Future<Integer>> futures =
new ArrayList<>(3);
// 提交非同步任務,並儲存 future 到 futures
futures.add(
cs.submit(()->geocoderByS1()));
futures.add(
cs.submit(()->geocoderByS2()));
futures.add(
cs.submit(()->geocoderByS3()));
// 獲取最快返回的任務執行結果
Integer r = 0;
try {
// 只要有一個成功返回,則 break
for (int i = 0; i < 3; ++i) {
r = cs.take().get();
// 簡單地通過判空來檢查是否成功返回
if (r != null) {
break;
}
}
} finally {
// 取消所有任務
for(Future<Integer> f : futures)
f.cancel(true);
}
// 返回結果
return r;
總結
當需要批量提交非同步任務的時候建議你使用 CompletionService。CompletionService 將執行緒池 Executor 和阻塞佇列 BlockingQueue 的功能融合在了一起,能夠讓批量非同步任務的管理更簡單。除此之外,CompletionService 能夠讓非同步任務的執行結果有序化,先執行完的先進入阻塞佇列,利用這個特性,你可以輕鬆實現後續處理的有序性,避免無謂的等待,同時還可以快速實現諸如 Forking Cluster 這樣的需求。