一頭扎進多執行緒-構建高效且可伸縮的結果快取
通過多執行緒的所有元件學習之後,我們要學習到如何把一個類包裝成一個多執行緒安全類,下面通過構造一個——計算快取類,在構造的過程中一步一步的優化,最終來得到我們想要的計算快取類。
類提供的功能:類提供一個計算的功能,然後把計算傳入的值與結果快取在一個Map中,當第二次計算時先從快取裡面檢視看曾經有沒有計算過,有的話就直接返回結果,沒有的話就進行計算,存到快取再返回結果。
菜鳥的做法
如果是我的話,首先我會這麼做
定義一個計算的介面,裡面提供一個計算的方法,等待實現
public interface Computable<A,V> {//V代表返回結果
V computer(A arg);//A表示引數
}
定義一個具體的計算類,用來實現計算的功能
public class ExpensiveFunction implements Computable<String,BigInteger> {
/* (非 Javadoc)
* @see com.jjt.cache.Computable#computer(java.lang.Object)
*/
@Override
public BigInteger computer(String arg) {
//模擬經過長時間的計算
return new BigInteger(arg);
}
}
最後定義一個主類來實現我們想要的邏輯,這裡使用的是組合的方式來進行計算
public class Memosizer<A,V> implements Computable<A, V>{
private final Map<A,V> cache = new HashMap<A,V>();//快取資料結構
private final Computable<A,V> c;//具體計算類
/**
*
*/
public Memosizer(Computable<A, V> c) {
this.c=c;//建構函式傳入一個具體的計算類
}
/* (非 Javadoc)
* @see com.jjt.cache.Computable#computer(java.lang.Object)
*/
@Override
public synchronized V computer(A arg) {//實現我們想要的邏輯
V result = cache.get(arg);
if(result==null){
result = c.computer(arg);
cache.put(arg, result);
}
return result;
}
}
解析上面的程式碼,我們可以看到,由於hashMap是非執行緒安全的,所以我們只能通過synchronized 加一個隱形鎖來對我們的邏輯進行鎖定,但是這樣導致我們發生併發時,我們會因為計算時間的關係導致執行緒間進行長時間的等待,降低了吞吐率。那我們接下來看看如何改進。
把執行緒安全性交給執行緒安全Map類管理
// private final Map<A,V> cache = new HashMap<A,V>();
private final Map<A,V> cache = new ConcurrentHashMap<A,V>();
ConcurrentHashMap內部實現機構是分段鎖,去除隱式鎖換成併發Map有助於我們增加吞吐量,不過這裡又存在另外的一個問題。
延時任務交給FutureTask
我們已經知道有一個類能基本實現這個功能:FutureTask。FutureTask表示一個計算的過程。這個過程可能是已經計算完成,也可能是正在進行。如果結果可用,那麼 FutureTask.get將立即返回,否則會一直阻塞,直到結果計算出來再將其返回。
所以我們現在把原來的
ConcurrenHashMap<A,V>,更改為ConcurrentHashMap<A,Future<V>>
我們直接把計算的步驟放到FutureTask裡面,讓他幫我們進行處理,我們直接把FutureTask放到快取裡面,需要的時候再get出來,這樣就可以解決了我們第二個點提到的那個技術難題。改進後的Memosize如下:
public class Memosizer<A,V> implements Computable<A, V>{
// private final Map<A,V> cache = new HashMap<A,V>();
private final Map<A,Future<V>> cache = new ConcurrentHashMap<A,Future<V>>();
private final Computable<A,V> c;//具體計算類
/**
*
*/
public Memosizer(Computable<A, V> c) {
this.c=c;
}
/* (非 Javadoc)
* @see com.jjt.cache.Computable#computer(java.lang.Object)
*/
@Override
public V computer(A arg) throws InterruptedException, ExecutionException {//去除分段鎖之後
Future<V> result = cache.get(arg);
if(result==null){
Callable<V> callable = new Callable<V>() {
@Override
public V call() throws Exception {
return c.computer(arg);
}
};
FutureTask<V> f = new FutureTask<V>(callable);
result=f;
cache.put(arg, result);
f.run();//這裡將呼叫c.compute方法,在這裡執行緒被阻塞等沒關係,因為futureTask已經進了快取了
}
return result.get();
}
}
雖然上面的程式碼看起來好像幾乎是完美的,但是不要忘了還存在一個非原子操作