RocketMq簡介及重要概念
目錄
RocketMq簡介
rocketmq是一個訊息中介軟體,基於Kafka的設計思想,但不是kafka的拷貝,它具有高吞吐量,高可用性,適用於大規模分散式系統的特點
RocketMq重要概念
1.producer
訊息生產者
2.Consumer
訊息消費者
3.PushConsumer
broker推送訊息到consumer
4.PullConsumer
consumer主動從broker定時pull訊息
5.ProducerGroup
一類producer的集合名稱,這類producer通常傳送一類訊息,且傳送邏輯一致
6.ConsumerGroup
一類consumer的集合名稱,這類consumer通常消費一類訊息,且消費邏輯一致
7.broker
訊息中轉站
8.nameserve
無狀態的資料節點,記錄broker的路由資訊,以及topic,佇列等
9.廣播訊息
一個訊息被多個consumer消費,即使這些consumer屬於同一個組,也會被組內的每個consumer都消費一次
10.叢集訊息
一個consumergroup中的consumer例項平均分攤消費訊息,即不需要自己做訊息消費的負載均衡,只需要擴充套件機器即可
11.Topic
表示一個類別
12.tag
Topic下的一個子類訊息,更進一步細分訊息型別
單機RocketMq搭建
1.上傳rocketmq jar包
2.tar zxvf 解壓rocketmq
3.修改虛擬機器引數
/bin/runserver.sh /bin/runbroker.sh 中的虛擬機器引數,因為預設4g,可以改小點
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -XX:PermSize=128m -XX:MaxPermSize=320m"
4.建立log日誌
在conf下更改日誌列印路徑
5.啟動nameserver和broker
啟動nameserver
nohup sh bin/mqnamesrv &
啟動broker
nohup sh bin/mqbroker -n localhost:9876 &
檢視連線此nameserve的broker
sh mqadmin clusterList -n localhost:9876
結果
可以看到此nameserve只有一個broker註冊
測試訊息的傳送和接受
生產者程式碼:
public class FirstSyncProducer {
public static void main(String[] args) throws Exception{
//Instantiate with a producer group name.
Defau ltMQProducer producer = new
DefaultMQProducer("please_rename_unique_group_name");
// Specify name server addresses.
producer.setNamesrvAddr("192.168.122.10:9876");
producer.setVipChannelEnabled(false);
//Launch the instance.
producer.start();
for (int i = 0; i < 10; i++) {
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " +
i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//Call send message to deliver message to one of brokers.
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
//Shut down once the producer instance is not longer in use.
producer.shutdown();
}
}
傳送結果:
SendResult [sendStatus=SEND_OK, msgId=0A011949442418B4AAC27EC7255F0000, offsetMsgId=C0A87A0A00002A9F0000000000000000, messageQueue=MessageQueue [topic=TopicTest, brokerName=bogon, queueId=2], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=0A011949442418B4AAC27EC725AB0001, offsetMsgId=C0A87A0A00002A9F00000000000000B2, messageQueue=MessageQueue [topic=TopicTest, brokerName=bogon, queueId=3], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=0A011949442418B4AAC27EC725B50002, offsetMsgId=C0A87A0A00002A9F0000000000000164, messageQueue=MessageQueue [topic=TopicTest, brokerName=bogon, queueId=0], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=0A011949442418B4AAC27EC725C20003, offsetMsgId=C0A87A0A00002A9F0000000000000216, messageQueue=MessageQueue [topic=TopicTest, brokerName=bogon, queueId=1], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=0A011949442418B4AAC27EC725CE0004, offsetMsgId=C0A87A0A00002A9F00000000000002C8, messageQueue=MessageQueue [topic=TopicTest, brokerName=bogon, queueId=2], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=0A011949442418B4AAC27EC725D60005, offsetMsgId=C0A87A0A00002A9F000000000000037A, messageQueue=MessageQueue [topic=TopicTest, brokerName=bogon, queueId=3], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=0A011949442418B4AAC27EC725DE0006, offsetMsgId=C0A87A0A00002A9F000000000000042C, messageQueue=MessageQueue [topic=TopicTest, brokerName=bogon, queueId=0], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=0A011949442418B4AAC27EC725E70007, offsetMsgId=C0A87A0A00002A9F00000000000004DE, messageQueue=MessageQueue [topic=TopicTest, brokerName=bogon, queueId=1], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=0A011949442418B4AAC27EC725F50008, offsetMsgId=C0A87A0A00002A9F0000000000000590, messageQueue=MessageQueue [topic=TopicTest, brokerName=bogon, queueId=2], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=0A011949442418B4AAC27EC725FF0009, offsetMsgId=C0A87A0A00002A9F0000000000000642, messageQueue=MessageQueue [topic=TopicTest, brokerName=bogon, queueId=3], queueOffset=2]
消費者程式碼:
public class FirstConsumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
// Instantiate with specified consumer group name.
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
// Specify name server addresses.
consumer.setNamesrvAddr("192.168.122.10:9876");
consumer.setVipChannelEnabled(false);
// Subscribe one more more topics to consume.
consumer.subscribe("TopicTest", "*");
// Register callback to execute on arrival of messages fetched from brokers.
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//Launch the consumer instance.
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
結果:
ConsumeMessageThread_9 Receive New Messages: [MessageExt [queueId=2, storeSize=178, queueOffset=1, sysFlag=0, bornTimestamp=1540450180558, bornHost=/192.168.122.1:5393, storeTimestamp=1540450180559, storeHost=/192.168.122.10:10911, msgId=C0A87A0A00002A9F00000000000002C8, commitLogOffset=712, bodyCRC=601994070, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=3, CONSUME_START_TIME=1540450248729, UNIQ_KEY=0A011949442418B4AAC27EC725CE0004, WAIT=true, TAGS=TagA}, body=16]]]
ConsumeMessageThread_8 Receive New Messages: [MessageExt [queueId=3, storeSize=178, queueOffset=2, sysFlag=0, bornTimestamp=1540450180607, bornHost=/192.168.122.1:5393, storeTimestamp=1540450180613, storeHost=/192.168.122.10:10911, msgId=C0A87A0A00002A9F0000000000000642, commitLogOffset=1602, bodyCRC=1565577195, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=3, CONSUME_START_TIME=1540450248729, UNIQ_KEY=0A011949442418B4AAC27EC725FF0009, WAIT=true, TAGS=TagA}, body=16]]]
ConsumeMessageThread_5 Receive New Messages: [MessageExt [queueId=3, storeSize=178, queueOffset=1, sysFlag=0, bornTimestamp=1540450180566, bornHost=/192.168.122.1:5393, storeTimestamp=1540450180568, storeHost=/192.168.122.10:10911, msgId=C0A87A0A00002A9F000000000000037A, commitLogOffset=890, bodyCRC=1424393152, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=3, CONSUME_START_TIME=1540450248729, UNIQ_KEY=0A011949442418B4AAC27EC725D60005, WAIT=true, TAGS=TagA}, body=16]]]
ConsumeMessageThread_6 Receive New Messages: [MessageExt [queueId=0, storeSize=178, queueOffset=1, sysFlag=0, bornTimestamp=1540450180574, bornHost=/192.168.122.1:5393, storeTimestamp=1540450180576, storeHost=/192.168.122.10:10911, msgId=C0A87A0A00002A9F000000000000042C, commitLogOffset=1068, bodyCRC=1307562618, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=2, CONSUME_START_TIME=1540450248729, UNIQ_KEY=0A011949442418B4AAC27EC725DE0006, WAIT=true, TAGS=TagA}, body=16]]]
ConsumeMessageThread_4 Receive New Messages: [MessageExt [queueId=1, storeSize=178, queueOffset=1, sysFlag=0, bornTimestamp=1540450180583, bornHost=/192.168.122.1:5393, storeTimestamp=1540450180584, storeHost=/192.168.122.10:10911, msgId=C0A87A0A00002A9F00000000000004DE, commitLogOffset=1246, bodyCRC=988340972, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=2, CONSUME_START_TIME=1540450248729, UNIQ_KEY=0A011949442418B4AAC27EC725E70007, WAIT=true, TAGS=TagA}, body=16]]]
ConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=1, storeSize=178, queueOffset=0, sysFlag=0, bornTimestamp=1540450180546, bornHost=/192.168.122.1:5393, storeTimestamp=1540450180547, storeHost=/192.168.122.10:10911, msgId=C0A87A0A00002A9F0000000000000216, commitLogOffset=534, bodyCRC=1032136437, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=2, CONSUME_START_TIME=1540450248729, UNIQ_KEY=0A011949442418B4AAC27EC725C20003, WAIT=true, TAGS=TagA}, body=16]]]
ConsumeMessageThread_3 Receive New Messages: [MessageExt [queueId=0, storeSize=178, queueOffset=0, sysFlag=0, bornTimestamp=1540450180533, bornHost=/192.168.122.1:5393, storeTimestamp=1540450180538, storeHost=/192.168.122.10:10911, msgId=C0A87A0A00002A9F0000000000000164, commitLogOffset=356, bodyCRC=1250039395, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=2, CONSUME_START_TIME=1540450248729, UNIQ_KEY=0A011949442418B4AAC27EC725B50002, WAIT=true, TAGS=TagA}, body=16]]]
ConsumeMessageThread_7 Receive New Messages: [MessageExt [queueId=2, storeSize=178, queueOffset=0, sysFlag=0, bornTimestamp=1540450180448, bornHost=/192.168.122.1:5393, storeTimestamp=1540450180486, storeHost=/192.168.122.10:10911, msgId=C0A87A0A00002A9F0000000000000000, commitLogOffset=0, bodyCRC=613185359, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=3, CONSUME_START_TIME=1540450248729, UNIQ_KEY=0A011949442418B4AAC27EC7255F0000, WAIT=true, TAGS=TagA}, body=16]]]
ConsumeMessageThread_2 Receive New Messages: [MessageExt [queueId=3, storeSize=178, queueOffset=0, sysFlag=0, bornTimestamp=1540450180523, bornHost=/192.168.122.1:5393, storeTimestamp=1540450180522, storeHost=/192.168.122.10:10911, msgId=C0A87A0A00002A9F00000000000000B2, commitLogOffset=178, bodyCRC=1401636825, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=3, CONSUME_START_TIME=1540450248729, UNIQ_KEY=0A011949442418B4AAC27EC725AB0001, WAIT=true, TAGS=TagA}, body=16]]]
ConsumeMessageThread_10 Receive New Messages: [MessageExt [queueId=2, storeSize=178, queueOffset=2, sysFlag=0, bornTimestamp=1540450180597, bornHost=/192.168.122.1:5393, storeTimestamp=1540450180600, storeHost=/192.168.122.10:10911, msgId=C0A87A0A00002A9F0000000000000590, commitLogOffset=1424, bodyCRC=710410109, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=3, CONSUME_START_TIME=1540450248730, UNIQ_KEY=0A011949442418B4AAC27EC725F50008, WAIT=true, TAGS=TagA}, body=16]]]
從上面看到,訊息是消費成功的,並且預設的topic是四個佇列
RocketMq總體結構
Broker
a.連線
broker與每個nameserve保持長連線
b.心跳
心跳間隔:broker每隔30秒向所有的nameserve傳送心跳,心跳包含topic資訊
心跳超時:nameserver間隔10秒鐘掃描所有還存活的broker連線,若某個broker2分鐘內沒有傳送資料,則斷開連線
c.可用性
rocketmq一般都是使用master/slave結構,slave定期從master讀取資料,一旦master掛掉,消費者定期從slave消費資料,但slave不能寫入資料
d.可靠性
傳送到broker的資料有同步刷盤和非同步刷盤兩種機制,同步刷盤是資料寫道磁碟後,返回確認值,非同步刷盤是先返回在把資料寫入磁碟
e.讀寫效能
利用linux的sendfile機制,將訊息內容直接輸出到sokect管道,避免系統呼叫
採用零拷貝技術,資料操作很快
零拷貝:主要是從檔案的讀取,寫入,傳輸方面考慮,減少了資料的複製,可以參考下篇文章
https://www.linuxjournal.com/article/6345
客戶端定址方式
1.程式碼指定
Producer.setNamesrvAddr(“192.168.122.1:9876”);
或
Consumer.setNamesrvAddr(“192.168.122.1:9876”);
2.Java啟動時指定引數
-Drocketmq.namesrv.addr=192.168.122.1:9876
3.環境變數指定NameServe地址
export NAMESRV_ADDR=192.168.122.1:987
4.http靜態伺服器定址
客戶端啟動後,會定時訪問一個http靜態伺服器:http://jmenv.tbsite.net:8080/rocketmq/msaddr,該服務響應nameserver地址
客戶端預設每隔2分鐘訪問一次這個HTTP伺服器,並更新本地的NameServer地址。URL已經在程式碼中寫死,可通過修改/etc/hosts檔案來改變要訪問的伺服器,例如在/etc/hosts增加如下配置:
10.232.22.67 jmenv.tbsite.net