1. 程式人生 > >RocketMQ叢集解決方案----JAVA應用

RocketMQ叢集解決方案----JAVA應用

       上篇文章 RocketMQ叢集解決方案 已經講解了RocketMQ應用場景及效能、RocketMQ網路部署圖、實際叢集部署操作步驟(採用多Master多Slave,非同步複製叢集模式進行部署),這篇文章將講解JAVA應用中怎麼實際呼叫搭建的RocketMQ叢集環境。
     一、先搭建兩個JAVA  WEB工程,一個是訊息生產者工程ProducerProject,另外是一個訊息消費者工程ConsumerProject
     二、然後匯入RocketMQ的所需要JAR包(rocketmq-client-3.2.6.jar,rocketmq-common-3.2.6.jar,rocketmq-remoting-3.2.6.jar),點選此連結進行下載
    三、在ProducerProject 建立一個傳送訊息的工具類,用來向RocketMQ傳送訊息,程式碼如下:
    
package com.cluster.mq;


import java.io.UnsupportedEncodingException;


import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;


import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.remoting.exception.RemotingException;
/**
 * RocketMQ生產者類
 * @author pengtian
 *
 */
public class RocketMqProducer {
private static Log log = LogFactory.getLog(RocketMqProducer.class);
static DefaultMQProducer producer = null;
private static final Object obj = new Object();

/**
* 系統一啟動,就啟動執行生產者
*/
static {
if (producer == null) {
synchronized (obj) {
if (producer == null) {
try {
producer = new DefaultMQProducer("myProducer");
//producer.setNamesrvAddr(SystemGlobals.getProperties("nameServiceAddr"));// 連線MQ伺服器地址
producer.setNamesrvAddr("rocketmq-nameserver1:9876;rocketmq-nameserver2:9876;rocketmq-nameserver3:9876;rocketmq-nameserver4:9876");// 連線MQ伺服器地址;192.168.113.134:9876

producer.start();
} catch (MQClientException e) {
log.error("MQ producer初始化異常");
e.printStackTrace();
}
}
}
}
}


/**
* 傳送MQ訊息
* @param xml,String MQ訊息
* @param topic,String MQ訊息主題
* @param tags,String MQ訊息標誌
* @return
* @throws Exception
*/
public static boolean sendMessage(String xml, String topic, String tags)throws Exception {
// 以註冊為例,Message中引數為 Topic ; 註冊為memberRegister; 這兩個引數必須與收訊息方共同定義好,現在使用者註冊
// 為Topic,memberRegister,新的再加定義
SendResult sendResult = null;
try {

Message msg = new Message(topic, tags, xml.getBytes("UTF-8"));
msg.setKeys("KEYS_"+System.currentTimeMillis());
sendResult = producer.send(msg);
log.info("MSG_ID:"+sendResult.getMsgId()+",SendStatus:"+sendResult.getSendStatus());
   //producer.shutdown();
return true;
} catch (UnsupportedEncodingException e) {
log.error("傳送訊息到mq不支援的編碼異常--->");
e.printStackTrace();
} catch (MQClientException e) {
log.error("傳送訊息到mq MQ客戶端異常--->");
e.printStackTrace();
} catch (RemotingException e) {
log.error("傳送訊息到mq Remoting異常--->");
e.printStackTrace();
} catch (MQBrokerException e) {
log.error("傳送訊息到mq MQBroker異常--->");
e.printStackTrace();
} catch (Exception e) {
log.error("傳送訊息到mq未知異常--->");
e.printStackTrace();
}
return false;
}

public static void main(String[] args) {
for(int i=1;i<100;i++){
// 訊息的內容
StringBuffer xml = new StringBuffer("");
xml.append("<REQUEST id='mqMsg'>");
xml.append("<VERSION>1.1</VERSION>");
xml.append("<DDH>DD_1231eqwe123</DDH>");
xml.append("</REQUEST>");
String message = xml.toString();
try {
sendMessage(message, "Topic", "MY_TEST_01");
} catch (Exception e) {
e.printStackTrace();
}
}
}






}
   四、在ConsumerProject工程配置消費RocketMQ佇列中的訊息,需要和Spring進行整合:
     1、先建立RocketMQ 消費工具類,程式碼如下:
package com.cluster.mq;


import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;


import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
import com.cluster.util.SystemGlobals;


/**
 * RocketMq消費者
 * @author pengtian
 *
 */
public class RocketMqConsumers {

private static final Logger log = Logger.getLogger(RocketMqConsumers.class);
private DefaultMQPushConsumer consumer;
private static final String namesrvAddr = "rocketmq-nameserver1:9876;rocketmq-nameserver2:9876;rocketmq-nameserver3:9876;rocketmq-nameserver4:9876";
       private static final String consumerGroup = "myProducer";private static final String topicName = "Topic";

       private static final String tagsName = "MY_TEST_01||mytest";//代表獲取Tags為MY_TEST_01、mytest的訊息

