手寫一個訊息佇列以及延遲訊息佇列
一、什麼是訊息佇列?
訊息佇列(Message Queue),是分散式系統中重要的元件,其通用的使用場景可以簡單地描述為:
當不需要立即獲得結果,但是併發量又需要進行控制的時候,差不多就是需要使用訊息佇列的時候
二、訊息佇列有什麼用?
1. 提高響應速度
非同步處理,序列化的功能變成並行化,從而提升系統性能,縮短響應時間
常用於秒殺、傳送簡訊通知等,需要立即返回結果的場景
2. 流量控制
在高併發的情況,為了避免大量的請求衝擊後端服務,可以使用訊息佇列暫存請求,後端服務按照自己的重能力,從佇列中消費,例如秒殺、埋點場景。
這樣可以隨時增加服務的例項數量水平擴容,而不用對系統的其他部分做修改
3.系統解耦
例如一個下單的資訊需要同步多個子系統,每個子系統都需要儲存訂單的資料的一部分,如果光靠訂單服務的團隊去維護所有的子系統資料同步,代價太大
解決方法是,通過釋出訂閱模型,訂單服務在訂單變化時傳送一條訊息到一個主題中,所有的下游子系統都訂單主題,這樣可以每個子系統都可以獲得一份完整的訂單資料
即使是增加、減少子系統,也不會對訂單服務造成影響
三、訊息佇列有什麼缺點?
- 同步訊息改成了非同步,增加了系統的呼叫鏈,增加了系統的複雜度
- 降低了資料一致性,如果要保持一致性,需要高代價的補償(如分散式事務、對賬)
- 引入了訊息佇列帶來的延遲問題
四、如何自定義一個訊息佇列?
我們可使用 Queue 來實現訊息佇列,Queue 大體可分為以下三類:
雙端佇列(Deque)是 Queue 的子類也是 Queue 的補充類,頭部和尾部都支援元素插入和獲取;
阻塞佇列指的是在元素操作時(新增或刪除),如果沒有成功,會阻塞等待執行,比如當新增元素時,如果佇列元素已滿,佇列則會阻塞等待直到有空位時再插入;
非阻塞佇列,和阻塞佇列相反,它會直接返回操作的結果,而非阻塞等待操作,雙端佇列也屬於非阻塞佇列。
自定義訊息佇列的實現程式碼如下:
import java.util.LinkedList; import java.util.Queue; /** * @author james * @version 1.0.0 * @Description 自定義實現訊息佇列 * @createTime 2020年08月15日 16:34:00 */ public class CustomQueue { /** * 定義訊息佇列 */ private static Queue<String> queue = new LinkedList<>(); public static void main(String[] args) { producer(); // 呼叫生產者 consumer(); // 呼叫消費者 } public static void consumer() { while (!queue.isEmpty()){ System.out.println(queue.poll()); } } public static void producer(){ queue.add("hello,"); queue.add("queue"); queue.add("!"); } }
以上程式的執行結果為
hello,
queue
!
可以看出訊息是以先進先出順序進行消費的。
實現自定義延遲佇列需要實現 Delayed 介面,重寫 getDelay() 方法,延遲佇列完整實現程式碼如下:
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
* @author james
* @version 1.0.0
* @Description 延時訊息佇列
* @createTime 2020年08月16日 10:27:00
*/
public class MyDelay implements Delayed {
/**
* 延遲截止時間(毫秒)
*/
long delayTime = System.currentTimeMillis();
private String msg;
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
public MyDelay(long delayTime, String msg) {
this.delayTime = this.delayTime + delayTime;
this.msg = msg;
}
/**
* 獲取剩餘時間
*
* @param unit
* @return
*/
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) {
return 1;
} else if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) {
return -1;
} else {
return 0;
}
}
@Override
public String toString() {
return this.msg;
}
}
import java.text.DateFormat;
import java.util.Date;
import java.util.concurrent.DelayQueue;
/**
* @author james
* @version 1.0.0
* @Description TODO
* @createTime 2020年08月16日 10:45:00
*/
public class CustomDelayQueue {
public static final DelayQueue delayQueue = new DelayQueue();
public static void main(String[] args) throws InterruptedException {
producer(); // 呼叫生產者
consumer(); // 呼叫消費者
}
public static void consumer() throws InterruptedException {
System.out.println("開始執行時間:" +
DateFormat.getDateTimeInstance().format(new Date()));
while (!delayQueue.isEmpty()) {
System.out.println(delayQueue.take());
}
System.out.println("結束執行時間:" +
DateFormat.getDateTimeInstance().format(new Date()));
}
public static void producer() {
// 新增訊息
delayQueue.put(new MyDelay(1000, "hello,"));
delayQueue.put(new MyDelay(60000, "delayQueue"));
}
}
同樣,一個簡易版的延遲訊息佇列就這樣完成了!
關注我的技術公眾號,每天都有優質技術文章推送。
微信掃一掃下方二維碼即可關注: