第二部分:併發工具類25->CompletionService,批量執行非同步任務
1.如何優化一個詢價應用的核心程式碼呢?
ThreadPoolExecutor+Future的方案,
用3個執行緒非同步執行,通過3次呼叫future的get方法後去查價結果,然後價格儲存在資料庫中
// 建立執行緒池 ExecutorService executor = Executors.newFixedThreadPool(3); // 異步向電商S1詢價 Future<Integer> f1 = executor.submit( ()->getPriceByS1()); // 異步向電商S2詢價 Future<Integer> f2 = executor.submit( ()->getPriceByS2()); // 異步向電商S3詢價 Future<Integer> f3 = executor.submit( ()->getPriceByS3()); // 獲取電商S1報價並儲存 r=f1.get(); executor.execute(()->save(r)); // 獲取電商S2報價並儲存 r=f2.get(); executor.execute(()->save(r)); // 獲取電商S3報價並儲存 r=f3.get(); executor.execute(()->save(r));
問題
獲取s1報價耗時很長,那麼即使獲取s2的報價耗時端,也無法保證s2的報價先執行,因為住執行緒都阻塞在了f1.get()上
2.解決方案
利用阻塞佇列,將獲取報價的結果放入阻塞佇列,主執行緒消費阻塞佇列,保證先獲取的報價先儲存到資料庫中
在執行f1.get方法,不是在主執行緒中執行,而是用執行緒池新開執行緒執行,防止阻塞
// 建立阻塞佇列 BlockingQueue<Integer> bq = new LinkedBlockingQueue<>(); //電商S1報價非同步進入阻塞佇列 executor.execute(()-> bq.put(f1.get())); //電商S2報價非同步進入阻塞佇列 executor.execute(()-> bq.put(f2.get())); //電商S3報價非同步進入阻塞佇列 executor.execute(()-> bq.put(f3.get())); //非同步儲存所有報價 for (int i=0; i<3; i++) { Integer r = bq.take(); executor.execute(()->save(r)); }
3.使用CompletionService實現詢價系統
CompletionService,就是實現2中的類似功能
sdk併發包中提供的
內嵌一個阻塞佇列,然後把執行結果的future物件加入到阻塞佇列,也不是任務做種的結果
4.建立CompletionService
- ExecutorCompletionService(Executor executor)
- ExecutorCompletionService(Executor executor,BlockQueue<Future
> completionQueue)
共同引數都有執行緒池,如果不指定佇列,使用的是無界佇列LinkedBlockQueue,任務執行結果的future物件加入到completionQueue中
// 建立執行緒池
ExecutorService executor =
Executors.newFixedThreadPool(3);
// 建立CompletionService
CompletionService<Integer> cs = new
ExecutorCompletionService<>(executor);
// 異步向電商S1詢價
cs.submit(()->getPriceByS1());
// 異步向電商S2詢價
cs.submit(()->getPriceByS2());
// 異步向電商S3詢價
cs.submit(()->getPriceByS3());
// 將詢價結果非同步儲存到資料庫
for (int i=0; i<3; i++) {
Integer r = cs.take().get();
executor.execute(()->save(r));
}
5.CompletionService 介面說明
submit()方法,引數Callable
submit()方法,引數Runnable task和V result,類似於ThreadPoolExecutor的
task(),從阻塞佇列中獲取並移除一個元素,阻塞佇列是空,take會被阻塞
poll(),從阻塞對壘中獲取並移除一個元素,阻塞佇列是空,poll會返回null值
poll(long timeout,Timeunit unit)支援以超時的方式獲取並移除阻塞佇列頭部的一個元素,等待了timeout unit時間,阻塞佇列還是空,該方法會返回null值
6.Dubbo中的Forking cluster叢集模式的使用
dubblo中的叢集模式,支援並行呼叫多個查詢服務,只要有一個成功返回結果,整個服務就可以返回了
注意:是查詢服務,而不能是修改服務。
例如提供地址轉換座標的服務,保證高可用和高效能,並行呼叫3個地圖服務商的API,然後只要有1個正確返回了結果r,那麼地址座標整個服務就可以直接返回r了,這種叢集模式可以容忍2個地圖服務商異常,但缺點是消耗資源偏多
geocoder(addr) {
//並行執行以下3個查詢服務,
r1=geocoderByS1(addr);
r2=geocoderByS2(addr);
r3=geocoderByS3(addr);
//只要r1,r2,r3有一個返回
//則返回
return r1|r2|r3;
}
7.用CompletionService實現Forking叢集模式
// 建立執行緒池
ExecutorService executor =
Executors.newFixedThreadPool(3);
// 建立CompletionService
CompletionService<Integer> cs =
new ExecutorCompletionService<>(executor);
// 用於儲存Future物件
List<Future<Integer>> futures =
new ArrayList<>(3);
//提交非同步任務,並儲存future到futures
futures.add(
cs.submit(()->geocoderByS1()));
futures.add(
cs.submit(()->geocoderByS2()));
futures.add(
cs.submit(()->geocoderByS3()));
// 獲取最快返回的任務執行結果
Integer r = 0;
try {
// 只要有一個成功返回,則break
for (int i = 0; i < 3; ++i) {
r = cs.take().get();
//簡單地通過判空來檢查是否成功返回
if (r != null) {
break;
}
}
} finally {
//取消所有任務
for(Future<Integer> f : futures)
f.cancel(true);
}
// 返回結果
return r;
8.總結
批量提交非同步任務時,建議使用CompletionService,將執行緒池executor和blockingQueue功能融合一起
CompletionService能夠讓非同步任務執行結果有序化,先執行 完的先進入阻塞佇列
後續處理有序性,避免無謂等待
原創:做時間的朋友