1. 程式人生 > 其它 >第二部分:併發工具類25->CompletionService,批量執行非同步任務

第二部分:併發工具類25->CompletionService,批量執行非同步任務

1.如何優化一個詢價應用的核心程式碼呢?

ThreadPoolExecutor+Future的方案,
用3個執行緒非同步執行,通過3次呼叫future的get方法後去查價結果,然後價格儲存在資料庫中


// 建立執行緒池
ExecutorService executor =
  Executors.newFixedThreadPool(3);
// 異步向電商S1詢價
Future<Integer> f1 = 
  executor.submit(
    ()->getPriceByS1());
// 異步向電商S2詢價
Future<Integer> f2 = 
  executor.submit(
    ()->getPriceByS2());
// 異步向電商S3詢價
Future<Integer> f3 = 
  executor.submit(
    ()->getPriceByS3());
    
// 獲取電商S1報價並儲存
r=f1.get();
executor.execute(()->save(r));
  
// 獲取電商S2報價並儲存
r=f2.get();
executor.execute(()->save(r));
  
// 獲取電商S3報價並儲存  
r=f3.get();
executor.execute(()->save(r));

問題
獲取s1報價耗時很長,那麼即使獲取s2的報價耗時端,也無法保證s2的報價先執行,因為住執行緒都阻塞在了f1.get()上

2.解決方案

利用阻塞佇列,將獲取報價的結果放入阻塞佇列,主執行緒消費阻塞佇列,保證先獲取的報價先儲存到資料庫中
在執行f1.get方法,不是在主執行緒中執行,而是用執行緒池新開執行緒執行,防止阻塞


// 建立阻塞佇列
BlockingQueue<Integer> bq =
  new LinkedBlockingQueue<>();
//電商S1報價非同步進入阻塞佇列  
executor.execute(()->
  bq.put(f1.get()));
//電商S2報價非同步進入阻塞佇列  
executor.execute(()->
  bq.put(f2.get()));
//電商S3報價非同步進入阻塞佇列  
executor.execute(()->
  bq.put(f3.get()));
//非同步儲存所有報價  
for (int i=0; i<3; i++) {
  Integer r = bq.take();
  executor.execute(()->save(r));
}  

3.使用CompletionService實現詢價系統

CompletionService,就是實現2中的類似功能
sdk併發包中提供的
內嵌一個阻塞佇列,然後把執行結果的future物件加入到阻塞佇列,也不是任務做種的結果

4.建立CompletionService

  1. ExecutorCompletionService(Executor executor)
  2. ExecutorCompletionService(Executor executor,BlockQueue<Future> completionQueue)

共同引數都有執行緒池,如果不指定佇列,使用的是無界佇列LinkedBlockQueue,任務執行結果的future物件加入到completionQueue中


// 建立執行緒池
ExecutorService executor = 
  Executors.newFixedThreadPool(3);
// 建立CompletionService
CompletionService<Integer> cs = new 
  ExecutorCompletionService<>(executor);
// 異步向電商S1詢價
cs.submit(()->getPriceByS1());
// 異步向電商S2詢價
cs.submit(()->getPriceByS2());
// 異步向電商S3詢價
cs.submit(()->getPriceByS3());
// 將詢價結果非同步儲存到資料庫
for (int i=0; i<3; i++) {
  Integer r = cs.take().get();
  executor.execute(()->save(r));
}

5.CompletionService 介面說明

submit()方法,引數Callable task
submit()方法,引數Runnable task和V result,類似於ThreadPoolExecutor的 Future submit(Runnable task,T result)

task(),從阻塞佇列中獲取並移除一個元素,阻塞佇列是空,take會被阻塞
poll(),從阻塞對壘中獲取並移除一個元素,阻塞佇列是空,poll會返回null值
poll(long timeout,Timeunit unit)支援以超時的方式獲取並移除阻塞佇列頭部的一個元素,等待了timeout unit時間,阻塞佇列還是空,該方法會返回null值

6.Dubbo中的Forking cluster叢集模式的使用

dubblo中的叢集模式,支援並行呼叫多個查詢服務,只要有一個成功返回結果,整個服務就可以返回了
注意:是查詢服務,而不能是修改服務。

例如提供地址轉換座標的服務,保證高可用和高效能,並行呼叫3個地圖服務商的API,然後只要有1個正確返回了結果r,那麼地址座標整個服務就可以直接返回r了,這種叢集模式可以容忍2個地圖服務商異常,但缺點是消耗資源偏多


geocoder(addr) {
  //並行執行以下3個查詢服務, 
  r1=geocoderByS1(addr);
  r2=geocoderByS2(addr);
  r3=geocoderByS3(addr);
  //只要r1,r2,r3有一個返回
  //則返回
  return r1|r2|r3;
}

7.用CompletionService實現Forking叢集模式


// 建立執行緒池
ExecutorService executor =
  Executors.newFixedThreadPool(3);
// 建立CompletionService
CompletionService<Integer> cs =
  new ExecutorCompletionService<>(executor);
// 用於儲存Future物件
List<Future<Integer>> futures =
  new ArrayList<>(3);
//提交非同步任務,並儲存future到futures 
futures.add(
  cs.submit(()->geocoderByS1()));
futures.add(
  cs.submit(()->geocoderByS2()));
futures.add(
  cs.submit(()->geocoderByS3()));
// 獲取最快返回的任務執行結果
Integer r = 0;
try {
  // 只要有一個成功返回,則break
  for (int i = 0; i < 3; ++i) {
    r = cs.take().get();
    //簡單地通過判空來檢查是否成功返回
    if (r != null) {
      break;
    }
  }
} finally {
  //取消所有任務
  for(Future<Integer> f : futures)
    f.cancel(true);
}
// 返回結果
return r;

8.總結

批量提交非同步任務時,建議使用CompletionService,將執行緒池executor和blockingQueue功能融合一起
CompletionService能夠讓非同步任務執行結果有序化,先執行 完的先進入阻塞佇列

後續處理有序性,避免無謂等待

原創:做時間的朋友