       private static final int consumeThreadMin = 3;

       private static final int consumeThreadMax = 5;

@Autowiredprivate 

MsgListenerConcurrently msgListenerConcurrently;

/** * Spring bean init-method */

public void init() throws MQClientException {

try {

 // 引數資訊log.info("DefaultMQPushConsumer初始化!");

 log.info("consumerGroup:"+consumerGroup);

 log.info("namesrvAddr:"+namesrvAddr);

 log.info("topicName:"+topicName);

 log.info("tagsName:"+tagsName);

// 一個應用建立一個Consumer,由應用來維護此物件,可以設定為全域性物件或者單例<br>// 注意:ConsumerGroupName需要由應用來保證唯一

consumer = new DefaultMQPushConsumer(consumerGroup);

consumer.setNamesrvAddr(namesrvAddr);

consumer.setInstanceName(String.valueOf(System.currentTimeMillis()));// 訂閱指定topic下tags等於tag

consumer.subscribe(topicName, tagsName);

//MessageModel.CLUSTERING 叢集消費 :只能有一個消費者消費一次 ,MessageModel.BROADCASTING 廣播模式 所有的消費者都會消費一次

consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

consumer.setMessageModel(MessageModel.CLUSTERING);

consumer.registerMessageListener(msgListenerConcurrently);

consumer.setConsumeThreadMin(consumeThreadMin);

consumer.setConsumeThreadMax(consumeThreadMax);

consumer.start();log.info("DefaultMQPushConsumer初始化成功!");

} catch (Exception ex) {

       log.error("DefaultMQPushConsumer初始化異常=" + ex.getMessage());ex.printStackTrace();

}

}

/** * Spring bean destroy-method */

public void destory() {

   consumer.shutdown();

}

/** * @param args */

public static void main(String[] args) {

  / / TODO Auto-generated method stub

}

}
package com.cluster.mq;


import java.util.List;
import java.util.Map;


import org.apache.log4j.Logger;


import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.bwdz.fpt.common.xml.XmlUtil;


/**
 * PushConsumer回撥Listener方法
 * 
 */
@SuppressWarnings("unchecked")
public class MsgListenerConcurrently implements MessageListenerConcurrently {


private static final Logger log = Logger
.getLogger(MsgListenerConcurrently.class);




public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
String body = "";
String tags = msg.getTags();
String topic = msg.getTopic();
log.info("【tags:"+tags+",topic:"+topic+"】");
try {
body = new String(msg.getBody(), "UTF-8");
System.out.println("【MSG:"+body+"】");
//Map map = XmlUtil.Dom2Map(body);
if("topic".equals(topic)&&"A".equals(tags)){
System.out.println("A===============");
}else if("topic".equals(topic)&&"B".equals(tags)){
System.out.println("B===============");
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
String delay = msg.getProperty("DELAY");
if (delay != null && Integer.parseInt(delay) > 4) {// 重試次數超過3次丟棄此條訊息
log.warn("訊息重試次數過多,不再消費:" + body);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
log.error("mq訊息消費異常,本條訊息是:" + body);
e.printStackTrace();
}
}
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}


}




   2、將上面建立的RocketMQ消費者工具類整合到Spring中,在spring配置檔案中進行如下配置:
    
    <!-- 訊息消費者Consumer -->
    <bean id="msgListenerConcurrently" class="com.cluster.mq.MsgListenerConcurrently" scope="singleton">
    </bean>
    <!-- 訊息消費者Consumer -->
    <bean class="com.cluster.mq.RocketMqConsumers" init-method="init" destroy-method="destory" scope="singleton" />



五、接下是檢驗成敗的時候了
   1、先啟動好RocketMQ叢集(具體請檢視 RocketMQ叢集解決方案),然後啟動好ConsumerProject
   2、單元執行ProducerProject工程 RocketMqProducer.java Main函式,執行後,ProducerProject工程控制檯列印如下資訊,說明通訊正常,然後再檢視ConsumerProject就會收到傳送的報文資訊。
16-11-24 14:06:18  INFO RocketMqProducer:63 - MSG_ID:C0A8718A00002A9F000000000000EC08,SendStatus:SEND_OK
16-11-24 14:06:18  INFO RocketMqProducer:63 - MSG_ID:C0A8718A00002A9F000000000000EEEA,SendStatus:SEND_OK
16-11-24 14:06:18  INFO RocketMqProducer:63 - MSG_ID:C0A8718A00002A9F000000000000F1CC,SendStatus:SEND_OK
16-11-24 14:06:18  INFO RocketMqProducer:63 - MSG_ID:C0A8718A00002A9F000000000000F4AE,SendStatus:SEND_OK
16-11-24 14:06:18  INFO RocketMqProducer:63 - MSG_ID:C0A8718A00002A9F000000000000F790,SendStatus:SEND_OK