使用DelayQueue 和 FutureTask 實現java中的快取
阿新 • • 發佈:2019-01-28
使用DelayQueue、ConcurrentHashMap、FutureTask實現的快取工具類。
DelayQueue 簡介
DelayQueue是一個支援延時獲取元素的無界阻塞佇列。DelayQueue內部佇列使用PriorityQueue來實現。佇列中的元素必須實現Delayed介面,在建立元素時可以指定多久才能從佇列中獲取當前元素。只有在延遲期滿時才能從佇列中提取元素。
DelayQueue非常有用,可以將DelayQueue運用在以下應用場景。
- 快取系統的設計:可以用DelayQueue儲存快取元素的有效期,使用一個執行緒迴圈查詢
DelayQueue,一旦能從DelayQueue中獲取元素時,表示快取有效期到了。 - 定時任務排程:使用DelayQueue儲存當天將會執行的任務和執行時間,一旦從
DelayQueue中獲取到任務就開始執行,比如TimerQueue就是使用DelayQueue實現的。
ConcurrentHashMap和FutureTask,詳見以下:
快取工具類實現
- 支援快取多長時間,單位毫秒。
- 支援多執行緒併發。
比如:有一個比較耗時的操作,此時緩衝中沒有此快取值,一個執行緒開始計算這個耗時操作,而再次進來執行緒就不需要再次進行計算,只需要等上一個執行緒計算完成後(使用FutureTask)返回該值即可。
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
public class CacheBean<V> {
// 快取計算的結果
private final static ConcurrentMap<String, Future<Object>> cache = new ConcurrentHashMap<>();
// 延遲佇列來判斷那些快取過期
private final static DelayQueue<DelayedItem<String>> delayQueue = new DelayQueue<>();
// 快取時間
private final int ms;
static {
// 定時清理過期快取
Thread t = new Thread() {
@Override
public void run() {
dameonCheckOverdueKey();
}
};
t.setDaemon(true);
t.start();
}
private final Computable<V> c;
/**
* @param c Computable
*/
public CacheBean(Computable<V> c) {
this(c, 60 * 1000);
}
/**
* @param c Computable
* @param ms 快取多少毫秒
*/
public CacheBean(Computable<V> c, int ms) {
this.c = c;
this.ms = ms;
}
public V compute(final String key) throws InterruptedException {
while (true) {
//根據key從快取中獲取值
Future<V> f = (Future<V>) cache.get(key);
if (f == null) {
Callable<V> eval = new Callable<V>() {
public V call() {
return (V) c.compute(key);
}
};
FutureTask<V> ft = new FutureTask<>(eval);
//如果快取中存在此可以,則返回已存在的value
f = (Future<V>) cache.putIfAbsent(key, (Future<Object>) ft);
if (f == null) {
//向delayQueue中新增key,並設定該key的存活時間
delayQueue.put(new DelayedItem<>(key, ms));
f = ft;
ft.run();
}
}
try {
return f.get();
} catch (CancellationException e) {
cache.remove(key, f);
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
/**
* 檢查過期的key,從cache中刪除
*/
private static void dameonCheckOverdueKey() {
DelayedItem<String> delayedItem;
while (true) {
try {
delayedItem = delayQueue.take();
if (delayedItem != null) {
cache.remove(delayedItem.getT());
System.out.println(System.nanoTime() + " remove " + delayedItem.getT() + " from cache");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class DelayedItem<T> implements Delayed {
private T t;
private long liveTime;
private long removeTime;
public DelayedItem(T t, long liveTime) {
this.setT(t);
this.liveTime = liveTime;
this.removeTime = TimeUnit.MILLISECONDS.convert(liveTime, TimeUnit.MILLISECONDS) + System.currentTimeMillis();
}
@Override
public int compareTo(Delayed o) {
if (o == null)
return 1;
if (o == this)
return 0;
if (o instanceof DelayedItem) {
DelayedItem<T> tmpDelayedItem = (DelayedItem<T>) o;
if (liveTime > tmpDelayedItem.liveTime) {
return 1;
} else if (liveTime == tmpDelayedItem.liveTime) {
return 0;
} else {
return -1;
}
}
long diff = getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS);
return diff > 0 ? 1 : diff == 0 ? 0 : -1;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(removeTime - System.currentTimeMillis(), unit);
}
public T getT() {
return t;
}
public void setT(T t) {
this.t = t;
}
@Override
public int hashCode() {
return t.hashCode();
}
@Override
public boolean equals(Object object) {
if (object instanceof DelayedItem) {
return object.hashCode() == hashCode() ? true : false;
}
return false;
}
}
Computable 介面
public interface Computable<V> {
V compute(String k);
}
測試類
public class FutureTaskDemo {
public static void main(String[] args) throws InterruptedException {
// 子執行緒
Thread t = new Thread(() -> {
CacheBean<String> cb = new CacheBean<>(k -> {
try {
System.out.println("模擬計算資料,計算時長2秒。key=" + k);
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "你好:" + k;
}, 5000);
try {
while (true) {
System.out.println("thead2:" + cb.compute("b"));
TimeUnit.SECONDS.sleep(1);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
t.start();
// 主執行緒
while (true) {
CacheBean<String> cb = new CacheBean<>(k -> {
try {
System.out.println("模擬計算資料,計算時長2秒。key=" + k);
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "你好:" + k;
}, 5000);
System.out.println("thead1:" + cb.compute("b"));
TimeUnit.SECONDS.sleep(1);
}
}
}
執行結果:
兩個執行緒同時訪問同一個key的快取。從執行結果發現,每次快取失效後,同一個key只執行一次計算,而不是多個執行緒併發執行同一個計算然後快取。