1. 程式人生 > >ActiveMQ—訊息特性(延遲和定時訊息投遞)

ActiveMQ—訊息特性(延遲和定時訊息投遞)

有時候我們不希望訊息馬上被broker投遞出去,而是想要訊息60秒以後發給消費者,或者我們想讓訊息沒隔一定時間投遞一次,一共投遞指定的次數。。。

類似這種需求,ActiveMQ提供了一種broker端訊息定時排程機制。

首先要修改activemq.xml配置檔案,啟用延時投遞。

 <broker xmlns="http://activemq.apache.org/schema/core" ... schedulerSupport="true">
    ...
  </broker>

然後我們只需要把幾個描述訊息定時排程方式的引數作為屬性新增到訊息,broker端的排程器就會按照我們想要的行為去處理訊息。

一共有四個屬性:

Property name type description
AMQ_SCHEDULED_DELAY long 延遲投遞的時間
AMQ_SCHEDULED_PERIOD long 重複投遞的時間間隔
AMQ_SCHEDULED_REPEAT int 重複投遞次數
AMQ_SCHEDULED_CRON String Cron表示式

當然ActiveMQ也提供了一個封裝的訊息型別:org.apache.activemq.ScheduledMessage.

使用示例,延遲60秒:

        MessageProducer producer = session.createProducer(destination);
        TextMessage message = session.createTextMessage("test msg"
); long time = 60 * 1000; message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time); producer.send(message);

延遲30秒,投遞10次,間隔10秒:

        MessageProducer producer = session.createProducer(destination);
        TextMessage message = session.createTextMessage("test msg");
        long
delay = 30 * 1000; long period = 10 * 1000; int repeat = 9; message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay); message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period); message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat); producer.send(message);

使用 CRON 表示式的例子:

        MessageProducer producer = session.createProducer(destination);
        TextMessage message = session.createTextMessage("test msg");
        message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 * * * *");
        producer.send(message);

CRON表示式的優先順序高於另外三個引數,如果在設定了CRON的同時,也有repeat和period引數,則會在每次CRON執行的時候,重複投遞repeat次,每次間隔為period。就是說設定是疊加的效果。例如每小時都會發生訊息被投遞10次,延遲1秒開始,每次間隔1秒:

        MessageProducer producer = session.createProducer(destination);
        TextMessage message = session.createTextMessage("test msg");
        message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 * * * *");
        message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 1000);
        message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 1000);
        message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 9);
        producer.send(message);