詳解RocketMQ中的consumer
阿新 • • 發佈:2019-02-08
上一篇部落格著重講解了一下RocketMQ中的Producer,那麼接下來這篇部落格來帶大家來了解一下RocketMQ中的Consumer角色
上述就是MQ中有關Consumer的類圖,下面來介紹一下每個類
1.MQAdmin:底層類,上篇部落格已經提過,就不再此重提
2.MQConsumer:Consumer公共的介面,常用的方法如下
如果消費失敗的話,訊息將會返回到broker中,並且延遲一會消費的時間
void sendMessageBack(final MessageExt msg, final int delayLevel, final String brokerName) throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
3.MQPushConsumer:Consumer的一種,應用通常向Consumer物件註冊一個Listener介面,一旦收到訊息,Consumer物件立刻回撥Listener介面方法
4.MQPullConsumer:Consumer的一種,應用通常主動呼叫Consumer的拉訊息方法從Broker拉訊息,主動權由應用控制
在上圖中出現了兩類的消費者分別是PushConsumer和PullConsumer,下面來看一下
PushConsumer:通過註冊監聽的方式來消費資訊
[java] view plain copy print?- <span style="font-family:Comic Sans MS;font-size:18px;"
- * @FileName: Consumer.java
- * @Package:com.test
- * @Description: TODO
- * @author: LUCKY
- * @date:2015年12月28日 下午2:43:23
- * @version V1.0
- */
- package com.test;
- import java.util.List;
- import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
- import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
- import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
- import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
- import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
- import com.alibaba.rocketmq.common.message.Message;
- import com.alibaba.rocketmq.common.message.MessageExt;
- /**
- * @ClassName: Consumer
- * @Description: 模擬消費者
- * @author: LUCKY
- * @date:2015年12月28日 下午2:43:23
- */
- publicclass ConsumerTest {
- publicstaticvoid main(String[] args) {
- DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("broker-a");
- consumer.setNamesrvAddr("100.66.154.81:9876");
- try {
- // 訂閱PushTopic下Tag為push的訊息,都訂閱訊息
- consumer.subscribe("PushTopic", "push");
- // 程式第一次啟動從訊息佇列頭獲取資料
- consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
- //可以修改每次消費訊息的數量,預設設定是每次消費一條
- // consumer.setConsumeMessageBatchMaxSize(10);
- //註冊消費的監聽
- consumer.registerMessageListener(new MessageListenerConcurrently() {
- //在此監聽中消費資訊,並返回消費的狀態資訊
- public ConsumeConcurrentlyStatus consumeMessage(
- List<MessageExt> msgs,
- ConsumeConcurrentlyContext context) {
- // msgs中只收集同一個topic,同一個tag,並且key相同的message
- // 會把不同的訊息分別放置到不同的佇列中
- for(Message msg:msgs){
- System.out.println(new String(msg.getBody()));
- }
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- });
- consumer.start();
- Thread.sleep(5000);
- //5秒後掛載消費端消費
- consumer.suspend();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- </span>
PullConsumer:通過拉去的方式來消費訊息
[java] view plain copy print?- <span style="font-family:Comic Sans MS;font-size:18px;">/**
- * @FileName: Consumer.java
- * @Package:com.test
- * @Description: TODO
- * @author: LUCKY
- * @date:2015年12月28日 下午2:43:23
- * @version V1.0
- */
- package com.test;
- import java.util.Set;
- import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;
- import com.alibaba.rocketmq.client.consumer.MessageQueueListener;
- import com.alibaba.rocketmq.common.message.MessageQueue;
- /**
- * @ClassName: Consumer
- * @Description: 模擬消費者
- * @author: LUCKY
- * @date:2015年12月28日 下午2:43:23
- */
- publicclass ConsumerPullTest {
- publicstaticvoid main(String[] args) {
- DefaultMQPullConsumer consumer=new DefaultMQPullConsumer();
- consumer.setNamesrvAddr("100.66.154.81:9876");
- consumer.setConsumerGroup("broker");
- try {
- consumer.start();
- Set<MessageQueue> messageQueues= consumer.fetchSubscribeMessageQueues("PushTopic");
- for(MessageQueue messageQueue:messageQueues){
- System.out.println(messageQueue.getTopic());
- }
- //訊息佇列的監聽
- consumer.registerMessageQueueListener("", new MessageQueueListener() {
- @Override
- //訊息佇列有改變,就會觸發
- publicvoid messageQueueChanged(String topic, Set<MessageQueue> mqAll,
- Set<MessageQueue> mqDivided) {
- // TODO Auto-generated method stub
- }
- });
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- </span>
一般在應用中都會採用push的方法來自動的消費資訊