1. 程式人生 > >RocketMQ-quickstart(批量消費)

RocketMQ-quickstart(批量消費)

shu when 多重 代碼 color 最大 except 亂序 throws

一、專業術語

  • Producer

    消費生產者,負責產生消息,一般由業務系統負責產生消息

  • Consumer

    消息消費者,負責消費消息,一般是後臺系統負責異步消費

  • Push Consumer

    Consumer的一種,應用通常向Consumer對象註冊一個Listener接口,一旦收到消息,Consumer對象立刻回調Listener接口方法

  • Pull Consumer

    Consumer的一種,應用通常主動調用Consumer的拉消息方法,從Broker拉消息,主動權由應用控制

  • Producer Group

    一類Producer的集合名稱,這類Consumer通常發送一類消息,且發送邏輯一致。

  • Consumer Group

    一類Consumer的集合名稱,這類Consumer通常消費一類消息,且消費邏輯一致。

  • Broker

    消息中轉角色,負責存儲消息,轉發消息,一般也稱為Server。在JMS規範中稱為Provider。

  • 廣播消費

    一個消息被多個Consumer消費,即使這些Consumer屬於同一個Consumer Group,消息也會被Consumer Group中的每個Consumer都消費一 次,廣播消費中的Consumer Group概念可以認為在消息劃分方面無意義

    在CORBA Notification 規範中,消費方式都屬於廣播消費。

    在JMS規範中,相當於JMS publish/subscribe model

  • 集群消費

    一個Consumer Group 中的Consumer實例平均分攤消費消息。例如某個Topic有9條消息,其中一個Consumer Group有3個實例(可能是3個進程,或者3臺機器),那麽每個實例只消費其中的3條消息。

  • 順序消息

    消費消息的順序要同發送消息的順序一致,在RocketMq中,主要指的是局部順序,即一類消息為滿足順序性,必須Producer單線程順序發送,且發送到同一隊列,這樣Consumer就可以按照Producer發送的順序去消費消息。

  • 普通順序消息

    順序消息的一種,正常情況下可以保證完全的順序消息,但是一旦發生通信異常,Broker重啟,由於隊列總數發生變化,哈希取模後定位的隊列會變化,產生短暫的消息順序不一致。

    如果業務能容忍在集群一次情況(如某個Broker宕機或者重啟)下,消息短暫的亂序,使用普通順序方式比較合適。

  • 嚴格順序消息

    順序消費的一種,無論正常異常情況都能保證順序,但是犧牲了分布式Failover特性,即Broker集群中只要有一臺機器不可用,則整個集群都不可用,服務可用性大大降低。

    如果服務器部署為同步雙寫模式,此缺陷可通過備機自動切換為主避免,不過仍然會存在幾分鐘的服務不可用(依賴同步雙寫,主備自動切換,自動切換功能尚未實現)

  • Message Queue

    在RocketMq中,所有消息隊列都是持久化,長度無限的數據結構,所謂長度無限是指隊列中的每個存儲單元都是定長,訪問其中的存儲單元使用Offset來訪問,offset為java long類型,64位,理論上在100年內不會溢出,所以任務是長度無限,另外隊列中只保存最近幾天的數據,之前的數據會按照過期時間來刪除。

二、代碼示例

生產者:

package com.alibaba.rocketmq.example.quickstart;

import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.remoting.exception.RemotingException;

/**
 * @author : Jixiaohu
 * @Date : 2018-04-19.
 * @Time : 9:20.
 * @Description :
 */
public class produce {
    public static void main(String[] args) throws MQClientException, InterruptedException, MQBrokerException {
        DefaultMQProducer producer = new DefaultMQProducer("jxh_quickstart_produce");
        producer.setNamesrvAddr("192.168.1.114:9876;192.168.2.2:9876");
        producer.start();

        try {
            for (int i = 0; i < 100; i++) {
                Message msg = new Message("TopicQuickStart", "TagA",
                        ("Hello RoctetMq : " +i ).getBytes());
                SendResult sendResult = producer.send(msg);
                System.out.println(sendResult);
            }
        } catch (RemotingException e) {
            e.printStackTrace();
            Thread.sleep(1000);
        }
        producer.shutdown();
    }
}

生產者生產100條消息:

消費者:

package com.alibaba.rocketmq.example.quickstart;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * @author : Jixiaohu
 * @Date : 2018-04-19.
 * @Time : 9:20.
 * @Description :
 */
public class Consumer {

    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("jxh_quickstart_produce");
        consumer.setNamesrvAddr("192.168.1.114:9876;192.168.2.2:9876");

