RocketMQ(二)叢集配置
RocketMQ 網路部署特點
Name Server是一個幾乎無狀態節點,可叢集部署,節點之間無任何資訊同步。
Broker部署相對複雜,Broker 分為Master與Slave,一個Master 可以對應多個Slave,但是一個Slave只能對應一個Master,Master與Slave 的對應關係通過指定相同的BrokerName,不同的BrokerId來定 義,BrokerId為0 表示Master,非0 表示Slave。Master也可以部署多個。每個 Broker與 Name Server叢集中的所有節點建立長連線,定時註冊 Topic資訊到所有 Name Server。
Producer與Name Server叢集中的其中一個節點(隨機選擇)建立長連線,定期從Name Server取Topic 路由資訊,並向提供Topic服務的Master 建立長連線,且定時向Master傳送心跳。Producer完全無 狀態,可叢集部署。
Consumer與Name Server叢集中的其中一個節點(隨機選擇)建立長連線,定期從Name Server 取Topic路由資訊,並向提供Topic服務的Master、Slave建立長連線,且定時向Master、Slave傳送心跳。Consumer既可以從Master訂閱訊息,也可以從Slave訂閱訊息,訂閱規則由Broker 配置決定。
NameServer叢集 | IP地址 |
---|---|
NameServer-1 | 192.168.1.101 |
NameServer-2 | 192.168.1.102 |
分別啟動
nohup sh mqnamesrv &
tail -f -n 500 $ROCKETMQ_HOME/logs/rocketmqlogs/namesrv.log
眾所周知,RocketMQ有多種叢集部署方式,它們的配置檔案也是分開的,如下:
- 2m-noslave: 多Master模式
- 2m-2s-sync: 多Master多Slave模式,同步雙寫
- 2m-2s-async
RocketMQ預設提供的配置檔案都是最基本的,很多配置都是預設值,如下:
這些肯定不滿足我們的要求,我們可以自己手動來配置
#所屬叢集名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此處不同的配置檔案填寫的不一樣
brokerName=broker-a|broker-b
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分號分割
namesrvAddr=192.168.1.101:9876;192.168.1.102:9876
#在傳送訊息時,自動建立伺服器不存在的topic,預設建立的佇列數
defaultTopicQueueNums=4
#是否允許 Broker 自動建立Topic,建議線下開啟,線上關閉
autoCreateTopicEnable=true
#是否允許 Broker 自動建立訂閱組,建議線下開啟,線上關閉
autoCreateSubscriptionGroup=true
#Broker 對外服務的監聽埠
listenPort=10911
#刪除檔案時間點,預設凌晨 4點
deleteWhen=04
#檔案保留時間,預設 48 小時
fileReservedTime=120
#commitLog每個檔案的大小預設1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每個檔案預設存30W條,根據業務情況調整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#檢測物理檔案磁碟空間
diskMaxUsedSpaceRatio=88
#儲存路徑
storePathRootDir=/usr/local/alibaba-rocketmq/store
#commitLog 儲存路徑
storePathCommitLog=/usr/local/alibaba-rocketmq/store/commitlog
#消費佇列儲存路徑儲存路徑
storePathConsumeQueue=/usr/local/alibaba-rocketmq/store/consumequeue
#訊息索引儲存路徑
storePathIndex=/usr/local/alibaba-rocketmq/store/index
#checkpoint 檔案儲存路徑
storeCheckpoint=/usr/local/alibaba-rocketmq/store/checkpoint
#abort 檔案儲存路徑
abortFile=/usr/local/alibaba-rocketmq/store/abort
#限制的訊息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 非同步複製Master
#- SYNC_MASTER 同步雙寫Master
#- SLAVE
brokerRole=ASYNC_MASTER
#刷盤方式
#- ASYNC_FLUSH 非同步刷盤
#- SYNC_FLUSH 同步刷盤
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#發訊息執行緒池數量
#sendMessageThreadPoolNums=128
#拉訊息執行緒池數量
#pullMessageThreadPoolNums=128
Broker叢集部署方式主要有以下幾種:(Slave不可寫,但可讀)
這種方式風險較大,一旦Broker重啟或者宕機時,會導致整個服務不可用,不建議線上環境使用。
一個叢集無 Slave,全是 Master,例如 2 個 Master 或者 3 個 Master。
brokerName | brokerId | brokerRole | IP地址 |
---|---|---|---|
broker-a | 0 | ASYNC_MASTER | 192.168.1.101 |
broker-b | 0 | ASYNC_MASTER | 192.168.1.103 |
優點:配置簡單,單個Master 宕機或重啟維護對應用無影響,在磁碟配置為 RAID10 時,即使機器宕機不可恢復情況下,由於RAID10 磁碟非常可靠,訊息也不會丟(非同步刷盤丟失少量訊息,同步刷盤一條不丟)。效能最高。
缺點:單臺機器宕機期間,這臺機器上未被消費的訊息在機器恢復之前不可訂閱,訊息實時性會受到受到影響。
- 在192.168.1.101,啟動第一個 Master(-n 192.168.1.101:9876 可省略)
nohup sh mqbroker -c $ROCKETMQ_HOME/conf/2m-noslave/broker-a.properties >/dev/null 2>&1 &
tail -f -n 500 $ROCKETMQ_HOME/logs/rocketmqlogs/broker.log
- 在192.168.1.103,啟動第二個 Master(-n 192.168.1.101:9876 可省略)
nohup sh mqbroker -c $ROCKETMQ_HOME/conf/2m-noslave/broker-b.properties >/dev/null 2>&1 &
tail -f -n 500 $ROCKETMQ_HOME/logs/rocketmqlogs/broker.log
每個 Master 配置一個 Slave,有多對Master-Slave,HA 採用非同步複製方式,主備有短暫訊息延遲,毫秒級。
brokerName | brokerId | brokerRole | IP地址 |
---|---|---|---|
broker-a | 0 | ASYNC_MASTER | 192.168.1.101 |
broker-a | 1 | SLAVE | 192.168.1.102 |
broker-b | 0 | ASYNC_MASTER | 192.168.1.103 |
broker-b | 1 | SLAVE | 192.168.1.104 |
優點:即使磁碟損壞,訊息丟失的非常少,且訊息實時性不會受影響,因為 Master 宕機後,消費者仍然可以從 Slave 消費,此過程對應用透明。不需要人工干預。效能同多 Master 模式幾乎一樣。
缺點:Master宕機,磁碟損壞情況,會丟失少量訊息。
- 在192.168.1.101,啟動第一個 Master(-n 192.168.1.101:9876 可省略)
nohup sh mqbroker -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-a.properties >$ROCKETMQ_HOME/log/mq.log >/dev/null 2>&1 &
tail -f -n 500 $ROCKETMQ_HOME/logs/rocketmqlogs/broker.log
- 在192.168.1.102,啟動第一個 Slave(-n 192.168.1.101:9876 可省略)
nohup sh mqbroker -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-a-s.properties >$ROCKETMQ_HOME/log/mq.log >/dev/null 2>&1 &
tail -f -n 500 $ROCKETMQ_HOME/logs/rocketmqlogs/broker.log
- 在192.168.1.103,啟動第二個 Master(-n 192.168.1.101:9876 可省略)
nohup sh mqbroker -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-b.properties >$ROCKETMQ_HOME/log/mq.log >/dev/null 2>&1 &
tail -f -n 500 $ROCKETMQ_HOME/logs/rocketmqlogs/broker.log
- 在機器 192.168.1.104,啟動第二個 Slave(-n 192.168.1.101:9876 可省略)
nohup sh mqbroker -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-b-s.properties >$ROCKETMQ_HOME/log/mq.log >/dev/null 2>&1 &
每個 Master 配置一個 Slave,有多對Master-Slave,HA 採用同步雙寫方式,主備都寫成功,嚮應用才返回成功。
brokerName | brokerId | brokerRole | IP地址 |
---|---|---|---|
broker-a | 0 | SYNC_MASTER | 192.168.1.101 |
broker-a | 1 | SLAVE | 192.168.1.102 |
broker-b | 0 | SYNC_MASTER | 192.168.1.103 |
broker-b | 1 | SLAVE | 192.168.1.104 |
優點:資料與服務都無單點,Master宕機情況下,訊息無延遲,服務可用性與資料可用性都非常高
缺點:效能比非同步複製模式略低,大約低10%左右,傳送單個訊息的 RT 會略高。目前主宕機後,備機不能自動切換為主機,後續會支援自動切換功能。
- 在192.168.1.101,啟動第一個 Master(-n 192.168.1.101:9876 可省略)
nohup sh mqbroker -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a.properties >$ROCKETMQ_HOME/log/mq.log >/dev/null 2>&1 &
tail -f -n 500 $ROCKETMQ_HOME/logs/rocketmqlogs/broker.log
- 在192.168.1.102,啟動第一個 Slave(-n 192.168.1.101:9876 可省略)
nohup sh mqbroker -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a-s.properties >$ROCKETMQ_HOME/log/mq.log >/dev/null 2>&1 &
tail -f -n 500 $ROCKETMQ_HOME/logs/rocketmqlogs/broker.log
- 在192.168.1.103,啟動第二個 Master(-n 192.168.1.101:9876 可省略)
nohup sh mqbroker -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b.properties >$ROCKETMQ_HOME/log/mq.log >/dev/null 2>&1 &
tail -f -n 500 $ROCKETMQ_HOME/logs/rocketmqlogs/broker.log
- 在192.168.1.104,啟動第二個 Slave(-n 192.168.1.101:9876 可省略)
nohup sh mqbroker -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b-s.properties >$ROCKETMQ_HOME/log/mq.log >/dev/null 2>&1 &
tail -f -n 500 $ROCKETMQ_HOME/logs/rocketmqlogs/broker.log
以上 Broker 與 Slave 配對是通過指定相同的brokerName 引數來配對,Master 的 BrokerId必須是 0,
Slave的BrokerId必須是大於 0 的數。另外一個 Master 下面可以掛載多個 Slave,同一 Master下的多個 Slave 通過指定不同的 BrokerId 來區分。
package com.somnus.rocketmq;
import java.util.concurrent.TimeUnit;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.LocalTransactionExecuter;
import com.alibaba.rocketmq.client.producer.LocalTransactionState;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.client.producer.TransactionCheckListener;
import com.alibaba.rocketmq.client.producer.TransactionMQProducer;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageExt;
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
/**
* 一個應用建立一個Producer,由應用來維護此物件,可以設定為全域性物件或者單例<br>
* 注意:ProducerGroupName需要由應用來保證唯一,一類Producer集合的名稱,這類Producer通常傳送一類訊息,
* 且傳送邏輯一致<br>
* ProducerGroup這個概念傳送普通的訊息時,作用不大,但是傳送分散式事務訊息時,比較關鍵,
* 因為伺服器會回查這個Group下的任意一個Producer
*/
final TransactionMQProducer producer = new TransactionMQProducer("ProducerGroupName");
// nameserver服務
producer.setNamesrvAddr("172.16.235.77:9876;172.16.235.78:9876");
producer.setInstanceName("Producer");
/**
* Producer物件在使用之前必須要呼叫start初始化,初始化一次即可<br>
* 注意:切記不可以在每次傳送訊息時,都呼叫start方法
*/
producer.start();
// 伺服器回撥Producer,檢查本地事務分支成功還是失敗
producer.setTransactionCheckListener(new TransactionCheckListener() {
public LocalTransactionState checkLocalTransactionState(
MessageExt msg) {
System.out.println("checkLocalTransactionState --" + new String(msg.getBody()));
return LocalTransactionState.COMMIT_MESSAGE;
}
});
/**
* 下面這段程式碼表明一個Producer物件可以傳送多個topic,多個tag的訊息。
* 注意:send方法是同步呼叫,只要不拋異常就標識成功。但是傳送成功也可會有多種狀態,<br>
* 例如訊息寫入Master成功,但是Slave不成功,這種情況訊息屬於成功,但是對於個別應用如果對訊息可靠性要求極高,<br>
* 需要對這種情況做處理。另外,訊息可能會存在傳送失敗的情況,失敗重試由應用來處理。
*/
for (int i = 0; i < 10; i++) {
try {
{
Message msg = new Message("TopicTest1", // topic
"TagA", // tag
"OrderID001", // key訊息關鍵詞,多個Key用KEY_SEPARATOR隔開(查詢訊息使用)
("Hello MetaQA").getBytes()); // body
SendResult sendResult = producer.sendMessageInTransaction(
msg, new LocalTransactionExecuter(){
public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
System.out.println("executeLocalTransactionBranch--msg=" + new String(msg.getBody()));
System.out.println("executeLocalTransactionBranch--arg=" + arg);
return LocalTransactionState.COMMIT_MESSAGE;
}
},
"$$$");
System.out.println(sendResult);
}
{
Message msg = new Message("TopicTest2", // topic
"TagB", // tag
"OrderID0034", // key 訊息關鍵詞,多個Key用KEY_SEPARATOR隔開(查詢訊息使用)
("Hello MetaQB").getBytes()); // body
SendResult sendResult = producer.sendMessageInTransaction(
msg, new LocalTransactionExecuter(){
public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
System.out.println("executeLocalTransactionBranch--msg=" + new String(msg.getBody()));
System.out.println("executeLocalTransactionBranch--arg=" + arg);
return LocalTransactionState.COMMIT_MESSAGE;
}
},
"$$$");
System.out.println(sendResult);
}
{
Message msg = new Message("TopicTest3", // topic
"TagC", // tag
"OrderID061", // key
("Hello MetaQC").getBytes()); // body
SendResult sendResult = producer.sendMessageInTransaction(
msg, new LocalTransactionExecuter(){
public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
System.out.println("executeLocalTransactionBranch--msg=" + new String(msg.getBody()));
System.out.println("executeLocalTransactionBranch--arg=" + arg);
return LocalTransactionState.COMMIT_MESSAGE;
}
},
"$$$");
System.out.println(sendResult);
}
} catch (Exception e) {
e.printStackTrace();
}
TimeUnit.MILLISECONDS.sleep(1000);
}
/**
* 應用退出時,要呼叫shutdown來清理資源,關閉網路連線,從MetaQ伺服器上登出自己
* 注意:我們建議應用在JBOSS、Tomcat等容器的退出鉤子裡呼叫shutdown方法
*/
// producer.shutdown();
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
public void run() {
producer.shutdown();
}
}));
System.exit(0);
} // 執行本地事務,由客戶端回撥
}
package com.somnus.rocketmq;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;
import com.alibaba.rocketmq.client.consumer.PullResult;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.message.MessageQueue;
public class Consumer {
// Java快取
private static final Map<MessageQueue, Long> offseTable = new HashMap<MessageQueue, Long>();
/**
* 主動拉取方式消費
*
* @throws MQClientException
*/
public static void main(String[] args) throws MQClientException {
/**
* 一個應用建立一個Consumer,由應用來維護此物件,可以設定為全域性物件或者單例<br>
* 注意:ConsumerGroupName需要由應用來保證唯一 ,最好使用服務的包名區分同一服務,一類Consumer集合的名稱,
* 這類Consumer通常消費一類訊息,且消費邏輯一致
* PullConsumer:Consumer的一種,應用通常主動呼叫Consumer的拉取訊息方法從Broker拉訊息,主動權由應用控制
*/
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("ConsumerGroupName");
// //nameserver服務
consumer.setNamesrvAddr("172.16.235.77:9876;172.16.235.78:9876");
consumer.setInstanceName("Consumber");
consumer.start();
// 拉取訂閱主題的佇列,預設佇列大小是4
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest1");
for (MessageQueue mq : mqs) {
System.out.println("Consume from the queue: " + mq);
SINGLE_MQ: while (true) {
try {
PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
List<MessageExt> list = pullResult.getMsgFoundList();
if (list != null && list.size() < 100) {
for (MessageExt msg : list) {
System.out.println(new String(msg.getBody()));
}
}
System.out.println(pullResult.getNextBeginOffset());
putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
switch (pullResult.getPullStatus()) {
case FOUND:
break;
case NO_MATCHED_MSG:
break;
case NO_NEW_MSG:
break SINGLE_MQ;
case OFFSET_ILLEGAL:
break;
default:
break;
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
consumer.shutdown();
}
private static void putMessageQueueOffset(MessageQueue mq, long offset) {
offseTable.put(mq, offset);
}
private static long getMessageQueueOffset(MessageQueue mq) {
Long offset = offseTable.get(mq);
if (offset != null) {
System.out.println(offset);
return offset;
}
return 0;
}
}