RocketMQ4.4.0新特性分享
阿新 • • 發佈:2019-04-26
放棄 函數 繼續 default ren lint 刪除 持續時間 最新 keys:Message索引鍵,多個用空格隔開,RocketMQ可以根據這些key快速檢索到消息
對消息關鍵字的提取方便查詢,比如一條消息某個關鍵字是 運單號,之後我們可以使用這個運單號作為關鍵字進行查詢
waitStoreMsgOK:消息發送時是否等消息存儲完成後再返
delayTimeLevel:消息延遲級別,用於定時消息或消息重
User property:自定義消息屬性
批量發送
單批次消息不能超過maxMessageSize大小(默認4M)
客戶端instance:如果instance為默認值DEFAULT的話,RocketMQ會自動將instance設置為IP+進程ID(建議不要設置,默認生成就好),默認最大4M
鉤子方法
消費
批量消費總數為32,broker設置
如果消息消費次數超過maxReconsumeTimes還未成功,則將該消息轉移到一個失敗隊列,等待被刪除
消息消費超時時間,默認為15分鐘
消息最大重試次數,默認為16次
consumeConcurrentlyMaxSpan,並發消息消費時處理隊列最大跨度,默認2000,表示如果消息處理隊列中偏移量最大的消息與偏移量最小的消息的跨度超過2000則延遲50毫秒後再拉取消息
pullInterval=0,推模式下拉取任務間隔時間,默認一次拉取任務完成繼續拉取
consumeMessageBatchMaxSize:消息並發消費時一次消費消息條數,通俗點說就是每次傳入MessageListtener#consumeMessage中的消息條數
RocketMQ消息重試是以消費組為單位,而不是主題,消息重試主題名為%RETRY%+消費組名。消費者在啟動的時候會自動訂閱該主題,參與該主題的消息隊列負載
同一個消息隊列只會分配給一個消費者,故如果消費者個數大於消息隊列數量,則有些消費者無法消費消息。
如果延遲級別大於0,則會將消息的主題設置為SCHEDULE_TOPIC_XXXX
transactionId 事物ID會自己生成
ConsumeFromWhere
CONSUME_FROM_FIRST_OFFSET:從頭開始消費
ONSUME_FROM_TIMESTAMP:從消費者啟動的時間戳對應的消費進度開始消費
CONSUME_FROM_LAST_OFFSET:從隊列最新偏移量開始消費
CONSUME_SUCCESS:消費成功
RECONSUME_LATER:延遲消費,放棄本批次消息消費 類似於continue,如果有重試次數沒有達到最大上限會再次消費
消息消費模式
集群模式:默認模式,主題下的同一條消息只允許被其中一個消費者消費
消費進度存儲在服務端
廣播模式:主題下的同一條消息將被集群內的所有消費者消費一次
消費進度存儲在消費者本地
消息傳輸模式
拉取消息模式:消費端主動發起拉消息請求
長輪詢模式使得消息拉取能實現準實時
從服務器拉取消息->放入內存隊列->提交消息到處理線程池
並發消費
推送消息模式:RocketMQ消息推模式的實現基於拉模式
RocketMQ並沒有真正實現推模式,而是消費者主動向消息服務器拉取消息,RocketMQ推模式是循環向消息服務端發送消息拉取請求
單獨線程池拉取消息,然後調用監聽api接口
單獨線程池拉取->內存隊列->消息處理線程池處理->移除客戶端內存隊列消息並更新進度
順序消息
消費過程 消息隊列負載->消息拉取->消息消費->消息消費進度存儲。
支持局部順序消息消費,也就是保證同一個消息隊列上的消息順序消費
如果要實現某一主題的全局順序消息消費,可以將該主題的隊列數設置為1,犧牲高性能和可用性
順序消息在創建消息隊列拉取任務時需要在Broker服務器鎖定該消息隊列。
MAX_TIME_CONSUME_CONTINUOUSLY:每次消費任務最大持續時間,默認為60s,切換線程
順序消息消費的並發度為消息隊列。也就是一個消息消費隊列同一時刻只會被一個消費線程池中一個線程消費。
達到重試次數上限,轉移到死信隊列,繼續後續消息的消費
定時消息
消息發送之後並不立即被消費者消費,而是要等到特定的時間之後才能被消費
不支持任意時間精度定時發送,只支持配置級別的時間默認為"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h",delayLevel=1表示延遲1s,delayLevel=2表示延遲5s,依次類推。
SCHEDULE_TOPIC_XXXX定時消息主題
消息過濾
tag
tag服務端只是驗證了TAG的hashcode,客戶端再次對消息進行tag值對比過濾
sql(SQL92表達式)
(官方示例有bug)表達式沒有想象的好用,建議大家接收到消息自己判斷篩選
類過濾:定制過濾消息
消息過濾服務器(不講解)
consumer->filterserver->broker
事物消息
1分鐘回查一次,默認5次
事物
單向消息(One-way)
其實就是UDP協議的實現
TCP協議是可靠消息傳輸協議,請求消息都會有相應和校驗,在會話層和傳輸層解決應答
4、填坑方案
如果有大量消息積壓
增加消費者數量
如果有大量消息積壓並且馬上就到了自動清理的時間
重新消費導流到新的topic,增大新topic的隊列數量
5、bug
netty epoll 4.4.0之前版本沒有實現
6、問題
為什麽某條消息報異常會阻塞整個隊列消費
ProcessQueue中隊列最大偏移量與最小偏離量的間距,不能超過consumeConcurrentlyMaxSpan,否則觸發流控。這裏主要的考量是擔心一條消息堵塞,消息進度無法向前推進,可能造成大量消息重復消費
7、初始化客戶端註解
@PostConstruct 由JSR-250提供,在構造函數執行完之後執行,等價於xml配置文件中bean的initMethod
如果同一個jvm中同時註入生產者和消費者使用bean註解
會有異常拋出
8、客戶端驅動程序
Java
Go
.net
Php
c++
Nodejs
rocketmq
1、架構
MQ歷史
由數據結構隊列發展而來
MQ使用場景
異步處理
解耦
削峰填谷
數據同步
2、隊列
3、使用
生產
同步(sync)
默認重試2次總共3次
默認等待超時時間為3s
異步(async)
總共重試2次
單向(oneway)
Message
topic:主題名稱
tag:消息TAG,用於消息過濾
對消息的整體分類,比如 topic為物流跟蹤軌跡 ,軌跡包含 攬收 出庫 入庫 派送 簽收,可以分別給這些相同topic不同類型的數據打標簽分類解析處理
對消息關鍵字的提取方便查詢,比如一條消息某個關鍵字是 運單號,之後我們可以使用這個運單號作為關鍵字進行查詢
waitStoreMsgOK:消息發送時是否等消息存儲完成後再返
delayTimeLevel:消息延遲級別,用於定時消息或消息重
User property:自定義消息屬性
批量發送
單批次消息不能超過maxMessageSize大小(默認4M)
客戶端instance:如果instance為默認值DEFAULT的話,RocketMQ會自動將instance設置為IP+進程ID(建議不要設置,默認生成就好),默認最大4M
消費
批量消費總數為32,broker設置
如果消息消費次數超過maxReconsumeTimes還未成功,則將該消息轉移到一個失敗隊列,等待被刪除
消息消費超時時間,默認為15分鐘
消息最大重試次數,默認為16次
consumeConcurrentlyMaxSpan,並發消息消費時處理隊列最大跨度,默認2000,表示如果消息處理隊列中偏移量最大的消息與偏移量最小的消息的跨度超過2000則延遲50毫秒後再拉取消息
pullInterval=0,推模式下拉取任務間隔時間,默認一次拉取任務完成繼續拉取
consumeMessageBatchMaxSize:消息並發消費時一次消費消息條數,通俗點說就是每次傳入MessageListtener#consumeMessage中的消息條數
同一個消息隊列只會分配給一個消費者,故如果消費者個數大於消息隊列數量,則有些消費者無法消費消息。
如果延遲級別大於0,則會將消息的主題設置為SCHEDULE_TOPIC_XXXX
transactionId 事物ID會自己生成
ConsumeFromWhere
CONSUME_FROM_FIRST_OFFSET:從頭開始消費
ONSUME_FROM_TIMESTAMP:從消費者啟動的時間戳對應的消費進度開始消費
CONSUME_FROM_LAST_OFFSET:從隊列最新偏移量開始消費
CONSUME_SUCCESS:消費成功
RECONSUME_LATER:延遲消費,放棄本批次消息消費 類似於continue,如果有重試次數沒有達到最大上限會再次消費
消息消費模式
集群模式:默認模式,主題下的同一條消息只允許被其中一個消費者消費
消費進度存儲在服務端
廣播模式:主題下的同一條消息將被集群內的所有消費者消費一次
消費進度存儲在消費者本地
消息傳輸模式
拉取消息模式:消費端主動發起拉消息請求
長輪詢模式使得消息拉取能實現準實時
從服務器拉取消息->放入內存隊列->提交消息到處理線程池
並發消費
推送消息模式:RocketMQ消息推模式的實現基於拉模式
RocketMQ並沒有真正實現推模式,而是消費者主動向消息服務器拉取消息,RocketMQ推模式是循環向消息服務端發送消息拉取請求
單獨線程池拉取消息,然後調用監聽api接口
單獨線程池拉取->內存隊列->消息處理線程池處理->移除客戶端內存隊列消息並更新進度
順序消息
消費過程 消息隊列負載->消息拉取->消息消費->消息消費進度存儲。
支持局部順序消息消費,也就是保證同一個消息隊列上的消息順序消費
如果要實現某一主題的全局順序消息消費,可以將該主題的隊列數設置為1,犧牲高性能和可用性
順序消息在創建消息隊列拉取任務時需要在Broker服務器鎖定該消息隊列。
MAX_TIME_CONSUME_CONTINUOUSLY:每次消費任務最大持續時間,默認為60s,切換線程
順序消息消費的並發度為消息隊列。也就是一個消息消費隊列同一時刻只會被一個消費線程池中一個線程消費。
達到重試次數上限,轉移到死信隊列,繼續後續消息的消費
定時消息
消息發送之後並不立即被消費者消費,而是要等到特定的時間之後才能被消費
不支持任意時間精度定時發送,只支持配置級別的時間默認為"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h",delayLevel=1表示延遲1s,delayLevel=2表示延遲5s,依次類推。
SCHEDULE_TOPIC_XXXX定時消息主題
消息過濾
tag
tag服務端只是驗證了TAG的hashcode,客戶端再次對消息進行tag值對比過濾
sql(SQL92表達式)
(官方示例有bug)表達式沒有想象的好用,建議大家接收到消息自己判斷篩選
類過濾:定制過濾消息
消息過濾服務器(不講解)
consumer->filterserver->broker
事物消息
1分鐘回查一次,默認5次
事物
單向消息(One-way)
其實就是UDP協議的實現
TCP協議是可靠消息傳輸協議,請求消息都會有相應和校驗,在會話層和傳輸層解決應答
4、填坑方案
如果有大量消息積壓
增加消費者數量
如果有大量消息積壓並且馬上就到了自動清理的時間
重新消費導流到新的topic,增大新topic的隊列數量
5、bug
netty epoll 4.4.0之前版本沒有實現
6、問題
為什麽某條消息報異常會阻塞整個隊列消費
ProcessQueue中隊列最大偏移量與最小偏離量的間距,不能超過consumeConcurrentlyMaxSpan,否則觸發流控。這裏主要的考量是擔心一條消息堵塞,消息進度無法向前推進,可能造成大量消息重復消費
7、初始化客戶端註解
@PostConstruct 由JSR-250提供,在構造函數執行完之後執行,等價於xml配置文件中bean的initMethod
如果同一個jvm中同時註入生產者和消費者使用bean註解
會有異常拋出
8、客戶端驅動程序
Java
Go
.net
Php
c++
Nodejs
RocketMQ4.4.0新特性分享