1. 程式人生 > >rocketmq消息重復推送的問題

RocketMQ 消息重複推送的問題

內部 ins div 維護 mq服務器 second override 客戶 enable

最近,在公司的測試環境遇到個問題:每次應用重啟後,原來已經消費過的消息又被重複推送了一遍。生產者和消費者代碼如下:

package com.tf56.queue.client;

import java.util.concurrent.TimeUnit;

import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message; import com.alibaba.rocketmq.remoting.exception.RemotingException; /** * RocketMQ 生產者工具類 * @author zjhua * */ public class RocketMQProducer { private DefaultMQProducer producer;
private String namesrvAddr; private String groupName; private String instanceName; public String getNamesrvAddr() { return namesrvAddr; } public void setNamesrvAddr(String namesrvAddr) { this.namesrvAddr = namesrvAddr; } public String getGroupName() {
return groupName; } public void setGroupName(String groupName) { this.groupName = groupName; } public String getInstanceName() { return instanceName; } public void setInstanceName(String instanceName) { this.instanceName = instanceName; } public RocketMQProducer(String namesrvAddr, String groupName, String instanceName) { super(); this.namesrvAddr = namesrvAddr; this.groupName = groupName; this.instanceName = instanceName; this.producer = new DefaultMQProducer(groupName); producer.setNamesrvAddr(namesrvAddr); producer.setInstanceName(instanceName); producer.setVipChannelEnabled(false); try { producer.start(); } catch (MQClientException e) { e.printStackTrace(); } } /** * 發送消息工具類 * @param topic * @param tags * @param keys * @param body * @return * @throws MQClientException * @throws RemotingException * @throws MQBrokerException * @throws InterruptedException */ public SendResult send(String topic, String tags, String keys, byte[] body) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { Message msg = new Message(topic, tags, keys, body); try { SendResult sendResult = this.producer.send(msg); return sendResult; } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) { e.printStackTrace(); throw e; } } /** * 發送工具類 * @param topic * @param tags * @param keys * @param body * @param retryTimes * @param elapseMS * @return */ public SendResult send(String topic, String tags, String keys, byte[] body,int retryTimes,int elapseMS) { Message msg = new Message(topic, tags, keys, body); boolean success = false; int i = 0; SendResult sendResult = null; while (!success && i++ < retryTimes) { try { sendResult = this.producer.send(msg); return sendResult; } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) { e.printStackTrace(); } try { Thread.sleep(elapseMS); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } return sendResult; 
} public static void main(String[] args) throws MQClientException, InterruptedException { RocketMQProducer producer = new RocketMQProducer("10.7.29.121:9876", "ProducerGroupName", "Producer"); for (int i = 0; i < 10; i++) { try { { SendResult sendResult = producer.send("TopicTestZJH",// topic "TagA",// tag "OrderID001",// key ("Hello MetaQ" + i).getBytes()); System.out.println(sendResult); } { SendResult sendResult = producer.send("TopicTestYIDU",// topic "TagB",// tag "OrderID0034",// key ("Hello MetaQ" + i).getBytes()); System.out.println(sendResult); } } catch (Exception e) { e.printStackTrace(); } TimeUnit.MILLISECONDS.sleep(1000); } /** * spring bean配置 */ // <bean id="rocketMQProducer" class="com.tf56.queue.client.RocketMQProducer"> // <constructor-arg name="namesvrAddr" value="10.7.29.121:9876"/> // <constructor-arg name="groupName" value="ProducerGroupName"/> // <constructor-arg name="instanceName" value="Producer"/> // </bean> } }

消費端代碼:

package tf56.sofa.util;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.util.List;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;

/**
 * Convenience wrapper around a RocketMQ {@link DefaultMQPushConsumer}.
 *
 * <p>Lifecycle: construct (configures and subscribes), {@link #init()} (starts
 * consuming), {@link #shutdown()} (stops the client). Stopping the client
 * gracefully matters for the "messages are delivered again after a restart"
 * symptom: the client should get a chance to report its consume progress
 * before the process exits.
 */
public class RocketMQPushConsumer {

    private final DefaultMQPushConsumer consumer;
    private final String namesrvAddr;
    private final String groupName;
    private final String instanceName;
    private final String topics;

    /**
     * Creates and configures the underlying consumer and subscribes it to every
     * topic listed in {@code topics}. The consumer is not started here; call
     * {@link #init()} for that.
     *
     * <p>If a subscription fails, the error is reported on stderr and the
     * listener is not registered.
     *
     * @param namesrvAddr     name-server address passed to the consumer
     * @param groupName       consumer group name
     * @param instanceName    stored on this object only, see the review note below
     * @param topics          topic names separated by {@code ;}; surrounding
     *                        whitespace and empty entries are ignored
     * @param messageListener callback registered on the consumer
     */
    public RocketMQPushConsumer(String namesrvAddr, String groupName, String instanceName, String topics,
            MessageListenerConcurrently messageListener) {
        this.namesrvAddr = namesrvAddr;
        this.groupName = groupName;
        this.instanceName = instanceName;
        this.topics = topics;

        /*
         * One consumer per application: the application owns this object and may
         * keep it as a global or a singleton.
         * Note: the application must make sure the consumer group name is unique.
         */
        consumer = new DefaultMQPushConsumer(groupName);
        consumer.setNamesrvAddr(namesrvAddr);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // NOTE(review): the instanceName argument is stored but never applied; the
        // client instance name is derived from local host address + name-server
        // address instead. Confirm that this is intended.
        consumer.setInstanceName(RocketMQPushConsumer.getInstanceName(namesrvAddr));
        consumer.setMessageModel(MessageModel.CLUSTERING);
        consumer.setVipChannelEnabled(false);

        try {
            /*
             * Subscribe to all messages ("*") of each listed topic.
             * Note: a single consumer object may subscribe to several topics.
             */
            if (topics != null) {
                for (String rawTopic : topics.split(";")) {
                    String topic = rawTopic.trim();
                    if (!topic.isEmpty()) {
                        consumer.subscribe(topic, "*");
                    }
                }
            }
            consumer.registerMessageListener(messageListener);
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }

    /**
     * Starts the consumer. Must be called once before any message is delivered.
     * A start failure is reported on stderr.
     */
    public void init() {
        try {
            consumer.start();
            // Announce the start only once start() has actually succeeded.
            System.out.println("Consumer Started.");
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }

    /**
     * Shuts down the underlying consumer. Call this when the application stops
     * (for example from a Spring {@code destroy-method} or a JVM shutdown hook)
     * instead of killing the process while the client is still running.
     */
    public void shutdown() {
        consumer.shutdown();
    }

    /** Builds the client instance name: local host address followed by the name-server address. */
    private static String getInstanceName(String namesrvAddr) {
        return getHostAddress() + namesrvAddr;
    }

    /** Returns the local host address, or an empty string when it cannot be resolved. */
    private static String getHostAddress() {
        try {
            return InetAddress.getLocalHost().getHostAddress();
        } catch (UnknownHostException e) {
            e.printStackTrace();
        }
        return "";
    }

    /**
     * Demo of the push-consumer usage: to the user it looks as if the RocketMQ
     * server pushes messages to the application. Internally the push consumer
     * long-polls the server and then invokes the user's listener.
     */
    public static void main(String[] args) throws InterruptedException,
            MQClientException {
        MessageListenerConcurrently messageListener = new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(
                    List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                // Handle every message of the batch; the whole batch is acknowledged below.
                for (MessageExt msg : msgs) {
                    String topic = msg.getTopic();
                    if ("TopicTestZJH".equals(topic) || "TopicTestYIDU".equals(topic)) {
                        System.out.println(topic + "->" + new String(msg.getBody(), StandardCharsets.UTF_8));
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        };
        final RocketMQPushConsumer consumer = new RocketMQPushConsumer("10.7.29.121:9876",
                "ConsumerGroupName", "Consumer", "TopicTestZJH;TopicTestYIDU", messageListener);
        consumer.init();

        // Stop the client cleanly when the JVM is asked to exit.
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                consumer.shutdown();
            }
        }));

        /**
         * Spring constructor injection (class and argument names match this file)
         */
//        <bean id="rocketMQPushConsumer" class="tf56.sofa.util.RocketMQPushConsumer"
//              init-method="init" destroy-method="shutdown">
//            <constructor-arg name="namesrvAddr" value="10.7.29.121:9876"/>
//            <constructor-arg name="groupName" value="ConsumerGroupName"/>
//            <constructor-arg name="instanceName" value="Consumer"/>
//            <constructor-arg name="topics" value="TopicTestZJH;TopicTestYIDU"/>
//            <constructor-arg name="messageListener" ref="messageListener"/>
//        </bean>
    }
}

RocketMQ 消息重複推送的問題