java.util.concurrent.Exchanger應用範例與原理淺析
java.util.concurrent.Exchanger應用範例與原理淺析--轉載
一、簡介
Exchanger是自jdk1.5起開始提供的工具套件,一般用於兩個工作執行緒之間交換資料。在本文中我將採取由淺入深的方式來介紹分析這個工具類。首先我們來看看官方的api文件中的敘述:
在以上的描述中,有幾個要點:
- 此類提供對外的操作是同步的;
- 用於成對出現的執行緒之間交換資料;
- 可以視作雙向的同步佇列;
- 可應用於基因演算法、流水線設計等場景。
接著看api文件,這個類提供對外的介面非常簡潔,一個無參建構函式,兩個過載的範型exchange方法:
public V exchange(V x) throws InterruptedException
public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
從官方的javadoc可以知道,當一個執行緒到達exchange呼叫點時,如果它的夥伴執行緒此前已經呼叫了此方法,那麼它的夥伴會被排程喚醒並與之進行物件交換,然後各自返回。如果它的夥伴還沒到達交換點,那麼當前執行緒將會被掛起,直至夥伴執行緒到達——完成交換正常返回;或者當前執行緒被中斷——丟擲中斷異常;又或者是等候超時——丟擲超時異常。
二、一個簡單的例子
按照某大師的觀點,行為知之先,在知道了Exchanger的大致用途並參閱了使用說明後,我們馬上動手寫個例子來跑一跑:
import java.util.concurrent.Exchanger; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.apache.log4j.Logger; /** * @Title: ExchangerTest * @Description: Test class for Exchanger * @Company: CSAIR * @Author: lixuanbin * @Creation: 2014年12月14日 * @Version:1.0*/ public class ExchangerTest { protected static final Logger log = Logger.getLogger(ExchangerTest.class); private static volatile boolean isDone = false; static class ExchangerProducer implements Runnable { private Exchanger<Integer> exchanger; private static int data = 1; ExchangerProducer(Exchanger<Integer> exchanger) { this.exchanger = exchanger; } @Override public void run() { while (!Thread.interrupted() && !isDone) { for (int i = 1; i <= 3; i++) { try { TimeUnit.SECONDS.sleep(1); data = i; System.out.println("producer before: " + data); data = exchanger.exchange(data); System.out.println("producer after: " + data); } catch (InterruptedException e) { log.error(e, e); } } isDone = true; } } } static class ExchangerConsumer implements Runnable { private Exchanger<Integer> exchanger; private static int data = 0; ExchangerConsumer(Exchanger<Integer> exchanger) { this.exchanger = exchanger; } @Override public void run() { while (!Thread.interrupted() && !isDone) { data = 0; System.out.println("consumer before : " + data); try { TimeUnit.SECONDS.sleep(1); data = exchanger.exchange(data); } catch (InterruptedException e) { log.error(e, e); } System.out.println("consumer after : " + data); } } } /** * @param args */ public static void main(String[] args) { ExecutorService exec = Executors.newCachedThreadPool(); Exchanger<Integer> exchanger = new Exchanger<Integer>(); ExchangerProducer producer = new ExchangerProducer(exchanger); ExchangerConsumer consumer = new ExchangerConsumer(exchanger); exec.execute(producer); exec.execute(consumer); exec.shutdown(); try { exec.awaitTermination(30, TimeUnit.SECONDS); } catch (InterruptedException e) { log.error(e, e); } } }
這大致可以看作是一個簡易的生產者消費者模型,有兩個任務類,一個遞增地產生整數,一個產生整數0,然後雙方進行交易。每次交易前的生產者和每次交易後的消費者都會sleep 1秒來模擬資料處理的消耗,並在交易前後把整數值列印到控制檯以便檢測結果。在這個例子裡交易迴圈只執行三次,採用一個volatile boolean來控制交易雙方執行緒的退出。
我們來看看程式的輸出:
consumer before : 0
producer before: 1
consumer after : 1
producer after: 0
consumer before : 0
producer before: 2
producer after: 0
consumer after : 2
consumer before : 0
producer before: 3
producer after: 0
consumer after : 3
輸出結果驗證了以下兩件事情:
- exchange方法真的幫一對執行緒交換了資料;
- exchange方法真的會阻塞呼叫方執行緒直至另一方執行緒參與交易。
那麼在中斷和超時兩種情況下程式的執行表現會是怎樣呢?作為一個小練習,有興趣的觀眾可以設想並編寫測試用例覆蓋驗證之。接下來談談最近我在生產場景中對Exchanger的應用。
三、實戰場景
1.問題描述
最近接到外部專案組向我組提出的介面需求,需要查詢我們業務辦理量的統計情況。我們系統目前的情況是,有一個日增長十多萬、總資料量為千萬級別的業務辦理明細表(xxx_info),每人次的業務辦理結果會實時寫入其中。以往對外提供的業務統計介面是在每次被呼叫時候在明細表中執行SQL查詢(select、count、where、group by等),響應時間很長,對原生產業務的使用也有很大的影響。於是我決定趁著這次新增介面的上線機會對系統進行優化。
2.優化思路
首先是在明細表之外再建立一個數據統計(xxx_statistics)表,考慮到目前資料庫的壓力以及公司內部質管流控等因素,暫沒有分庫存放,仍舊與原明細表放在同一個庫。再設定一個定時任務於每日凌晨對明細表進行查詢、過濾、統計、排序等操作,把統計結果插入到統計表中。然後對外暴露統計介面查詢統計報表。現在的設計與原來的實現相比,雖然犧牲了統計表所佔用的少量額外的儲存空間(每日新增的十來萬條業務辦理明細記錄經過處理最終會變成幾百條統計表的記錄),但是卻能把select、count這樣耗時的資料統計操作放到凌晨時段執行以避開白天的業務辦理高峰,分表處理能夠大幅降低對生產業務明細表的效能影響,而對外提供的統計介面的查詢速度也將得到幾個數量級的提升。當然,還有一個缺點是,不能實時提供當天的統計資料,不過這也是雙方可以接受的。
3.設計實現
設計一個定時任務,每日凌晨執行。在定時任務中啟動兩個執行緒,一個執行緒負責對業務明細表(xxx_info)進行查詢統計,把統計的結果放置在記憶體緩衝區,另一個執行緒負責讀取緩衝區中的統計結果並插入到業務統計表(xxx_statistics)中。
親,這樣的場景是不是聽起來很有感覺?沒錯!兩個執行緒在記憶體中批量交換資料,這個事情我們可以使用Exchanger去做!我們馬上來看看程式碼如何實現。
生產者執行緒:
class ExchangerProducer implements Runnable { private Exchanger<Set<XXXStatistics>> exchanger; private Set<XXXStatistics> holder; private Date fltDate; private int threshold; ExchangerProducer(Exchanger<Set<XXXStatistics>> exchanger, Set<XXXStatistics> holder, Date fltDate, int threshold) { this.exchanger = exchanger; this.holder = holder; this.fltDate = fltDate; this.threshold = threshold; } @Override public void run() { try { while (!Thread.interrupted() && !isDone) { List<XXXStatistics> temp1 = null; List<XXXStatistics> temp11 = null; for (int i = 0; i < allCities.size(); i++) { try { temp1 = xxxDao .findStatistics1( fltDate, allCities.get(i)); temp11 = xxxDao .findStatistics2( fltDate, allCities.get(i), internationalList); if (temp1 != null && !temp1.isEmpty()) { calculationCounter.addAndGet(temp1.size()); if (temp11 != null && !temp11.isEmpty()) { // merge two lists into temp1 mergeLists(temp1, temp11); temp11.clear(); temp11 = null; } // merge temp1 into holder set mergeListToSet(holder, temp1); temp1.clear(); temp1 = null; } } catch (Exception e) { log.error(e, e); } // Insert every ${threshold} or the last into database. if (holder.size() >= threshold || i == (allCities.size() - 1)) { log.info("data collected: \n" + holder); holder = exchanger.exchange(holder); log.info("data submitted"); } } // all cities are calculated isDone = true; } log.info("calculation job done, calculated: " + calculationCounter.get()); } catch (InterruptedException e) { log.error(e, e); } exchanger = null; holder.clear(); holder = null; fltDate = null; } }
程式碼說明:
- threshold:緩衝區的容量閥值;
- allCities:城市列表,迭代這個列表作為入參來執行查詢統計;
- XXXStatistics:統計資料封裝實體類,實現了Serializable和Comparable介面,覆寫equals和compareTo方法,以利用TreeSet提供的去重和排序處理;
- isDone:volatile boolean,標識統計任務是否完成;
- holder:TreeSet<XXXStatistics>,存放統計結果的記憶體緩衝區,容量達到閥值後提交給Exchanger執行exchange操作;
- dao.findStatistics1,dao.findStatistics2:簡化的資料庫查詢統計操作,此處僅供示意;
- calculationCounter:AtomicInteger,標記生產端所提交的記錄總數;
- mergeLists,mergeListToSet:內部私有工具方法,把dao查詢返回的列表合併到holder中;
消費者執行緒:
class ExchangerConsumer implements Runnable { private Exchanger<Set<XXXStatistics>> exchanger; private Set<XXXStatistics> holder; ExchangerConsumer(Exchanger<Set<XXXStatistics>> exchanger, Set<XXXStatistics> holder) { this.exchanger = exchanger; this.holder = holder; } @Override public void run() { try { List<XXXStatistics> tempList; while (!Thread.interrupted() && !isDone) { holder = exchanger.exchange(holder); log.info("got data: \n" + holder); if (holder != null && !holder.isEmpty()) { try { // insert data into database tempList = convertSetToList(holder); insertionCounter.addAndGet(xxxDao .batchInsertXXXStatistics(tempList)); tempList.clear(); tempList = null; } catch (Exception e) { log.error(e, e); } // clear the set holder.clear(); } else { log.info("wtf, got an empty list"); } log.info("data processed"); } log.info("insert job done, inserted: " + insertionCounter.get()); } catch (InterruptedException e) { log.error(e, e); } exchanger = null; holder.clear(); holder = null; } }
程式碼說明:
- convertSetToList:由於dao介面的限制,需把交換得到的Set轉換為List;
- batchInsertXXXStatistics:使用jdbc4的batch update而實現的批量插入dao介面;
- insertionCounter:AtomicInteger,標記消費端插入成功的記錄總數;
排程器程式碼:
public boolean calculateStatistics(Date fltDate) { // initialization calculationCounter.set(0); insertionCounter.set(0); isDone = false; exec = Executors.newCachedThreadPool(); Set<XXXStatistics> producerSet = new TreeSet<XXXStatistics>(); Set<XXXStatistics> consumerSet = new TreeSet<XXXStatistics>(); Exchanger<Set<XXXStatistics>> xc = new Exchanger<Set<XXXStatistics>>(); ExchangerProducer producer = new ExchangerProducer(xc, producerSet, fltDate, threshold); ExchangerConsumer consumer = new ExchangerConsumer(xc, consumerSet); // execution exec.execute(producer); exec.execute(consumer); exec.shutdown(); boolean isJobDone = false; try { // wait for termination isJobDone = exec.awaitTermination(calculationTimeoutMinutes, TimeUnit.MINUTES); } catch (InterruptedException e) { log.error(e, e); } if (!isJobDone) { // force shutdown exec.shutdownNow(); log.error("time elapsed for " + calculationTimeoutMinutes + " minutes, but still not finished yet, shut it down anyway."); } // clean up exec = null; producerSet.clear(); producerSet = null; consumerSet.clear(); consumerSet = null; xc = null; producer = null; consumer = null; System.gc(); // return the result if (isJobDone && calculationCounter.get() > 0 && calculationCounter.get() == insertionCounter.get()) { return true; } return false; }
程式碼說明:
排程器的程式碼就四個步驟:初始化、提交任務並等候處理結果、清理、返回。初始化階段使用了jdk提供的執行緒池提交生產者和消費者任務,設定了最長等候時間calculationTimeoutMinutes,如果排程器執行緒被中斷或者任務執行超時,awaitTermination會返回false,此時就強行關閉執行緒池並記錄到日誌。統計操作每日凌晨執行一次,所以在任務退出前的清理階段建議jvm執行gc以儘早釋放計算時所產生的垃圾物件。在結果返回階段,如果查詢統計出來的記錄條數和插入成功的條數相等則返回true,否則返回false。
4.小結
在這個案例中,使用Exchanger進行批量的雙向資料交換可謂恰如其分:生產者在執行新的查詢統計任務填入資料到緩衝區的同時,消費者正在批量插入生產者換入的上一次產生的資料,系統的吞吐量得到平滑的提升;計算複雜度、記憶體消耗、系統性能也能通過相關的引數設定而得到有效的控制(在消費端也可以對holder進行再次分割以控制每次批插入的大小,建議參閱資料庫廠商以及資料庫驅動包的說明文件以確定jdbc的最優batch update size);程式碼的實現也很簡潔易懂。這些優點,是採用有界阻塞佇列所難以達到的。
程式的輸出結果與業務緊密相關,就不打印出來了。可以肯定的是,經過了一段時間的摸索調優,記憶體消耗、執行速度和處理結果還是比較滿意的。