        /**
         * 設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費<br>
         * 如果非第一次啟動,那麽按照上次消費的位置繼續消費
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.subscribe("TopicQuickStart", "*");

        //不配置默認一條
        consumer.setConsumeMessageBatchMaxSize(10);

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                System.out.println("拉取消息條數 " + msgs.size());
                try {
                    for (MessageExt msg : msgs) {
                        String topic = msg.getTopic();
                        String msgBody = new String(msg.getBody(), "utf-8");
                        String tags = msg.getTags();
                        System.out.println("收到信息:" + " topic:" + topic + " msgBody:" + msgBody + " tags:" + tags);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("Consumer Started");

    }
}

這邊設置了每次消費條數,我這邊先啟動Consumer訂閱ropic,然後啟動produce,看一下打印結果:

技術分享圖片

produce發送了100條數據,下面看一下,Consumer消費數據的情況

技術分享圖片

你會發現,為什麽每次獲取的消息都是1條,那上面設置的每次獲取最大的消息數目“10”,是不是不起作用了?

其實是這樣的,

我們正常的流程一般都是先啟動Consumer端,然後再啟動Producer端。Consumer端都是一條一條的消費的。但是有時候會出現先啟動Producer端,然後再啟動Consumer端這種情況,那這個時候就是會批量消費了,這個參數就會有作用了。

三、消息的重試,

  消息的重試大致分為三種情況,

技術分享圖片

①:Produce發送消息到MQ上,發送失敗。

看一下produce的代碼是怎麽實現的,這邊看一個大概的情況

public class produce {
    public static void main(String[] args) throws MQClientException, InterruptedException, MQBrokerException {
        DefaultMQProducer producer = new DefaultMQProducer("jxh_quickstart_produce");
        //消息發送至mq失敗後的重試次數
        producer.setRetryTimesWhenSendFailed(10);
        producer.setNamesrvAddr("192.168.1.114:9876;192.168.2.2:9876");
        producer.start();

        try {
            for (int i = 0; i < 100; i++) {
                Message msg = new Message("TopicQuickStart", "TagA",
                        ("Hello RoctetMq : " + i).getBytes());
//                SendResult sendResult = producer.send(msg);
                //增加一個超時參數,單位為毫秒
                SendResult sendResult = producer.send(msg, 1000);
                System.out.println(sendResult);
            }
        } catch (RemotingException e) {
            e.printStackTrace();
            Thread.sleep(1000);
        }
        producer.shutdown();
    }
}

②MQ推送消息至Consumer超時失敗(網絡波動)timeout。

這種情況,timeout,MQ會無限循環,直到把消息推送至Consumer,MQ沒有接收到RECONSUME_LATER或CONSUME_SUCCESS

③Consumer處理消息後,返回RECONSUME_LATER,MQ也會按照策略發送消息 exception。

消息重試的測試是 1s,5s,10s,30s,1m,2m,3m,4m,5m,6m,7m,8m,9m,10m,20m,30m,1h,2h

RocketMQ為我們提供了這麽多次數的失敗重試,但是在實際中也許我們並不需要這麽多重試,比如重試3次,還沒有成功,我們希望把這條消息存儲起來並采用另一種方式處理,而且希望RocketMQ不要再重試呢,因為重試解決不了問題了!這該如何做呢?

public class Consumer {

    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("jxh_quickstart_produce");
        consumer.setNamesrvAddr("192.168.1.114:9876;192.168.2.2:9876");

        /**
         * 設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費<br>
         * 如果非第一次啟動,那麽按照上次消費的位置繼續消費
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.subscribe("TopicQuickStart", "*");

        //不配置默認一條
        consumer.setConsumeMessageBatchMaxSize(10);

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                System.out.println("拉取消息條數 " + msgs.size());
                try {
//                    for (MessageExt msg : msgs) {
                    MessageExt msg = msgs.get(0);
                    String topic = msg.getTopic();
                    String msgBody = new String(msg.getBody(), "utf-8");
                    String tags = msg.getTags();
                    System.out.println("收到信息:" + " topic:" + topic + " msgBody:" + msgBody + " tags:" + tags + "msgs:" + msgs);
                    //註意,要先啟動Consumer,在進行發送消息(也就是先訂閱服務,再發送)
                    if ("Hello RoctetMq : 4".equals(msgBody)) {
                        System.out.println("===========失敗消息開始========");
                        System.out.println(msgBody);
                        System.out.println("===========失敗消息結束========");
                        //異常
                        int a = 1 / 0;
                    }
//                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    if (msgs.get(0).getReconsumeTimes() == 3) {
                        //   該條消息可以存儲到DB或者LOG日誌中,或其他處理方式
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }

                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("Consumer Started");

    }
}

查看下打印結果:

技術分享圖片

這邊能看到重試次數。

RocketMQ-quickstart(批量消費)