Java關於DelayQueue做延時訊息推送
阿新 • • 發佈:2019-02-17
最近比較閒,看某專案原始碼時看到有用DelayQueue類來做延時的訊息推送。
DelayQueue是Delayed元素的一個無界阻塞佇列,只有在延遲期滿時才能從中提取元素。該佇列的頭部 是延遲期滿後儲存時間最長的Delayed元素。如果延遲都還沒有期滿,則佇列沒有頭部,並且poll將返回null。當一個元素的 getDelay( TimeUnit.NANOSECONDS) 方法返回一個小於等於 0 的值時,將發生到期。即使無法使用 take 或 poll 移除未到期的元素,也不會將這些元素作為正常元素對待。例如,size 方法同時返回到期和未到期元素的計數。此佇列不允許使用 null 元素。
訊息類,繼承Delayed:
package com.study.queue; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; public class Message implements Delayed { private int id; private String body; private long excuteTime; public int getId() { return id; } public void setId(int id) { this.id = id; } public String getBody() { return body; } public void setBody(String body) { this.body = body; } public long getExcuteTime() { return excuteTime; } public void setExcuteTime(long excuteTime) { this.excuteTime = excuteTime; } public Message(int id, String body, long delayTime) { this.id = id; this.body = body; this.excuteTime = TimeUnit.NANOSECONDS.convert(delayTime, TimeUnit.MILLISECONDS) + System.nanoTime(); } @Override public long getDelay(TimeUnit unit) { return unit.convert(this.excuteTime - System.nanoTime(), TimeUnit.NANOSECONDS); } @Override public int compareTo(Delayed o) { Message msg = (Message) o; return Long.valueOf(this.excuteTime) > Long.valueOf(msg.excuteTime) ? 1 : (Long.valueOf(this.excuteTime) < Long.valueOf(msg.id) ? -1 : 0); } }
消費類:檢查消費情況
package com.study.queue; import java.util.concurrent.DelayQueue; public class Consumer implements Runnable { private DelayQueue<Message> queue; public Consumer(DelayQueue<Message> queue) { this.queue = queue; } @Override public void run() { while (true) { try { Message take = queue.take(); System.out.println("消費資訊:" + take.getExcuteTime() + ":" + take.getBody()); } catch (Exception e) { e.printStackTrace(); } } } }
主程式:把Message放入延時佇列裡,打印出過時的Message
package com.study.queue;
import java.util.concurrent.DelayQueue;
public class DelayQueueTest {
public static void main(String[] args) {
// 建立延時佇列
DelayQueue<Message> queue = new DelayQueue<Message>();
// 新增延時訊息,m1 延時5s
Message m1 = new Message(1, "world", 3000);
// 新增延時訊息,m2 延時3s
Message m2 = new Message(2, "hello", 10000);
queue.offer(m2);
queue.offer(m1);
// 啟動消費執行緒
new Thread(new Consumer(queue)).start();
}
}
這裡有個需要注意的bug,這個過載方法裡需要返回的是納秒,如傳入的是毫秒,則回造成迴圈過快,造成cpu資源浪費。