1. 程式人生 > 實用技巧 >手寫一個訊息佇列以及延遲訊息佇列

手寫一個訊息佇列以及延遲訊息佇列

一、什麼是訊息佇列?

訊息佇列(Message Queue),是分散式系統中重要的元件,其通用的使用場景可以簡單地描述為:
當不需要立即獲得結果,但是併發量又需要進行控制的時候,差不多就是需要使用訊息佇列的時候

二、訊息佇列有什麼用?

1. 提高響應速度

非同步處理,序列化的功能變成並行化,從而提升系統性能,縮短響應時間
常用於秒殺、傳送簡訊通知等,需要立即返回結果的場景

2. 流量控制

在高併發的情況,為了避免大量的請求衝擊後端服務,可以使用訊息佇列暫存請求,後端服務按照自己的重能力,從佇列中消費,例如秒殺、埋點場景。
這樣可以隨時增加服務的例項數量水平擴容,而不用對系統的其他部分做修改

3.系統解耦

例如一個下單的資訊需要同步多個子系統,每個子系統都需要儲存訂單的資料的一部分,如果光靠訂單服務的團隊去維護所有的子系統資料同步,代價太大
解決方法是,通過釋出訂閱模型,訂單服務在訂單變化時傳送一條訊息到一個主題中,所有的下游子系統都訂單主題,這樣可以每個子系統都可以獲得一份完整的訂單資料
即使是增加、減少子系統,也不會對訂單服務造成影響

三、訊息佇列有什麼缺點?

  1. 同步訊息改成了非同步,增加了系統的呼叫鏈,增加了系統的複雜度
  2. 降低了資料一致性,如果要保持一致性,需要高代價的補償(如分散式事務、對賬)
  3. 引入了訊息佇列帶來的延遲問題

四、如何自定義一個訊息佇列?

我們可使用 Queue 來實現訊息佇列,Queue 大體可分為以下三類:

雙端佇列(Deque)是 Queue 的子類也是 Queue 的補充類,頭部和尾部都支援元素插入和獲取;
阻塞佇列指的是在元素操作時(新增或刪除),如果沒有成功,會阻塞等待執行,比如當新增元素時,如果佇列元素已滿,佇列則會阻塞等待直到有空位時再插入;
非阻塞佇列,和阻塞佇列相反,它會直接返回操作的結果,而非阻塞等待操作,雙端佇列也屬於非阻塞佇列。

自定義訊息佇列的實現程式碼如下:
import java.util.LinkedList;
import java.util.Queue;

/**
 * @author james
 * @version 1.0.0
 * @Description 自定義實現訊息佇列
 * @createTime 2020年08月15日 16:34:00
 */
public class CustomQueue {

    /**
     * 定義訊息佇列
     */
    private static Queue<String> queue = new LinkedList<>();

    public static void main(String[] args) {
        producer();  // 呼叫生產者
        consumer();  // 呼叫消費者
    }

    public static void consumer() {
        while (!queue.isEmpty()){
            System.out.println(queue.poll());
        }
    }

    public static void  producer(){
        queue.add("hello,");
        queue.add("queue");
        queue.add("!");
    }
}

以上程式的執行結果為

hello,
queue
!

可以看出訊息是以先進先出順序進行消費的。

實現自定義延遲佇列需要實現 Delayed 介面,重寫 getDelay() 方法,延遲佇列完整實現程式碼如下:

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/**
 * @author james
 * @version 1.0.0
 * @Description 延時訊息佇列
 * @createTime 2020年08月16日 10:27:00
 */
public class MyDelay implements Delayed {

    /**
     * 延遲截止時間(毫秒)
     */
    long delayTime = System.currentTimeMillis();


    private String msg;

    public String getMsg() {
        return msg;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }


    public MyDelay(long delayTime, String msg) {
        this.delayTime = this.delayTime + delayTime;
        this.msg = msg;
    }

    /**
     * 獲取剩餘時間
     *
     * @param unit
     * @return
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) {
            return 1;
        } else if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) {
            return -1;
        } else {
            return 0;
        }
    }

    @Override
    public String toString() {
        return this.msg;
    }
}
import java.text.DateFormat;
import java.util.Date;
import java.util.concurrent.DelayQueue;


/**
 * @author james
 * @version 1.0.0
 * @Description TODO
 * @createTime 2020年08月16日 10:45:00
 */
public class CustomDelayQueue {

    public static final DelayQueue delayQueue = new DelayQueue();

    public static void main(String[] args) throws InterruptedException {
        producer();  // 呼叫生產者
        consumer();  // 呼叫消費者
    }

    public static void consumer() throws InterruptedException {
        System.out.println("開始執行時間:" +
                DateFormat.getDateTimeInstance().format(new Date()));
        while (!delayQueue.isEmpty()) {
            System.out.println(delayQueue.take());
        }
        System.out.println("結束執行時間:" +
                DateFormat.getDateTimeInstance().format(new Date()));
    }

    public static void producer() {
        // 新增訊息
        delayQueue.put(new MyDelay(1000, "hello,"));
        delayQueue.put(new MyDelay(60000, "delayQueue"));
    }
}

同樣,一個簡易版的延遲訊息佇列就這樣完成了!

關注我的技術公眾號,每天都有優質技術文章推送。
微信掃一掃下方二維碼即可關注: