延時佇列DelayQueue
阿新 • • 發佈:2021-10-31
延時佇列DelayQueue的使用介紹
java.util.concurrent.DelayQueue
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E>
延時佇列,最重要的特性就體現在它的延時屬性上,跟普通的佇列不一樣的是,普通佇列中的元素總是等著希望被早點取出處理,而延時佇列中的元素則是希望被在指定時間得到取出和處理,所以延時佇列中的元素是都是帶時間屬性的,通常來說是需要被處理的訊息或者任務。延時佇列就是用來存放需要在指定時間被處理的元素的佇列
簡介
- 以支援優先順序無界佇列的PriorityQueue作為一個容器,容器裡面的元素都應該實現Delayed介面,在每次往優先順序佇列中新增元素時以元素的過期時間作為排序條件,最先過期的元素放在優先順序最高。
- DelayQueue是一個沒有大小限制的佇列,因此往佇列中插入資料的操作(生產者)永遠不會被阻塞,而只有獲取資料的操作(消費者)才會被阻塞。
DelayQueue實現了阻塞佇列BlockingQueue介面,其中阻塞佇列涉及到入隊和出隊的操作在佇列不可用時的處理方式如下:
處理方式 | 丟擲異常 | 返回特殊值 | 阻塞 | 超時退出 |
---|---|---|---|---|
插入 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除 | remove() | poll() | take() | poll(time,unit) |
檢視 | element() | peek() |
其中DelayQueue為無界佇列;其入隊和出隊方法主要有以下幾種:
入隊:add和put方法均使用的offer方法進行入隊操作,不會丟擲異常也不會阻塞,offer(e,time,unit)同樣使用的offer方法,它的另外兩個入參被忽略未實際使用,
出隊:
方法 | 入參 | 返回值 | 說明 |
---|---|---|---|
remove | 佇列元素 | Boolean | 移除佇列中的元素,返回移除是否成功標識 |
poll | - | 出隊元素 | 按優先順序出隊,無待出隊元素返回null |
take | - | 出隊元素 | 按優先順序出隊,無元素出隊時阻塞 |
poll | time,unit | 出隊元素 | 按優先順序出隊,無待出隊元素等待一段時間後再次嘗試出隊,無元素可以出隊時返回null |
簡單使用示例
// 自定義一個延時任務封裝類
public class DelayedTask<T> implements Delayed, Runnable {
/**
* 任務引數
*/
private final T taskParam;
/**
* 任務型別
*/
private final Integer type;
/**
* 任務函式
*/
private final Function<T, String> function;
/**
* 任務執行時刻,時間戳
*/
private final Long runTime;
public DelayedTask(T taskParam, Integer type, Function<T, String> function, Long runTime) {
this.taskParam = taskParam;
this.type = type;
this.function = function;
this.runTime = runTime;
}
@Override
public void run() {
if (taskParam != null) {
function.apply(taskParam);
}
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(this.runTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
// 以執行時間戳值的大小比較,越小優先順序越高
if (o instanceof DelayedTask) {
return this.runTime.compareTo(((DelayedTask<?>) o).runTime);
} else {
return -1;
}
}
}
測試主類:
public class DelayQueueTest {
public static final int SIZE = 10;
public static void main(String[] args) {
//初始化執行緒池
BlockingQueue<Runnable> arrayBlockingQueue = new ArrayBlockingQueue<>(10);
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 10, 10, TimeUnit.MILLISECONDS,
arrayBlockingQueue, Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
DelayQueue<DelayedTask<String>> delayTaskQueue = new DelayQueue<>();
// 定義任務執行方法
Function<String, String> function = param -> {
StringBuilder builder = new StringBuilder(Thread.currentThread().getName());
builder.append(":").append(System.currentTimeMillis()).append(">>> log >>>")
.append(param.getClass().getSimpleName()).append(":").append(param);
System.out.println(builder);
return builder.toString();
};
//模擬10個延遲任務
long now = System.currentTimeMillis();
for (byte i = 0; i < SIZE; i++) {
// 設定每隔3s執行一個任務
Long runTime = now + 3000 * i;
String taskParam = "執行第" + i + "個任務";
delayTaskQueue.put(new DelayedTask<>(taskParam, 1, function, runTime));
}
while (delayTaskQueue.size() != 0) {
try {
//從延遲佇列中取值,如果沒有物件過期則取到null
DelayedTask<String> delayedTask = delayTaskQueue.poll();
if (delayedTask != null) {
threadPool.execute(delayedTask);
}
} catch (Exception e) {
e.printStackTrace();
}
}
threadPool.shutdown();
}
}
輸出結果:
pool-1-thread-1:1634822371554>>> log >>>String:執行第0個任務
pool-1-thread-2:1634822374552>>> log >>>String:執行第1個任務
pool-1-thread-3:1634822377552>>> log >>>String:執行第2個任務
pool-1-thread-4:1634822380552>>> log >>>String:執行第3個任務
pool-1-thread-5:1634822383552>>> log >>>String:執行第4個任務
pool-1-thread-1:1634822386552>>> log >>>String:執行第5個任務
pool-1-thread-2:1634822389552>>> log >>>String:執行第6個任務
pool-1-thread-3:1634822392552>>> log >>>String:執行第7個任務
pool-1-thread-4:1634822395552>>> log >>>String:執行第8個任務
pool-1-thread-5:1634822398552>>> log >>>String:執行第9個任務
常見使用場景
- 重試機制:比如當呼叫介面失敗後,把當前呼叫資訊放入delay=10s的元素,然後把元素放入佇列,那麼這個佇列就是一個重試佇列,一個執行緒通過take()方法獲取需要重試的介面,take()返回則介面進行重試,失敗則再次放入佇列,同時也可以在元素加上重試次數。
- 週期任務
- 訂單超時未支付取消
- 連線池關閉空閒一段時間後的連線
- 清理過期的記憶體資料