RockerMQ實戰之快速入門
最近 RocketMQ 剛剛上生產環境,閒暇之時在這裡做一些分享,主要目的是讓初學者能快速上手RocketMQ。
RocketMQ 是什麼
Github 上關於 RocketMQ 的介紹:
RcoketMQ 是一款低延遲、高可靠、可伸縮、易於使用的訊息中介軟體。具有以下特性:
- 支援釋出/訂閱(Pub/Sub)和點對點(P2P)訊息模型
- 在一個佇列中可靠的先進先出(FIFO)和嚴格的順序傳遞
- 支援拉(pull)和推(push)兩種訊息模式
- 單一佇列百萬訊息的堆積能力
- 支援多種訊息協議,如 JMS、MQTT 等
- 分散式高可用的部署架構,滿足至少一次訊息傳遞語義
- 提供 docker 映象用於隔離測試和雲集群部署
- 提供配置、指標和監控等功能豐富的 Dashboard
對於這些特性描述,大家簡單過一眼就即可,深入學習之後自然就明白了。
專業術語
Producer
訊息生產者,生產者的作用就是將訊息傳送到 MQ,生產者本身既可以產生訊息,如讀取文字資訊等。也可以對外提供介面,由外部應用來呼叫介面,再由生產者將收到的訊息傳送到 MQ。
Producer Group
生產者組,簡單來說就是多個傳送同一類訊息的生產者稱之為一個生產者組。在這裡可以不用關心,只要知道有這麼一個概念即可。
Consumer
訊息消費者,簡單來說,消費 MQ 上的訊息的應用程式就是消費者,至於訊息是否進行邏輯處理,還是直接儲存到資料庫等取決於業務需要。
Consumer Group
消費者組,和生產者類似,消費同一類訊息的多個 consumer 例項組成一個消費者組。
Topic
Topic 是一種訊息的邏輯分類,比如說你有訂單類的訊息,也有庫存類的訊息,那麼就需要進行分類,一個是訂單 Topic 存放訂單相關的訊息,一個是庫存 Topic 儲存庫存相關的訊息。
Message
Message 是訊息的載體。一個 Message 必須指定 topic,相當於寄信的地址。Message 還有一個可選的 tag 設定,以便消費端可以基於 tag 進行過濾訊息。也可以新增額外的鍵值對,例如你需要一個業務 key 來查詢 broker 上的訊息,方便在開發過程中診斷問題。
Tag
標籤可以被認為是對 Topic 進一步細化。一般在相同業務模組中通過引入標籤來標記不同用途的訊息。
Broker
Broker 是 RocketMQ 系統的主要角色,其實就是前面一直說的 MQ。Broker 接收來自生產者的訊息,儲存以及為消費者拉取訊息的請求做好準備。
Name Server
Name Server 為 producer 和 consumer 提供路由資訊。
RocketMQ 架構
由這張圖可以看到有四個叢集,分別是 NameServer 叢集、Broker 叢集、Producer 叢集和 Consumer 叢集:
- NameServer: 提供輕量級的服務發現和路由。 每個 NameServer記錄完整的路由資訊,提供等效的讀寫服務,並支援快速儲存擴充套件。
- Broker: 通過提供輕量級的 Topic 和 Queue機制來處理訊息儲存,同時支援推(push)和拉(pull)模式以及主從結構的容錯機制。
- Producer:生產者,產生訊息的例項,擁有相同 Producer Group 的 Producer 組成一個叢集。
- Consumer:消費者,接收訊息進行消費的例項,擁有相同 Consumer Group 的Consumer 組成一個叢集。
簡單說明一下圖中箭頭含義,從 Broker 開始,Broker Master1 和 Broker Slave1 是主從結構,它們之間會進行資料同步,即 Date Sync。同時每個 Broker 與
NameServer 叢集中的所有節
點建立長連線,定時註冊 Topic 資訊到所有 NameServer 中。
Producer 與 NameServer 叢集中的其中一個節點(隨機選擇)建立長連線,定期從 NameServer 獲取 Topic 路由資訊,並向提供 Topic 服務的 Broker Master 建立長連線,且定時向 Broker 傳送心跳。Producer 只能將訊息傳送到 Broker master,但是 Consumer 則不一樣,它同時和提供 Topic 服務的 Master 和 Slave
建立長連線,既可以從 Broker Master 訂閱訊息,也可以從 Broker Slave 訂閱訊息。
RocketMQ 叢集部署模式
單 master 模式
也就是隻有一個 master 節點,稱不上是叢集,一旦這個 master 節點宕機,那麼整個服務就不可用,適合個人學習使用。
多 master 模式
多個 master 節點組成叢集,單個 master 節點宕機或者重啟對應用沒有影響。
- 優點:所有模式中效能最高
- 缺點:單個 master 節點宕機期間,未被消費的訊息在節點恢復之前不可用,訊息的實時性就受到影響。
- 注意:使用同步刷盤可以保證訊息不丟失,同時 Topic 相對應的 queue 應該分佈在叢集中各個節點,而不是隻在某各節點上,否則,該節點宕機會對訂閱該 topic 的應用造成影響。
多 master 多 slave 非同步複製模式
在多 master 模式的基礎上,每個 master 節點都有至少一個對應的 slave。master
節點可讀可寫,但是 slave 只能讀不能寫,類似於 mysql 的主備模式。
- 優點: 在 master 宕機時,消費者可以從 slave 讀取訊息,訊息的實時性不會受影響,效能幾乎和多 master 一樣。
- 缺點:使用非同步複製的同步方式有可能會有訊息丟失的問題。
多 master 多 slave 同步雙寫模式
同多 master 多 slave 非同步複製模式類似,區別在於 master 和 slave 之間的資料同步方式。
- 優點:同步雙寫的同步模式能保證資料不丟失。
- 缺點:傳送單個訊息 RT 會略長,效能相比非同步複製低10%左右。
- 刷盤策略:同步刷盤和非同步刷盤(指的是節點自身資料是同步還是非同步儲存)
- 同步方式:同步雙寫和非同步複製(指的一組 master 和 slave 之間資料的同步)
- 注意:要保證資料可靠,需採用同步刷盤和同步雙寫的方式,但效能會較其他方式低。
RocketMQ 單主部署
鑑於是快速入門,我選擇的是第一種單 master 的部署模式。先說明一下我的安裝環境:
- List item
- Centos 7.2
- jdk 1.8
- Maven 3.2.x
- Git
這裡 git 可用可不用,主要是用來直接下載 github 上的原始碼。也可以選擇自己到
github 上下載,然後上傳到伺服器上。以git操作為示例。
- clone 原始碼並用 maven 編譯
> git clone https://github.com/alibaba/RocketMQ.git /opt/RocketMQ
> cd /opt/RocketMQ && mvn -Dmaven.test.skip=true clean package install assembly:assembly -U
> cd target/alibaba-rocketmq-broker/alibaba-rocketmq
此處可能遇到的問題
一、執行"git clone https://github.com/alibaba/RocketMQ.git /home/inspkgs/RocketMQ
"時出現以下提示:
fatal: unable to access 'https://github.com/alibaba/RocketMQ.git/': Could not resolve host: github.com; Unknown error
解決辦法:一般是由於網路原因造成的,執行以下命令
> ping github.com
確定可以 ping 通之後,再重新執行 git clone 命令。
二、執行"mvn -Dmaven.test.skip=true clean package install assembly:assembly -U
"編譯時,可能出現下載相關jar很慢的情況。
這也是由於預設 maven 中央倉庫在國外的原因,可以根據需要在 /home/maven/conf/setting.xml
中的 <mirrors></mirrors>
新增以下內容後重新編譯:
<mirror>
<id>aliyun</id>
<mirrorOf>central</mirrorOf>
<name>aliyun maven</name>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
</mirror>
- 啟動 Name Server
> nohup sh /opt/RocketMQ/bin/mqnamesrv &
//執行 jps 檢視程序
> jps
25913 NamesrvStartup
//檢視日誌確保服務已正常啟動
> tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
- 啟動 broker
> nohup sh /opt/RocketMQ/bin/mqnamesrv &
//執行 jps 檢視程序
> jps
25913 NamesrvStartup
//檢視日誌確保服務已正常啟動
> tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
- 傳送和接收訊息
傳送/接收訊息之前,我們需要告訴客戶端 NameServer 地址。RocketMQ 提供了多種方式來實現這一目標。為簡單起見,我們使用環境變數 NAMESRV_ADDR。
> export NAMESRV_ADDR=localhost:9876
> sh /opt/RocketMQ/bin/tools.sh com.alibaba.rocketmq.example.quickstart.Producer
SendResult [sendStatus=SEND_OK, msgId= ...
> sh /opt/RocketMQ/bin/tools.sh com.alibaba.rocketmq.example.quickstart.Consumer
ConsumeMessageThread_%d Receive New Messages: [MessageExt...
- 關閉服務
> sh /opt/RocketMQ/bin/mqshutdown broker
The mqbroker(36695) is running...
Send shutdown request to mqbroker(36695) OK
> sh /opt/RocketMQ/bin/mqshutdown namesrv
The mqnamesrv(36664) is running...
Send shutdown request to mqnamesrv(36664) OK
生產者、消費者 Demo
生產者
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
//宣告並初始化一個producer
//需要一個producer group名字作為構造方法的引數,這裡為producer1
DefaultMQProducer producer = new DefaultMQProducer("producer1");
//設定NameServer地址,此處應改為實際NameServer地址,多個地址之間用;分隔
//NameServer的地址必須有,但是也可以通過環境變數的方式設定,不一定非得寫死在程式碼裡
producer.setNamesrvAddr("10.1.54.121:9876;10.1.54.122:9876");
//呼叫start()方法啟動一個producer例項
producer.start();
//傳送10條訊息到Topic為TopicTest,tag為TagA,訊息內容為“Hello RocketMQ”拼接上i的值
for (int i = 0; i < 10; i++) {
try {
Message msg = new Message("TopicTest",// topic
"TagA",// tag
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)// body
);
//呼叫producer的send()方法傳送訊息
//這裡呼叫的是同步的方式,所以會有返回結果
SendResult sendResult = producer.send(msg);
//列印返回結果,可以看到訊息傳送的狀態以及一些相關資訊
System.out.println(sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
//傳送完訊息之後,呼叫shutdown()方法關閉producer
producer.shutdown();
}
}
消費者
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
//宣告並初始化一個consumer
//需要一個consumer group名字作為構造方法的引數,這裡為consumer1
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer1");
//同樣也要設定NameServer地址
consumer.setNamesrvAddr("10.1.54.121:9876;10.1.54.122:9876");
//這裡設定的是一個consumer的消費策略
//CONSUME_FROM_LAST_OFFSET 預設策略,從該佇列最尾開始消費,即跳過歷史訊息
//CONSUME_FROM_FIRST_OFFSET 從佇列最開始開始消費,即歷史訊息(還儲存在broker的)全部消費一遍
//CONSUME_FROM_TIMESTAMP 從某個時間點開始消費,和setConsumeTimestamp()配合使用,預設是半個小時以前
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//設定consumer所訂閱的Topic和Tag,*代表全部的Tag
consumer.subscribe("TopicTest", "*");
//設定一個Listener,主要進行訊息的邏輯處理
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
//返回消費狀態
//CONSUME_SUCCESS 消費成功
//RECONSUME_LATER 消費失敗,需要稍後重新消費
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//呼叫start()方法啟動consumer
consumer.start();
System.out.println("Consumer Started.");
}
}
本篇到此完結,第一次寫文章,表達不清楚的地方還請各位小夥伴海涵。如有問題,可以多多交流。如果後面有時間的話,我也會分享更多的乾貨,包括producer、consumer 的 API 詳解、如何與 spring boot 整合、生產環境部署的一些注意事項,以及學習 RocketMQ 這一路踩過的坑等等。
最後,如果覺得寫的還過得去,點個喜歡或者小小打賞一下都是一種肯定,謝謝大家的支援。
ps:偶然的機會看到自己寫的文章出現在一個個人網站,當我通過QQ聯絡那個人詢問這件事時,還被直接拉黑,真的很鬱悶。在此,我想說,想要用我的作品可以,但一定要徵得我同意,這是對一個作者最基本的尊重。
最新更新文章如下,包括修改的內容都會重新整合:
- 3分鐘快速入門RocketMQ(上)
- 3分鐘快速入門RocketMQ(下)
- 必知必會的RocketMQ訊息型別
- RocketMQ的訊息傳送方式(20180121更新)
- 細談RocketMQ的消費模式(20180126更新)
- 使用RocketMQ的小細節(上)(20180205更新)
- 使用RocketMQ的小細節(下)(20180301更新)