1. 程式人生 > >RocketMQ深入學習

RocketMQ深入學習

 

RocketMQ概述

RocketMQ 是一款分散式、佇列模型的訊息中介軟體,具有以下特點: 能夠保證嚴格的訊息順序 提供豐富的訊息拉取模式 高效的訂閱者水平擴充套件能力 實時的訊息訂閱機制 億級訊息堆積能力

RocketMQ包含的元件

 

NameServer:單點,供Producer和Consumer獲取Broker地址

Producer:產生併發送訊息

Consumer:接受並消費訊息

Broker:訊息暫存,訊息轉發

Name Server

Name Server是RocketMQ的定址服務。用於把Broker的路由資訊做聚合。客戶端依靠Name Server決定去獲取對應topic的路由資訊,從而決定對哪些Broker做連線。

Name Server是一個幾乎無狀態的結點,Name Server之間採取share-nothing的設計,互不通訊。

對於一個Name Server叢集列表,客戶端連線Name Server的時候,只會選擇隨機連線一個結點,以做到負載均衡。

Name Server所有狀態都從Broker上報而來,本身不儲存任何狀態,所有資料均在記憶體。

如果中途所有Name Server全都掛了,影響到路由資訊的更新,不會影響和Broker的通訊。

 

Broker

Broker是處理訊息儲存,轉發等處理的伺服器。

Broker以group分開,每個group只允許一個master,若干個slave。

只有master才能進行寫入操作,slave不允許。

slave從master中同步資料。同步策略取決於master的配置,可以採用同步雙寫,非同步複製兩種。

客戶端消費可以從master和slave消費。在預設情況下,消費者都從master消費,在master掛後,客戶端由於從Name Server中感知到Broker掛機,就會從slave消費。

Broker向所有的NameServer結點建立長連線,註冊Topic資訊。

 

RocketMQ優點

1.強調叢集無單點,可擴充套件

2.任意一點高可用,水平可擴充套件

3.海量訊息堆積能力,訊息堆積後,寫入低延遲。

4.支援上萬個佇列

5.訊息失敗重試機制

6.訊息可查詢

7.開源社群活躍

8.成熟度(經過雙十一考驗)

 

RocketMQ環境安裝

伺服器配置

192.168.110.187 nameServer1,brokerServer1

192.168.110.188 nameServer2,brokerServer2

新增Host檔案

vi /etc/hosts

192.168.110.187 rocketmq-nameserver1

192.168.110.187 rocketmq-master1

192.168.110.188 rocketmq-nameserver2

192.168.110.188 rocketmq-master2

 

service network restart

注意: Error:No suitable device found: no device found for connection "System eth0"

解決辦法:

(1)ifconfig -a 檢視物理 MAC HWADDR 的值

(2)vim 編輯檔案 /etc/sysconfig/network-scripts/ifcfg-eth0中修改ifconfig中查出的MAC HWADDR值;

 

上傳安裝

# 上傳alibaba-rocketmq-3.2.6.tar.gz檔案至/usr/local

# tar -zxvf alibaba-rocketmq-3.2.6.tar.gz -C /usr/local

# mv alibaba-rocketmq alibaba-rocketmq-3.2.6

# ln -s alibaba-rocketmq-3.2.6 rocketmq

 

建立儲存路徑【兩臺機器】

mkdir /usr/local/rocketmq/store

# mkdir /usr/local/rocketmq/store/commitlog

# mkdir /usr/local/rocketmq/store/consumequeue

# mkdir /usr/local/rocketmq/store/index

RocketMQ配置檔案【兩臺機器】

# vim /usr/local/rocketmq/conf/2m-noslave/broker-a.properties

# vim /usr/local/rocketmq/conf/2m-noslave/broker-b.properties

 

修改日誌配置檔案【兩臺機器】

# mkdir -p /usr/local/rocketmq/logs

# cd /usr/local/rocketmq/conf && sed -i 's#${user.home}#/usr/local/rocketmq#g' *.xml

修改啟動NameServer【兩臺機器】

vim /usr/local/rocketmq/bin/runbroker.sh

JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -
XX:PermSize=128m -XX:MaxPermSize=320m"

 

vim /usr/local/rocketmq/bin/runserver.sh

 

JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -
XX:PermSize=128m -XX:MaxPermSize=320m"

啟動NameServer【兩臺機器】

# cd /usr/local/rocketmq/bin
# nohup sh mqnamesrv &

 

啟動BrokerServer A

cd /usr/local/rocketmq/bin
# nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-noslave/broker-a.properties >/dev/null 2>&1 &
# netstat -ntlp
# jps
# tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/broker.log
# tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/namesrv.log

啟動BrokerServer B

cd /usr/local/rocketmq/bin
# nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-noslave/broker-b.properties >/dev/null 2>&1 &
# netstat -ntlp
# jps
# tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/broker.log
# tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/namesrv.log

 

RocketMQ Console

將rocketmq-web-console 部署到webapps目錄中。

/usr/local/apache-tomcat-7.0.65/webapps/rocketmq-web-console/WEB-INF/classes/

修改config.properties

rocketmq.namesrv.addr=192.168.110.195:9876;192.168.110.199:9876

 

 

執行效果

 

安裝jdk環境

vi /etc/profile

 

export JAVA_HOME=/usr/local/jdk1.7.0_80

export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar

export PATH=$JAVA_HOME/bin:$PATH

source /etc/profile

Java操作RocketMQ

Pom

<dependencies>

<dependency>

<groupId>com.alibaba.rocketmq</groupId>

<artifactId>rocketmq-client</artifactId>

<version>3.0.10</version>

</dependency>

<dependency>

<groupId>com.alibaba.rocketmq</groupId>

<artifactId>rocketmq-all</artifactId>

<version>3.0.10</version>

<type>pom</type>

</dependency>

<dependency>

<groupId>ch.qos.logback</groupId>

<artifactId>logback-classic</artifactId>

<version>1.1.1</version>

</dependency>

<dependency>

<groupId>ch.qos.logback</groupId>

<artifactId>logback-core</artifactId>

<version>1.1.1</version>

</dependency>

<dependency>

<groupId>junit</groupId>

<artifactId>junit</artifactId>

<version>4.10</version>

<scope>test</scope>

</dependency>

</dependencies>

 

 

生產

public class Producer {

 

public static void main(String[] args) throws MQClientException {

DefaultMQProducer producer = new DefaultMQProducer("rmq-group");

producer.setNamesrvAddr("192.168.110.195:9876;192.168.110.199:9876");

producer.setInstanceName("producer");

producer.start();

try {

for (int i = 0; i < 10; i++) {

Thread.sleep(1000); // 每秒傳送一次MQ

Message msg = new Message("itmayiedu-topic", // topic 主題名稱

"TagA", // tag 臨時值

("itmayiedu-"+i).getBytes()// body 內容

);

SendResult sendResult = producer.send(msg);

System.out.println(sendResult.toString());

}

} catch (Exception e) {

e.printStackTrace();

}

producer.shutdown();

}

 

}

 

消費

 

 

public class Consumer {

public static void main(String[] args) throws MQClientException {

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");

 

consumer.setNamesrvAddr("192.168.110.195:9876;192.168.110.199:9876");

consumer.setInstanceName("consumer");

consumer.subscribe("itmayiedu-topic", "TagA");

 

consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override

public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

for (MessageExt msg : msgs) {

System.out.println(msg.getMsgId()+"---"+new String(msg.getBody()));

}

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

}

});

consumer.start();

System.out.println("Consumer Started.");

}

}

 

 

RocketMQ重試機制

MQ 消費者的消費邏輯失敗時,可以通過設定返回狀態達到訊息重試的結果。

MQ 訊息重試只針對叢集消費方式生效;廣播方式不提供失敗重試特性,即消費失敗後,失敗訊息不再重試,繼續消費新的訊息。

public class Consumer {

public static void main(String[] args) throws MQClientException {

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");

 

consumer.setNamesrvAddr("192.168.110.195:9876;192.168.110.199:9876");

consumer.setInstanceName("consumer");

consumer.subscribe("itmayiedu-topic", "TagA");

 

consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override

public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

for (MessageExt msg : msgs) {

System.out.println(msg.getMsgId() + "---" + new String(msg.getBody()));

}

try {

int i = 1 / 0;

} catch (Exception e) {

e.printStackTrace();

                                // 需要重試

return ConsumeConcurrentlyStatus.RECONSUME_LATER;

 

}

                         // 需要重試

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

}

});

consumer.start();

System.out.println("Consumer Started.");

}

}

 

注意:每次重試後,訊息ID都不一致所以不能使用訊息ID判斷冪等。

 

RocketMQ如何解決訊息冪等

注意:每次重試後,訊息ID都不一致所以不能使用訊息ID判斷冪等。

解決辦法:使用自定義全域性ID判斷冪等,例如流水ID、訂單號

使用msg.setKeys 進行區分

生產:

public class Producer {

 

public static void main(String[] args) throws MQClientException {

DefaultMQProducer producer = new DefaultMQProducer("rmq-group");

producer.setNamesrvAddr("192.168.110.195:9876;192.168.110.199:9876");

producer.setInstanceName("producer");

producer.start();

try {

for (int i = 0; i < 1; i++) {

Thread.sleep(1000); // 每秒傳送一次MQ

Message msg = new Message("itmayiedu-topic", // topic 主題名稱

"TagA", // tag 臨時值

("itmayiedu-6" + i).getBytes()// body 內容

);

msg.setKeys(System.currentTimeMillis() + "");

SendResult sendResult = producer.send(msg);

System.out.println(sendResult.toString());

}

} catch (Exception e) {

e.printStackTrace();

}

producer.shutdown();

}

 

}

消費:

 

 

static private Map<String, String> logMap = new HashMap<>();

 

public static void main(String[] args) throws MQClientException {

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");

 

consumer.setNamesrvAddr("192.168.110.195:9876;192.168.110.199:9876");

consumer.setInstanceName("consumer");

consumer.subscribe("itmayiedu-topic", "TagA");

 

consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override

public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

String key = null;

String msgId = null;

try {

for (MessageExt msg : msgs) {

key = msg.getKeys();

if (logMap.containsKey(key)) {

// 無需繼續重試。

System.out.println("key:"+key+",無需重試...");

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

}

msgId = msg.getMsgId();

System.out.println("key:" + key + ",msgid:" + msgId + "---" + new String(msg.getBody()));

int i = 1 / 0;

}

 

} catch (Exception e) {

e.printStackTrace();

return ConsumeConcurrentlyStatus.RECONSUME_LATER;

} finally {

logMap.put(key, msgId);

}

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

}

});

consumer.start();

System.out.println("Consumer Started.");

}

 

參考文獻

https://help.aliyun.com/document_detail/44397.html?spm=a2c4g.11186623.6.577.sPsbuu