RocketMQ深入學習
RocketMQ概述
RocketMQ 是一款分散式、佇列模型的訊息中介軟體,具有以下特點: 能夠保證嚴格的訊息順序 提供豐富的訊息拉取模式 高效的訂閱者水平擴充套件能力 實時的訊息訂閱機制 億級訊息堆積能力
RocketMQ包含的元件
NameServer:單點,供Producer和Consumer獲取Broker地址
Producer:產生併發送訊息
Consumer:接受並消費訊息
Broker:訊息暫存,訊息轉發
Name Server
Name Server是RocketMQ的定址服務。用於把Broker的路由資訊做聚合。客戶端依靠Name Server決定去獲取對應topic的路由資訊,從而決定對哪些Broker做連線。
Name Server是一個幾乎無狀態的結點,Name Server之間採取share-nothing的設計,互不通訊。
對於一個Name Server叢集列表,客戶端連線Name Server的時候,只會選擇隨機連線一個結點,以做到負載均衡。
Name Server所有狀態都從Broker上報而來,本身不儲存任何狀態,所有資料均在記憶體。
如果中途所有Name Server全都掛了,影響到路由資訊的更新,不會影響和Broker的通訊。
Broker
Broker是處理訊息儲存,轉發等處理的伺服器。
Broker以group分開,每個group只允許一個master,若干個slave。
只有master才能進行寫入操作,slave不允許。
slave從master中同步資料。同步策略取決於master的配置,可以採用同步雙寫,非同步複製兩種。
客戶端消費可以從master和slave消費。在預設情況下,消費者都從master消費,在master掛後,客戶端由於從Name Server中感知到Broker掛機,就會從slave消費。
Broker向所有的NameServer結點建立長連線,註冊Topic資訊。
RocketMQ優點
1.強調叢集無單點,可擴充套件
2.任意一點高可用,水平可擴充套件
3.海量訊息堆積能力,訊息堆積後,寫入低延遲。
4.支援上萬個佇列
5.訊息失敗重試機制
6.訊息可查詢
7.開源社群活躍
8.成熟度(經過雙十一考驗)
RocketMQ環境安裝
伺服器配置
192.168.110.187 nameServer1,brokerServer1
192.168.110.188 nameServer2,brokerServer2
新增Host檔案
vi /etc/hosts
192.168.110.187 rocketmq-nameserver1
192.168.110.187 rocketmq-master1
192.168.110.188 rocketmq-nameserver2
192.168.110.188 rocketmq-master2
service network restart
注意: Error:No suitable device found: no device found for connection "System eth0"
解決辦法:
(1)ifconfig -a 檢視物理 MAC HWADDR 的值
(2)vim 編輯檔案 /etc/sysconfig/network-scripts/ifcfg-eth0中修改ifconfig中查出的MAC HWADDR值;
上傳安裝包
# 上傳alibaba-rocketmq-3.2.6.tar.gz檔案至/usr/local
# tar -zxvf alibaba-rocketmq-3.2.6.tar.gz -C /usr/local
# mv alibaba-rocketmq alibaba-rocketmq-3.2.6
# ln -s alibaba-rocketmq-3.2.6 rocketmq
建立儲存路徑【兩臺機器】
mkdir /usr/local/rocketmq/store
# mkdir /usr/local/rocketmq/store/commitlog
# mkdir /usr/local/rocketmq/store/consumequeue
# mkdir /usr/local/rocketmq/store/index
RocketMQ配置檔案【兩臺機器】
# vim /usr/local/rocketmq/conf/2m-noslave/broker-a.properties
# vim /usr/local/rocketmq/conf/2m-noslave/broker-b.properties
修改日誌配置檔案【兩臺機器】
# mkdir -p /usr/local/rocketmq/logs
# cd /usr/local/rocketmq/conf && sed -i 's#${user.home}#/usr/local/rocketmq#g' *.xml
修改啟動NameServer【兩臺機器】
vim /usr/local/rocketmq/bin/runbroker.sh
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -
XX:PermSize=128m -XX:MaxPermSize=320m"
vim /usr/local/rocketmq/bin/runserver.sh
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -
XX:PermSize=128m -XX:MaxPermSize=320m"
啟動NameServer【兩臺機器】
# cd /usr/local/rocketmq/bin
# nohup sh mqnamesrv &
啟動BrokerServer A
cd /usr/local/rocketmq/bin
# nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-noslave/broker-a.properties >/dev/null 2>&1 &
# netstat -ntlp
# jps
# tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/broker.log
# tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/namesrv.log
啟動BrokerServer B
cd /usr/local/rocketmq/bin
# nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-noslave/broker-b.properties >/dev/null 2>&1 &
# netstat -ntlp
# jps
# tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/broker.log
# tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/namesrv.log
RocketMQ Console
將rocketmq-web-console 部署到webapps目錄中。
/usr/local/apache-tomcat-7.0.65/webapps/rocketmq-web-console/WEB-INF/classes/
修改config.properties
rocketmq.namesrv.addr=192.168.110.195:9876;192.168.110.199:9876
執行效果
安裝jdk環境
vi /etc/profile
export JAVA_HOME=/usr/local/jdk1.7.0_80
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$JAVA_HOME/bin:$PATH
source /etc/profile
Java操作RocketMQ
Pom
<dependencies>
<dependency>
<groupId>com.alibaba.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>3.0.10</version>
</dependency>
<dependency>
<groupId>com.alibaba.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>3.0.10</version>
<type>pom</type>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
<scope>test</scope>
</dependency>
</dependencies>
生產者
public class Producer {
public static void main(String[] args) throws MQClientException {
DefaultMQProducer producer = new DefaultMQProducer("rmq-group");
producer.setNamesrvAddr("192.168.110.195:9876;192.168.110.199:9876");
producer.setInstanceName("producer");
producer.start();
try {
for (int i = 0; i < 10; i++) {
Thread.sleep(1000); // 每秒傳送一次MQ
Message msg = new Message("itmayiedu-topic", // topic 主題名稱
"TagA", // tag 臨時值
("itmayiedu-"+i).getBytes()// body 內容
);
SendResult sendResult = producer.send(msg);
System.out.println(sendResult.toString());
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
}
消費者
public class Consumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");
consumer.setNamesrvAddr("192.168.110.195:9876;192.168.110.199:9876");
consumer.setInstanceName("consumer");
consumer.subscribe("itmayiedu-topic", "TagA");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(msg.getMsgId()+"---"+new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started.");
}
}
RocketMQ重試機制
MQ 消費者的消費邏輯失敗時,可以通過設定返回狀態達到訊息重試的結果。
MQ 訊息重試只針對叢集消費方式生效;廣播方式不提供失敗重試特性,即消費失敗後,失敗訊息不再重試,繼續消費新的訊息。
public class Consumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");
consumer.setNamesrvAddr("192.168.110.195:9876;192.168.110.199:9876");
consumer.setInstanceName("consumer");
consumer.subscribe("itmayiedu-topic", "TagA");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(msg.getMsgId() + "---" + new String(msg.getBody()));
}
try {
int i = 1 / 0;
} catch (Exception e) {
e.printStackTrace();
// 需要重試
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
// 不需要重試
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started.");
}
}
注意:每次重試後,訊息ID都不一致,所以不能使用訊息ID判斷冪等。
RocketMQ如何解決訊息冪等
注意:每次重試後,訊息ID都不一致,所以不能使用訊息ID判斷冪等。
解決辦法:使用自定義全域性ID判斷冪等,例如流水ID、訂單號
使用msg.setKeys 進行區分
生產者:
public class Producer {
public static void main(String[] args) throws MQClientException {
DefaultMQProducer producer = new DefaultMQProducer("rmq-group");
producer.setNamesrvAddr("192.168.110.195:9876;192.168.110.199:9876");
producer.setInstanceName("producer");
producer.start();
try {
for (int i = 0; i < 1; i++) {
Thread.sleep(1000); // 每秒傳送一次MQ
Message msg = new Message("itmayiedu-topic", // topic 主題名稱
"TagA", // tag 臨時值
("itmayiedu-6" + i).getBytes()// body 內容
);
msg.setKeys(System.currentTimeMillis() + "");
SendResult sendResult = producer.send(msg);
System.out.println(sendResult.toString());
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
}
消費者:
static private Map<String, String> logMap = new HashMap<>();
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");
consumer.setNamesrvAddr("192.168.110.195:9876;192.168.110.199:9876");
consumer.setInstanceName("consumer");
consumer.subscribe("itmayiedu-topic", "TagA");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
String key = null;
String msgId = null;
try {
for (MessageExt msg : msgs) {
key = msg.getKeys();
if (logMap.containsKey(key)) {
// 無需繼續重試。
System.out.println("key:"+key+",無需重試...");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
msgId = msg.getMsgId();
System.out.println("key:" + key + ",msgid:" + msgId + "---" + new String(msg.getBody()));
int i = 1 / 0;
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
} finally {
logMap.put(key, msgId);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started.");
}
參考文獻
https://help.aliyun.com/document_detail/44397.html?spm=a2c4g.11186623.6.577.sPsbuu