RocketMQ原理學習--消費者消費訊息
阿新 • • 發佈:2018-12-31
在之前的一篇部落格《RocketMQ原理學習--訊息型別》中我們有介紹過RocketMQ的訊息型別,這篇部落格我們簡單介紹一下RocketMQ消費者是如何消費訊息的。
一、Pull or Push
簡單來說RocketMQ給我們提供了兩種訊息消費方式,Pull模式和Push模式,簡單理解我們可能會認為Pull模式是消費者主動去拉取訊息,Push模式是RocketMQ的Broker主動將訊息推送過來,其實RocketMQ對於這兩種方式都是採用的Pull拉取的方式,Push模式不過是通過回撥來實現的,讓我們理解為推送模式
1、Push示例:
提供一個MessageListenerConcurrently監聽器,當存在訊息時會回撥這個類的consumeMessage方法。
public class PushConsumer { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group"); consumer.setNamesrvAddr("localhost:9876"); //consumer.setInstanceName("rmq-instance2"); consumer.subscribe("TopicA-test", "TagA"); consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage( List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println(new String(msg.getBody())); } return ConsumeConcurrentlyStatus.RECONSUME_LATER; } }); consumer.start(); System.out.println("Consumer Started."); }
2、Pull模式:
維護MessageQueue和訊息offset不斷的去拉取訊息。
public class PullConsumer { private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<>(); public static void main(String [] args) throws Exception{ DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("rmq-group"); consumer.start(); Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicA-test"); for (MessageQueue mq : mqs) { System.out.printf("Consume from the queue: %s%n", mq); SINGLE_MQ: while (true) { try { PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32); System.out.printf("%s%n", pullResult); putMessageQueueOffset(mq, pullResult.getNextBeginOffset()); switch (pullResult.getPullStatus()) { case FOUND: break; case NO_MATCHED_MSG: break; case NO_NEW_MSG: break SINGLE_MQ; case OFFSET_ILLEGAL: break; default: break; } } catch (Exception e) { e.printStackTrace(); } } } consumer.shutdown(); } private static long getMessageQueueOffset(MessageQueue mq) { Long offset = OFFSE_TABLE.get(mq); if (offset != null) return offset; return 0; } private static void putMessageQueueOffset(MessageQueue mq, long offset) { OFFSE_TABLE.put(mq, offset); } }
二、執行流程
1、Push模式:
對於Push模式,RocketMQ提供了PullMessageService執行緒,定時不斷的從RocketMQ中拉取訊息,最終來回調MessageListener的consumeMessage方法來消費訊息,如下執行流程圖: