RocketMQ 消息重複推送的問題
阿新 • • 發佈:2017-06-11
內部 ins div 維護 mq服務器 second override 客戶 enable
最近，在公司的測試環境遇到一個問題：每次應用重啟後，原來已經消費過的消息又被重複推送了一遍。消費者和生產者代碼如下：
package com.tf56.queue.client;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;

import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.remoting.exception.RemotingException;

/**
 * RocketMQ producer helper: wraps a {@link DefaultMQProducer} that is configured and
 * started in the constructor.
 *
 * @author zjhua
 */
public class RocketMQProducer {

    private DefaultMQProducer producer;
    private String namesrvAddr;
    private String groupName;
    private String instanceName;

    /**
     * Creates the underlying producer, applies the given settings and starts it.
     * <p>
     * A failure to start is printed to stderr and NOT rethrown, so the constructor
     * always returns an instance.
     *
     * @param namesrvAddr  name server address, e.g. {@code "10.7.29.121:9876"}
     * @param groupName    producer group name
     * @param instanceName client instance name
     */
    public RocketMQProducer(String namesrvAddr, String groupName, String instanceName) {
        super();
        this.namesrvAddr = namesrvAddr;
        this.groupName = groupName;
        this.instanceName = instanceName;

        this.producer = new DefaultMQProducer(groupName);
        producer.setNamesrvAddr(namesrvAddr);
        producer.setInstanceName(instanceName);
        producer.setVipChannelEnabled(false);
        try {
            producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }

    public String getNamesrvAddr() {
        return namesrvAddr;
    }

    /**
     * Updates only the value stored on this wrapper; the underlying producer keeps
     * the address it was given in the constructor.
     */
    public void setNamesrvAddr(String namesrvAddr) {
        this.namesrvAddr = namesrvAddr;
    }

    public String getGroupName() {
        return groupName;
    }

    /**
     * Updates only the value stored on this wrapper; the underlying producer keeps
     * the group it was created with.
     */
    public void setGroupName(String groupName) {
        this.groupName = groupName;
    }

    public String getInstanceName() {
        return instanceName;
    }

    /**
     * Updates only the value stored on this wrapper; the underlying producer keeps
     * the instance name it was given in the constructor.
     */
    public void setInstanceName(String instanceName) {
        this.instanceName = instanceName;
    }

    /**
     * Sends one message with a single attempt.
     *
     * @param topic message topic
     * @param tags  message tags
     * @param keys  message keys
     * @param body  message payload
     * @return the result reported by the producer
     * @throws MQClientException    rethrown from the producer after being printed
     * @throws RemotingException    rethrown from the producer after being printed
     * @throws MQBrokerException    rethrown from the producer after being printed
     * @throws InterruptedException rethrown from the producer after being printed
     */
    public SendResult send(String topic, String tags, String keys, byte[] body)
            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        Message msg = new Message(topic, tags, keys, body);
        try {
            return this.producer.send(msg);
        } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
            e.printStackTrace();
            throw e;
        }
    }

    /**
     * Sends one message, making up to {@code retryTimes} attempts and pausing
     * {@code elapseMS} milliseconds between consecutive attempts.
     * <p>
     * Failures are printed to stderr. If the calling thread is interrupted, the
     * interrupt flag is restored and no further attempt is made.
     *
     * @param topic      message topic
     * @param tags       message tags
     * @param keys       message keys
     * @param body       message payload
     * @param retryTimes maximum number of attempts; values &lt;= 0 mean no attempt is made
     * @param elapseMS   pause between attempts, in milliseconds
     * @return the send result of the first successful attempt, or {@code null} if
     *         every attempt failed, no attempt was made, or the thread was interrupted
     */
    public SendResult send(String topic, String tags, String keys, byte[] body, int retryTimes, int elapseMS) {
        Message msg = new Message(topic, tags, keys, body);
        for (int attempt = 1; attempt <= retryTimes; attempt++) {
            try {
                return this.producer.send(msg);
            } catch (InterruptedException e) {
                // Restore the flag so callers can observe the interruption, and stop retrying.
                Thread.currentThread().interrupt();
                e.printStackTrace();
                return null;
            } catch (MQClientException | RemotingException | MQBrokerException e) {
                e.printStackTrace();
            }

            if (attempt == retryTimes) {
                // No point in sleeping after the final attempt.
                break;
            }
            try {
                Thread.sleep(elapseMS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
                return null;
            }
        }
        return null;
    }

    /**
     * Shuts down the underlying producer. Call this once the producer is no longer needed.
     */
    public void shutdown() {
        this.producer.shutdown();
    }

    /**
     * Demo: sends ten messages to each of two topics, one round per second, then
     * shuts the producer down.
     */
    public static void main(String[] args) throws MQClientException, InterruptedException {
        RocketMQProducer producer = new RocketMQProducer("10.7.29.121:9876", "ProducerGroupName", "Producer");
        try {
            for (int i = 0; i < 10; i++) {
                try {
                    {
                        SendResult sendResult = producer.send("TopicTestZJH", // topic
                                "TagA", // tag
                                "OrderID001", // key
                                ("Hello MetaQ" + i).getBytes(StandardCharsets.UTF_8));
                        System.out.println(sendResult);
                    }
                    {
                        SendResult sendResult = producer.send("TopicTestYIDU", // topic
                                "TagB", // tag
                                "OrderID0034", // key
                                ("Hello MetaQ" + i).getBytes(StandardCharsets.UTF_8));
                        System.out.println(sendResult);
                    }
                } catch (InterruptedException e) {
                    // Let an interruption end the demo instead of being printed and ignored.
                    throw e;
                } catch (Exception e) {
                    e.printStackTrace();
                }
                TimeUnit.MILLISECONDS.sleep(1000);
            }
        } finally {
            producer.shutdown();
        }

        /**
         * Spring bean configuration
         */
        // <bean id="rocketMQProducer" class="com.tf56.queue.client.RocketMQProducer" destroy-method="shutdown">
        //     <constructor-arg name="namesrvAddr" value="10.7.29.121:9876"/>
        //     <constructor-arg name="groupName" value="ProducerGroupName"/>
        //     <constructor-arg name="instanceName" value="Producer"/>
        // </bean>
    }
}
消費端代碼:
package tf56.sofa.util;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.util.List;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;

/**
 * Wrapper around a {@link DefaultMQPushConsumer} that is configured in the constructor,
 * started by {@link #init()} and stopped by {@link #shutdown()}.
 */
public class RocketMQPushConsumer {

    private DefaultMQPushConsumer consumer;
    private String namesrvAddr;
    private String groupName;
    private String instanceName;
    private String topics;

    /**
     * Configures (but does not start) the consumer.
     * <p>
     * A failure while subscribing or registering the listener is printed to stderr and
     * NOT rethrown, so the constructor always returns an instance.
     *
     * @param namesrvAddr     name server address, e.g. {@code "10.7.29.121:9876"}
     * @param groupName       consumer group name
     * @param instanceName    stored on this wrapper only; the client instance name actually
     *                        used is derived from the local host address and {@code namesrvAddr}
     * @param topics          topics to subscribe to, separated by {@code ';'}
     * @param messageListener callback that receives the consumed messages
     */
    public RocketMQPushConsumer(String namesrvAddr, String groupName, String instanceName, String topics,
            MessageListenerConcurrently messageListener) {
        super();
        this.namesrvAddr = namesrvAddr;
        this.groupName = groupName;
        this.instanceName = instanceName;
        this.topics = topics;

        /**
         * An application creates one Consumer and owns it; it can be kept as a global
         * object or a singleton.<br>
         * Note: the application is responsible for keeping ConsumerGroupName unique.
         */
        consumer = new DefaultMQPushConsumer(groupName);
        consumer.setNamesrvAddr(namesrvAddr);
        // NOTE(review): RocketMQ documents this setting as applying only when no consume
        // offset is stored for the group yet -- confirm for the client version in use.
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.setInstanceName(RocketMQPushConsumer.getInstanceName(namesrvAddr));
        consumer.setMessageModel(MessageModel.CLUSTERING);
        consumer.setVipChannelEnabled(false);
        try {
            /**
             * Subscribe to every message of each listed topic.<br>
             * Note: one consumer object may subscribe to several topics.
             */
            for (String rawTopic : topics.split(";")) {
                String topic = rawTopic.trim();
                if (topic.isEmpty()) {
                    // Tolerate stray separators such as "A;;B" or a leading ';'.
                    continue;
                }
                consumer.subscribe(topic, "*");
            }
            consumer.registerMessageListener(messageListener);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Starts the consumer. It must be started once before use; a start failure is
     * printed to stderr.<br>
     */
    public void init() {
        try {
            consumer.start();
            // Reported here, after start() succeeded, rather than in the constructor.
            System.out.println("Consumer Started.");
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }

    /**
     * Shuts down the underlying consumer. Call this when the application stops.
     * <p>
     * NOTE(review): the client is expected to persist its consume offsets during
     * shutdown -- confirm against the RocketMQ version in use.
     */
    public void shutdown() {
        consumer.shutdown();
    }

    /** Builds the client instance name from the local host address followed by the name server address. */
    private static String getInstanceName(String namesrvAddr) {
        return getHostAddress() + namesrvAddr;
    }

    /** Returns the local host address, or an empty string when it cannot be resolved. */
    private static String getHostAddress() {
        try {
            return InetAddress.getLocalHost().getHostAddress();
        } catch (UnknownHostException e) {
            e.printStackTrace();
        }
        return "";
    }

    /**
     * Demo of the PushConsumer usage: to the user it looks as if messages are pushed from
     * the RocketMQ server to the application client.<br>
     * Internally, PushConsumer long-polls the MetaQ server with Pull requests and then
     * invokes the user's Listener callback.<br>
     */
    public static void main(String[] args) throws InterruptedException, MQClientException {
        MessageListenerConcurrently messageListener = new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
                // Handle every message of the batch: the returned status acknowledges all of them.
                for (MessageExt msg : msgs) {
                    String body = new String(msg.getBody(), StandardCharsets.UTF_8);
                    if ("TopicTestZJH".equals(msg.getTopic())) {
                        System.out.println("TopicTestZJH->" + body);
                    } else if ("TopicTestYIDU".equals(msg.getTopic())) {
                        System.out.println("TopicTestYIDU->" + body);
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        };

        final RocketMQPushConsumer consumer = new RocketMQPushConsumer("10.7.29.121:9876", "ConsumerGroupName",
                "Consumer", "TopicTestZJH;TopicTestYIDU", messageListener);
        consumer.init();

        // Stop the consumer cleanly when the JVM exits.
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                consumer.shutdown();
            }
        });

        /**
         * Spring constructor injection
         */
        // <bean id="rocketMQPushConsumer" class="tf56.sofa.util.RocketMQPushConsumer"
        //       init-method="init" destroy-method="shutdown">
        //     <constructor-arg name="namesrvAddr" value="10.7.29.121:9876"/>
        //     <constructor-arg name="groupName" value="ConsumerGroupName"/>
        //     <constructor-arg name="instanceName" value="Consumer"/>
        //     <constructor-arg name="topics" value="TopicTestZJH;TopicTestYIDU"/>
        //     <constructor-arg name="messageListener" ref="messageListener"/>
        // </bean>
    }
}
RocketMQ 消息重複推送的問題