凡客傳說4-DelayQueue使用方式,原始碼分析,以及應用場景
阿新 • • 發佈:2020-12-20
DelayQueue 顧名思義,它是一個延時佇列
使用方式 :
假設我們生產者提交一個任務,消費者5秒鐘之後才可以執行,那麼我們可以把任務定義為如下格式,並實現Delayed介面,其中data是任務儲存的資訊。
/**
* 具體的任務
* @author wangshixiang
*/
public class Task implements Delayed {
/**
* 資料
*/
private final String data;
/**
* 任務執行時間
*/
private final long time;
public Task(String data,TimeUnit timeUnit,long time){
this.data=data;
this.time=System.currentTimeMillis()+timeUnit.toMillis(time);
}
@Override
public long getDelay(TimeUnit unit) {
long res= time-System.currentTimeMillis();
return unit.convert(res,TimeUnit.MILLISECONDS);
}
public String getData() {
return data;
}
@Override
public int compareTo(Delayed o) {
if (o instanceof Task ){
Task task= (Task) o;
return (int) (this.time-task.time);
}
return 0;
}
}
定義好任務後,我們需要定義一個任務佇列 QUEUE_TASK,來儲存訊息,實現效果為程式執行後 五秒鐘後輸出Hello...
private static final DelayQueue<Task> QUEUE_TASK =new DelayQueue<>();
public static void main(String[] args) throws InterruptedException {
QUEUE_TASK .add(new Task("Hello ... ", TimeUnit.SECONDS,5));
System.out.println(QUEUE_TASK .take().getData());
}
使用詳解
- Delayed 介面定義:
public interface DeDlayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}
我們發現Delayed介面繼承了Comparable介面,並且有一個getDelay方法,在程式執行的過程中,會呼叫頭部任務的這個方法,來返回該任務具體還有多長時間可以執行。當我們任務實現這個介面時 可以儲存任務的執行時間,通過執行時間-當前時間 計算出距離執行時間的差值,因此我們Task定義了一個任務的變數,在建立物件時設定任務的執行時間。
2. DelayQueue 延時佇列
首先我們看一下DelayQueue類繼承實現結構圖
可以理解為 DelayQueue 是一個帶延遲執行功能的阻塞佇列
深入理解
- 為什麼Delayed介面繼承了Comparable介面 ?
- DelayQueue是怎麼實現只有到預定時間才能取出任務 ?
- 向佇列裡放入一個任務時 發生了什麼事情 ?
帶著這幾個問題,我們來看一下DelayQueeu的原始碼 首先看一下主要的引數:
//鎖
private final transient ReentrantLock lock = new ReentrantLock();
//優先順序佇列 執行時間最早的排在第一個
private final PriorityQueue<E> q = new PriorityQueue<E>();
//是否有執行緒在等待任務到執行時間
private Thread leader;
//條件喚醒
private final Condition available = lock.newCondition();
那麼我們先看add(E e)方法 ,任務入佇列時做了哪些操作
public boolean add(E e) {
return offer(e);
}
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e);
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
入佇列時做了一下步驟:
- 獲取鎖
- 放入元素 (放入優先順序佇列)
- 如果自己排在第一個 則原來標記的leader執行緒已經失效 直接設定為null,並喚醒消費者
- 釋放鎖
接下來在看出佇列時take()方法做了哪些操作
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0L)
return q.poll();
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
//我拿到元素了 喚醒其他的執行緒
available.signal();
lock.unlock();
}
}
出佇列做了如下步驟:
- 獲取鎖(可中斷的鎖 獲取這種鎖允許其他執行緒中斷此執行緒)
- 取出第一個元素 如果第一個元素為空 則直接 await(),等待被喚醒(如放佇列時的喚醒)
- 如果第一個元素不為空,檢視是否到執行時間,如果沒有到執行時間 檢視是否有leader已經注意到這個任務 如果他注意到這個任務 我直接await()。如果沒人注意,那麼我就把自己設定為leader然後設定帶時間的await()。
- 睡眠到執行時間後 醒來後檢視leader是否還是自己 如果是的話 取消自己的leader身份。然後在嘗試獲取任務。
- 如果我獲取到了符合要求的元素,那麼我應該喚醒大家 來一塊競爭獲取下一個元素。
帶時間的出佇列方法 E poll(long timeout, TimeUnit unit) 的實現邏輯與take()方法的唯一區別就是。只有當自己剩餘等待時間大於第一個元素剩餘執行時間時 才允許把自己設定為leader
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null) {
if (nanos <= 0L)
return null;
else
//睡眠等待時間 有可能提前返回 那麼返回的是剩餘等待時間
nanos = available.awaitNanos(nanos);
} else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0L)
return q.poll();
if (nanos <= 0L)
return null;
first = null; // don't retain ref while waiting
if (nanos < delay || leader != null)
//如果剩餘等待時間比第一個元素剩餘執行時間還短 那麼應該睡剩餘等待時間
nanos = available.awaitNanos(nanos);
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
long timeLeft = available.awaitNanos(delay);
//計算剩餘等待時間
nanos -= delay - timeLeft;
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal(http://www.jintianxuesha.com/);
lock.unlock();
}
}
應用場景:
在大多數業務場景中,我們會利用中介軟體提供的延時訊息的功能。比如利用redis zset實現 ,kafka rabbit mq 的延時佇列。我們需要根據我們的業務場景,來選擇合適的中介軟體。
- 訂單超時未支付取消.
- 呼叫其他系統時失敗間隔重試.
- 呼叫第三方介面時,過段時間非同步獲取結果。