Java必知必會-RocketMQ高階應用
架構
核心模組
- rocketmq-broker:接受生產者發來的訊息並存儲(通過呼叫rocketmq-store),消費者從這裡取得訊息
- rocketmq-client:提供傳送、接受訊息的客戶端API。
- rocketmq-namesrv:NameServer,類似於Zookeeper,這裡儲存著訊息的TopicName,佇列等執行時的元資訊。
- rocketmq-common:通用的一些類,方法,資料結構等。
- rocketmq-remoting:基於Netty4的client/server + fastjson序列化 + 自定義二進位制協議。
- rocketmq-store:訊息、索引儲存等。
- rocketmq-filtersrv:訊息過濾器Server,需要注意的是,要實現這種過濾,需要上傳程式碼到MQ!(一般而言,我們利用Tag足以滿足大部分的過濾需求,如果更靈活更復雜的過濾需求,可以考慮filtersrv元件)。
- rocketmq-tools:命令列工具。
NameServer
NameServer壓力不會太大,平時主要開銷是在維持心跳和提供Topic-Broker的關係資料。每個NameServer節點互相之間是獨立的,沒有任何資訊互動。
Broker向NameServer發心跳時, 會帶上當前自己所負責的所有Topic資訊,如果Topic個數太多(萬級別),會導致一次心跳中,就Topic的資料就幾十M,網路情況差的話, 網路傳輸失敗,心跳失敗,導致NameServer誤認為Broker心跳失敗。
NameServer 被設計成幾乎無狀態的,可以橫向擴充套件,節點之間相互之間無通訊,通過部署多臺機器來標記自己是一個偽叢集。
每個 Broker 在啟動的時候會到 NameServer 註冊,Producer 在傳送訊息前會根據 Topic 到 NameServer 獲取到 Broker 的路由資訊,Consumer 也會定時獲取 Topic 的路由資訊。
所以從功能上看NameServer應該是和 ZooKeeper 差不多
Producer
每個 Broker 在啟動的時候會到 NameServer 註冊,Producer 在傳送訊息前會根據 Topic 到 NameServer 獲取到 Broker 的路由資訊,Consumer 也會定時獲取 Topic 的路由資訊。
所以從功能上看NameServer應該是和 ZooKeeper 差不多,據說 RocketMQ 的早期版本確實是使用的 ZooKeeper ,後來改為了自己實現的 NameServer 。
- 同步傳送:同步傳送指訊息傳送方發出資料後會在收到接收方發回響應之後才發下一個數據包。一般用於重要通知訊息,例如重要通知郵件、營銷簡訊。
- 非同步傳送:非同步傳送指傳送方發出資料後,不等接收方發回響應,接著傳送下個數據包,一般用於可能鏈路耗時較長而對響應時間敏感的業務場景,例如使用者視訊上傳後通知啟動轉碼服務。
- 單向傳送:單向傳送是指只負責傳送訊息而不等待伺服器迴應且沒有回撥函式觸發,適用於某些耗時非常短但對可靠性要求並不高的場景,例如日誌收集。
Broker
- Broker是具體提供業務的伺服器,單個Broker節點與所有的NameServer節點保持長連線及心跳,並會定時將Topic資訊註冊到NameServer,順帶一提底層的通訊和連線都是基於Netty實現的。
- Broker負責訊息儲存,以Topic為緯度支援輕量級的佇列,單機可以支撐上萬佇列規模,支援訊息推拉模型。
- 官網上有資料顯示:具有上億級訊息堆積能力,同時可嚴格保證訊息的有序性。
Consumer
- Consumer也由使用者部署,支援PUSH和PULL兩種消費模式,支援叢集消費和廣播訊息,提供實時的訊息訂閱機制。
- Pull:拉取型消費者(Pull Consumer)主動從訊息伺服器拉取資訊,只要批量拉取到訊息,使用者應用就會啟動消費過程,所以 Pull 稱為主動消費型。
- Push:推送型消費者(Push Consumer)封裝了訊息的拉取、消費進度和其他的內部維護工作,將訊息到達時執行的回撥介面留給使用者應用程式來實現。所以 Push 稱為被動消費型別,但從實現上看還是從訊息伺服器中拉取訊息,不同於 Pull 的是 Push 首先要註冊消費監聽器,當監聽器處觸發後才開始消費訊息。
訊息領域模型
Message
Message(訊息)就是要傳輸的資訊。
一條訊息必須有一個主題(Topic),主題可以看做是你的信件要郵寄的地址。
一條訊息也可以擁有一個可選的標籤(Tag)和額處的鍵值對,它們可以用於設定一個業務 Key 並在 Broker 上查詢此訊息以便在開發期間查詢問題。
Topic
Topic(主題)可以看做訊息的規類,它是訊息的第一級型別。比如一個電商系統可以分為:交易訊息、物流訊息等,一條訊息必須有一個 Topic 。
Topic 與生產者和消費者的關係非常鬆散,一個 Topic 可以有0個、1個、多個生產者向其傳送訊息,一個生產者也可以同時向不同的 Topic 傳送訊息。
一個 Topic 也可以被 0個、1個、多個消費者訂閱。
Tag
Tag(標籤)可以看作子主題,它是訊息的第二級型別,用於為使用者提供額外的靈活性。使用標籤,同一業務模組不同目的的訊息就可以用相同 Topic 而不同的 Tag 來標識。比如交易訊息又可以分為:交易建立訊息、交易完成訊息等,一條訊息可以沒有 Tag 。
標籤有助於保持您的程式碼乾淨和連貫,並且還可以為 RocketMQ 提供的查詢系統提供幫助。
Group
分組,一個組可以訂閱多個Topic。
分為ProducerGroup,ConsumerGroup,代表某一類的生產者和消費者,一般來說同一個服務可以作為Group,同一個Group一般來說傳送和消費的訊息都是一樣的
Queue
在Kafka中叫Partition,每個Queue內部是有序的,在RocketMQ中分為讀和寫兩種佇列,一般來說讀寫佇列數量一致,如果不一致就會出現很多問題。
Message Queue
Message Queue(訊息佇列),主題被劃分為一個或多個子主題,即訊息佇列。
一個 Topic 下可以設定多個訊息佇列,傳送訊息時執行該訊息的 Topic ,RocketMQ 會輪詢該 Topic 下的所有佇列將訊息發出去。
訊息的物理管理單位。一個Topic下可以有多個Queue,Queue的引入使得訊息的儲存可以分散式叢集化,具有了水平擴充套件能力。
Offset
在RocketMQ 中,所有訊息佇列都是持久化,長度無限的資料結構,所謂長度無限是指佇列中的每個儲存單元都是定長,訪問其中的儲存單元使用Offset 來訪問,Offset 為 java long 型別,64 位,理論上在 100年內不會溢位,所以認為是長度無限。
也可以認為 Message Queue 是一個長度無限的陣列,Offset 就是下標。
訊息流程
Producer 與 NameServer叢集中的其中一個節點(隨機選擇)建立長連線,定期從 NameServer 獲取 Topic 路由資訊,並向提供 Topic 服務的 Broker Master 建立長連線,且定時向 Broker 傳送心跳。
Producer 只能將訊息傳送到 Broker master,但是 Consumer 則不一樣,它同時和提供 Topic 服務的 Master 和 Slave建立長連線,既可以從 Broker Master 訂閱訊息,也可以從 Broker Slave 訂閱訊息。
初始化啟動:NameService
NamesrvStartup基本類描述了rocket初始化流程:
- 第一步是初始化配置
- 建立NamesrvController例項,並開啟兩個定時任務:
- 每隔10s掃描一次Broker,移除處於不啟用的Broker;
- 每隔10s列印一次KV配置。
Broker
Broker在RocketMQ中是進行處理Producer傳送訊息請求,Consumer消費訊息的請求,並且進行訊息的持久化,以及HA策略和服務端過濾,就是叢集中很重的工作都是交給了Broker進行處理。
Broker模組是通過BrokerStartup進行啟動的,會例項化BrokerController,並且呼叫其初始化方法
初始化流程很冗長,會根據配置建立很多執行緒池主要用來傳送訊息、拉取訊息、查詢訊息、客戶端管理和消費者管理,也有很多定時任務,同時也註冊了很多請求處理器,用來發送拉取訊息查詢訊息的。
Consumer
消費端會通過RebalanceService執行緒,10秒鐘做一次基於Topic下的所有佇列負載。
重複消費
例如,積分系統處理失敗了,這個系統要求重新發送一次這個訊息,積分的系統重新接收並且處理成功了,但是別人的活動,優惠券等等服務也監聽了這個訊息,就可能出現活動系統給他加GMV加兩次,優惠券扣兩次這種情況,這就是重複消費。一般採用介面冪等的方案解決重複消費的問題
介面冪等
同樣的引數呼叫我這個介面,呼叫多少次結果都是一個,你加GMV同一個訂單號你加一次是多少錢,你加N次都還是多少錢。
介面冪等的保證可以使用強校驗和弱校驗,分場景考慮
強校驗
每次訊息過來都要拿著訂單號+業務場景這樣的唯一標識(比是天貓雙十一活動)去流水錶查,看看有沒有這條流水,有就直接return不要走下面的流程了,沒有就執行後面的邏輯。之所以用流水錶,是因為涉及到金錢這樣的活動,有啥問題後面也可以去流水錶對賬,還有就是幫助開發人員定位問題。
弱校驗
一些不重要的場景,比如給誰發簡訊啥的,我就把這個id+場景唯一標識作為Redis的key,放到快取裡面失效時間看你場景,一定時間內的這個訊息就去Redis判斷。
順序消費
資料量大的時候資料同步壓力還是很大的,有時候資料量大的表需要同步幾個億的資料。(並不是主從同步,主從延遲大會有問題,可能是從資料庫或者主資料庫同步到備庫)
這種情況我們都是懟到佇列裡面去,然後慢慢消費的,那問題就來了,我們在資料庫同時對一個Id的資料進行了增、改、刪三個操作,但是你訊息發過去消費的時候變成了改,刪、增,這樣資料就不對了。
解決
一個topic下有多個佇列,為了保證傳送有序,RocketMQ提供了MessageQueueSelector佇列選擇機制,他有三種實現:
我們可使用Hash取模法,讓同一個訂單傳送到同一個佇列中,再使用同步傳送,只有同個訂單的建立訊息傳送成功,再發送支付訊息。這樣,我們保證了傳送有序。
RocketMQ的topic內的佇列機制,可以保證儲存滿足FIFO,剩下的只需要消費者順序消費即可。
RocketMQ僅保證順序傳送,順序消費由消費者業務保證!!!
這裡很好理解,一個訂單你傳送的時候放到一個佇列裡面去,同一個的訂單號Hash後還是一樣的結果,那肯定是一個消費者消費,順序就可以保證了
真正的順序消費不同的中介軟體都有自己的不同實現我這裡就舉個例子,大家思路理解下。
訊息去重
去重原則:使用業務端邏輯保持冪等性
冪等性:就是使用者對於同一操作發起的一次請求或者多次請求的結果是一致的,不會因為多次點選而產生了副作用,資料庫的結果都是唯一的,不可變的。
只要保持冪等性,不管來多少條重複訊息,最後處理的結果都一樣,需要業務端來實現。
去重策略:保證每條訊息都有唯一編號(比如唯一流水號),且保證訊息處理成功與去重表的日誌同時出現。
建立一個訊息表,拿到這個訊息做資料庫的insert操作。給這個訊息做一個唯一主鍵(primary key)或者唯一約束,那麼就算出現重複消費的情況,就會導致主鍵衝突,那麼就不再處理這條訊息。
訊息重複
比如:網路原因閃斷,ACK返回失敗等等故障,確認資訊沒有傳送到訊息佇列,導致訊息佇列不知道自己已經消費過該訊息了,再次將該訊息分發給其他的消費者。
不同的訊息佇列傳送的確認資訊形式不同,例如RabbitMQ是傳送一個ACK確認訊息,RocketMQ是返回一個CONSUME_SUCCESS成功標誌,Kafka實際上有個offset的概念。
訊息可用性
當我們選擇好了叢集模式之後,那麼我們需要關心的就是怎麼去儲存和複製這個資料,RocketMQ對訊息的刷盤提供了同步和非同步的策略來滿足我們的,當我們選擇同步刷盤之後,如果刷盤超時會給返回FLUSH_DISK_TIMEOUT,如果是非同步刷盤不會返回刷盤相關資訊,選擇同步刷盤可以盡最大程度滿足我們的訊息不會丟失。
除了儲存有選擇之後,我們的主從同步提供了同步和非同步兩種模式來進行復制,當然選擇同步可以提升可用性,但是訊息的傳送RT時間會下降10%左右。
RocketMQ採用的是混合型的儲存結構,即為Broker單個例項下所有的佇列共用一個日誌資料檔案(即為CommitLog)來儲存。
而Kafka採用的是獨立型的儲存結構,每個佇列一個檔案。
這裡帥丙認為,RocketMQ採用混合型儲存結構的缺點在於,會存在較多的隨機讀操作,因此讀的效率偏低。同時消費訊息需要依賴ConsumeQueue,構建該邏輯消費佇列需要一定開銷。
刷盤機制
Broker 在訊息的存取時直接操作的是記憶體(記憶體對映檔案),這可以提供系統的吞吐量,但是無法避免機器掉電時資料丟失,所以需要持久化到磁碟中。
刷盤的最終實現都是使用NIO中的 MappedByteBuffer.force() 將對映區的資料寫入到磁碟,如果是同步刷盤的話,在Broker把訊息寫到CommitLog對映區後,就會等待寫入完成。
非同步而言,只是喚醒對應的執行緒,不保證執行的時機,流程如圖所示。
分散式事務
Half Message(半訊息)
是指暫不能被Consumer消費的訊息。Producer 已經把訊息成功傳送到了 Broker 端,但此訊息被標記為暫不能投遞
狀態,處於該種狀態下的訊息稱為半訊息。需要 Producer對訊息的二次確認
(訊息回查)後,Consumer才能去消費它。
訊息回查
由於網路閃段,生產者應用重啟等原因。導致 Producer 端一直沒有對 Half Message(半訊息) 進行 二次確認。這是Brocker伺服器會定時掃描長期處於半訊息的訊息
,會主動詢問 Producer端 該訊息的最終狀態(Commit或者Rollback),該訊息即為 訊息回查。
訊息過濾
- Broker端訊息過濾
在Broker中,按照Consumer的要求做過濾,優點是減少了對於Consumer無用訊息的網路傳輸。缺點是增加了Broker的負擔,實現相對複雜。 - Consumer端訊息過濾
這種過濾方式可由應用完全自定義實現,但是缺點是很多無用的訊息要傳輸到Consumer端。
Broker的Buffer問題
Broker的Buffer通常指的是Broker中一個佇列的記憶體Buffer大小,這類Buffer通常大小有限。
另外,RocketMQ沒有記憶體Buffer概念,RocketMQ的佇列都是持久化磁碟,資料定期清除。
RocketMQ同其他MQ有非常顯著的區別,RocketMQ的記憶體Buffer抽象成一個無限長度的佇列,不管有多少資料進來都能裝得下,這個無限是有前提的,Broker會定期刪除過期的資料。
例如Broker只儲存3天的訊息,那麼這個Buffer雖然長度無限,但是3天前的資料會被從隊尾刪除。
回溯訊息
回溯消費是指Consumer已經消費成功的訊息,由於業務上的需求需要重新消費,要支援此功能,Broker在向Consumer投遞成功訊息後,訊息仍然需要保留。並且重新消費一般是按照時間維度。
例如由於Consumer系統故障,恢復後需要重新消費1小時前的資料,那麼Broker要提供一種機制,可以按照時間維度來回退消費進度。
RocketMQ支援按照時間回溯消費,時間維度精確到毫秒,可以向前回溯,也可以向後回溯。
訊息堆積
訊息中介軟體的主要功能是非同步解耦,還有個重要功能是擋住前端的資料洪峰,保證後端系統的穩定性,這就要求訊息中介軟體具有一定的訊息堆積能力,訊息堆積分以下兩種情況:
- 訊息堆積在記憶體Buffer,一旦超過記憶體Buffer,可以根據一定的丟棄策略來丟棄訊息,如CORBA Notification規範中描述。適合能容忍丟棄訊息的業務,這種情況訊息的堆積能力主要在於記憶體Buffer大小,而且訊息堆積後,效能下降不會太大,因為記憶體中資料多少對於對外提供的訪問能力影響有限。
- 訊息堆積到持久化儲存系統中,例如DB,KV儲存,檔案記錄形式。當訊息不能在記憶體Cache命中時,要不可避免的訪問磁碟,會產生大量讀IO,讀IO的吞吐量直接決定了訊息堆積後的訪問能力。
- 評估訊息堆積能力主要有以下四點:
- 訊息能堆積多少條,多少位元組?即訊息的堆積容量。
- 訊息堆積後,發訊息的吞吐量大小,是否會受堆積影響?
- 訊息堆積後,正常消費的Consumer是否會受影響?
- 訊息堆積後,訪問堆積在磁碟的訊息時,吞吐量有多大?
定時訊息
定時訊息是指訊息發到Broker後,不能立刻被Consumer消費,要到特定的時間點或者等待特定的時間後才能被消費。
如果要支援任意的時間精度,在Broker層面,必須要做訊息排序,如果再涉及到持久化,那麼訊息排序要不可避免的產生巨大效能開銷。
RocketMQ支援定時訊息,但是不支援任意時間精度,支援特定的level,例如定時5s,10s,1m等。
流量控制(削峰填谷)
閘道器的請求先放入訊息佇列中,後端服務儘自己最大能力去訊息佇列中消費請求。超時的請求可以直接返回錯誤。
還有一些服務特別是某些後臺任務,不需要及時地響應,並且業務處理複雜且流程長,那麼過來的請求先放入訊息佇列中,後端服務按照自己的節奏處理
上面兩種情況分別對應著生產者生產過快和消費者消費過慢兩種情況,訊息佇列都能在其中發揮很好的緩衝效果。
佇列模型
訊息佇列有兩種模型:佇列模型和釋出/訂閱模型。
生產者往某個佇列裡面傳送訊息,一個佇列可以儲存多個生產者的訊息,一個佇列也可以有多個消費者, 但是消費者之間是競爭關係,即每條訊息只能被一個消費者消費。
當然佇列模型也可以通過訊息全量儲存至多個佇列來解決一條訊息被多個消費者消費問題,但是會有資料的冗餘。
釋出/訂閱模型
為了解決一條訊息能被多個消費者消費的問題,釋出/訂閱模型就來了。該模型是將訊息發往一個Topic
即主題中,所有訂閱了這個 Topic
的訂閱者都能消費這條訊息。
其實可以這麼理解,釋出/訂閱模型等於我們都加入了一個群聊中,我發一條訊息,加入了這個群聊的人都能收到這條訊息。那麼佇列模型就是一對一聊天,我發給你的訊息,只能在你的聊天視窗彈出,是不可能彈出到別人的聊天視窗中的。
通過多佇列全量儲存相同的訊息,即資料的冗餘可以實現一條訊息被多個消費者消費。RabbitMQ
就是採用佇列模型,通過 Exchange
模組來將訊息傳送至多個佇列,解決一條訊息需要被多個消費者消費問題。
這裡還能看到假設群聊裡除我之外只有一個人,那麼此時的釋出/訂閱模型和佇列模型其實就一樣了。
為了提高併發度,往往釋出/訂閱模型還會引入佇列或者分割槽的概念。即訊息是發往一個主題下的某個佇列或者某個分割槽中。RocketMQ
中叫佇列,Kafka
叫分割槽,本質一樣。
例如某個主題下有 5 個佇列,那麼這個主題的併發度就提高為 5 ,同時可以有 5 個消費者並行消費該主題的訊息。一般可以採用輪詢或者 key hash
取餘等策略來將同一個主題的訊息分配到不同的佇列中。
與之對應的消費者一般都有組的概念 Consumer Group
, 即消費者都是屬於某個消費組的。一條訊息會發往多個訂閱了這個主題的消費組。
假設現在有兩個消費組分別是Group 1
和 Group 2
,它們都訂閱了Topic-a
。此時有一條訊息發往Topic-a
,那麼這兩個消費組都能接收到這條訊息。
然後這條訊息實際是寫入Topic
某個佇列中,消費組中的某個消費者對應消費一個佇列的訊息。
在物理上除了副本拷貝之外,一條訊息在Broker
中只會有一份,每個消費組會有自己的offset
即消費點位來標識消費到的位置。在消費點位之前的訊息表明已經消費過了。當然這個offset
是佇列級別的。每個消費組都會維護訂閱的Topic
下的每個佇列的offset
。
推/拉模式
推模式
一般而言我們在談論推拉模式的時候指的是 Comsumer 和 Broker 之間的互動。
預設的認為 Producer 與 Broker 之間就是推的方式,即 Producer 將訊息推送給 Broker,而不是 Broker 主動去拉取訊息。
優點
訊息實時性高, Broker 接受完訊息之後可以立馬推送給 Consumer。
對於消費者使用來說更簡單,簡單啊就等著,反正有訊息來了就會推過來。
缺點
推送速率難以適應消費速率,推模式的目標就是以最快的速度推送訊息,當生產者往 Broker 傳送訊息的速率大於消費者消費訊息的速率時,隨著時間的增長消費者那邊可能就“爆倉”了,因為根本消費不過來啊。當推送速率過快就像 DDos 攻擊一樣消費者就傻了。
所以說推模式難以根據消費者的狀態控制推送速率,適用於訊息量不大、消費能力強要求實時性高的情況下。
拉模式
拉模式指的是 Consumer 主動從 Broker 請求拉取訊息,即 Broker 被動的傳送訊息給 Consumer。
優點
拉模式主動權就在消費者身上了,消費者可以根據自身的情況來發起拉取訊息的請求。假設當前消費者覺得自己消費不過來了,它可以根據一定的策略停止拉取,或者間隔拉取都行。
拉模式下 Broker 就相對輕鬆了,它只管存生產者發來的訊息,至於消費的時候自然由消費者主動發起,來一個請求就給它訊息唄,從哪開始拿訊息,拿多少消費者都告訴它,它就是一個沒有感情的工具人,消費者要是沒來取也不關它的事。
拉模式可以更合適的進行訊息的批量傳送,基於推模式可以來一個訊息就推送,也可以快取一些訊息之後再推送,但是推送的時候其實不知道消費者到底能不能一次性處理這麼多訊息。而拉模式就更加合理,它可以參考消費者請求的資訊來決定快取多少訊息之後批量傳送。
缺點
訊息延遲,畢竟是消費者去拉取訊息,但是消費者怎麼知道訊息到了呢?所以它只能不斷地拉取,但是又不能很頻繁地請求,太頻繁了就變成消費者在攻擊 Broker 了。因此需要降低請求的頻率,比如隔個 2 秒請求一次,你看著訊息就很有可能延遲 2 秒了。
訊息忙請求,忙請求就是比如訊息隔了幾個小時才有,那麼在幾個小時之內消費者的請求都是無效的,在做無用功。
選擇
RocketMQ 和 Kafka 都選擇了拉模式,當然業界也有基於推模式的訊息佇列如 ActiveMQ。
因為現在的訊息佇列都有持久化訊息的需求,也就是說本身它就有個儲存功能,它的使命就是接受訊息,儲存好訊息使得消費者可以消費訊息即可。
而消費者各種各樣,身為 Broker 不應該有依賴於消費者的傾向,我已經為你儲存好訊息了,你要就來拿好了。
RocketMQ 和 Kafka 都是利用“長輪詢”來實現拉模式,減輕了拉模式的缺點
RocketMQ長輪詢
RocketMQ 中的 PushConsumer 其實是披著拉模式的方法,只是看起來像推模式而已。
因為 RocketMQ 在被背後偷偷的幫我們去 Broker 請求資料了。
後臺會有個 RebalanceService 執行緒,這個執行緒會根據 topic 的佇列數量和當前消費組的消費者個數做負載均衡,每個佇列產生的 pullRequest 放入阻塞佇列 pullRequestQueue 中。然後又有個 PullMessageService 執行緒不斷的從阻塞佇列 pullRequestQueue 中獲取 pullRequest,然後通過網路請求 broker,這樣實現的準實時拉取訊息。
然後 Broker 的 PullMessageProcessor 裡面的 processRequest 方法是用來處理拉訊息請求的,有訊息就直接返回
而 PullRequestHoldService 這個執行緒會每 5 秒從 pullRequestTable 取PullRequest請求,然後看看待拉取訊息請求的偏移量是否小於當前消費佇列最大偏移量,如果條件成立則說明有新訊息了,則會呼叫 notifyMessageArriving ,最終呼叫 PullMessageProcessor 的 executeRequestWhenWakeup() 方法重新嘗試處理這個訊息的請求,也就是再來一次,整個長輪詢的時間預設 30 秒。
ReputMessageService 執行緒用來不斷地從 commitLog 中解析資料並分發請求,構建出 ConsumeQueue 和 IndexFile 兩種型別的資料,並且也會有喚醒請求的操作,來彌補每 5s 一次這麼慢的延遲
Kafka 中的長輪詢
像 Kafka 在拉請求中有引數,可以使得消費者請求在 “長輪詢” 中阻塞等待。
簡單的說就是消費者去 Broker 拉訊息,定義了一個超時時間,也就是說消費者去請求訊息,如果有的話馬上返回訊息,如果沒有的話消費者等著直到超時,然後再次發起拉訊息請求。
並且 Broker 也得配合,如果消費者請求過來,有訊息肯定馬上返回,沒有訊息那就建立一個延遲操作,等條件滿足了再返回。
消費者端呼叫的就是 Kafka 包裝過的 selector,而最終會呼叫 Java nio 的 select(timeout)。
Broker 端處理所有請求的入口在 KafkaApis.scala 檔案的 handle 方法下, 主要方法:handleFetchRequest
可以看到 RocketMQ 和 Kafka 都是採用“長輪詢”的機制,具體的做法都是通過消費者等待訊息,當有訊息的時候 Broker 會直接返回訊息,如果沒有訊息都會採取延遲處理的策略,並且為了保證訊息的及時性,在對應佇列或者分割槽有新訊息到來的時候都會提醒訊息來了,及時返回訊息。
一句話說就是消費者和 Broker 相互配合,拉取訊息請求不滿足條件的時候 hold 住,避免了多次頻繁的拉取動作,當訊息一到就提醒返回。
面試
1、如何保證訊息不丟失
就我們市面上常見的訊息佇列而言,只要配置得當,我們的訊息就不會丟。
可以看到一共有三個階段,分別是生產訊息、儲存訊息和消費訊息。我們從這三個階段分別入手來看看如何確保訊息不會丟失。
生產訊息
生產者傳送訊息至Broker
,需要處理Broker
的響應,不論是同步還是非同步傳送訊息,同步和非同步回撥都需要做好try-catch
,妥善的處理響應,如果Broker
返回寫入失敗等錯誤訊息,需要重試傳送。當多次傳送失敗需要作報警,日誌記錄等。這樣就能保證在生產訊息階段訊息不會丟失。
儲存訊息
儲存訊息階段需要在訊息刷盤之後再給生產者響應,假設訊息寫入快取中就返回響應,那麼機器突然斷電這訊息就沒了,而生產者以為已經發送成功了。
如果Broker
是叢集部署,有多副本機制,即訊息不僅僅要寫入當前Broker
,還需要寫入副本機中。那配置成至少寫入兩臺機子後再給生產者響應。這樣基本上就能保證儲存的可靠了。
消費訊息
當消費者拿到訊息之後直接存入記憶體佇列中就直接返回給Broker
消費成功,這是不對的。
需要考慮拿到訊息放在記憶體之後消費者就宕機了怎麼辦。所以我們應該在消費者真正執行完業務邏輯之後,再發送給Broker
消費成功,這才是真正的消費了。
所以只要我們在訊息業務邏輯處理完成之後再給Broker
響應,那麼消費階段訊息就不會丟失。
小結
生產者
需要處理好Broker
的響應,出錯情況下利用重試、報警等手段。
Broker
需要控制響應的時機,單機情況下是訊息刷盤後返回響應,叢集多副本情況下,即傳送至兩個副本及以上的情況下再返回響應。
消費者
需要在執行完真正的業務邏輯之後再返回響應給Broker
。
但是要注意訊息可靠性增強了,效能就下降了,等待訊息刷盤、多副本同步後返回都會影響效能。因此還是看業務,例如日誌的傳輸可能丟那麼一兩條關係不大,因此沒必要等訊息刷盤再響應。
2、重複訊息
假設我們傳送訊息,就管發,不管Broker
的響應,那麼我們發往Broker
是不會重複的。
但是一般情況我們是不允許這樣的,這樣訊息就完全不可靠了,我們的基本需求是訊息至少得發到Broker
上,那就得等Broker
的響應,那麼就可能存在Broker
已經寫入了,當時響應由於網路原因生產者沒有收到,然後生產者又重發了一次,此時訊息就重複了。
再看消費者消費的時候,假設我們消費者拿到訊息消費了,業務邏輯已經走完了,事務提交了,此時需要更新Consumer offset
了,然後這個消費者掛了,另一個消費者頂上,此時Consumer offset
還沒更新,於是又拿到剛才那條訊息,業務又被執行了一遍。於是訊息又重複了。
可以看到正常業務而言訊息重複是不可避免的,因此我們只能從另一個角度來解決重複訊息的問題。
關鍵點就是冪等。既然我們不能防止重複訊息的產生,那麼我們只能在業務上處理重複訊息所帶來的影響。
冪等處理重複訊息
例如這條 SQLupdate t1 set money = 150 where id = 1 and money = 100;
執行多少遍money
都是150,這就叫冪等。
因此需要改造業務處理邏輯,使得在重複訊息的情況下也不會影響最終的結果。
可以通過上面我那條 SQL 一樣,做了個前置條件判斷,即money = 100
情況,並且直接修改,更通用的是做個version
即版本號控制,對比訊息中的版本號和資料庫中的版本號。
或者通過資料庫的約束例如唯一鍵,例如insert into update on duplicate key...
。
或者記錄關鍵的key,比如處理訂單這種,記錄訂單ID,假如有重複的訊息過來,先判斷下這個ID是否已經被處理過了,如果沒處理再進行下一步。當然也可以用全域性唯一ID等等。
訊息有序性
全域性有序
如果要保證訊息的全域性有序,首先只能由一個生產者往Topic
傳送訊息,並且一個Topic
內部只能有一個佇列(分割槽)。消費者也必須是單執行緒消費這個佇列。這樣的訊息就是全域性有序的!
不過一般情況下我們都不需要全域性有序,即使是同步MySQL Binlog
也只需要保證單表訊息有序即可。
部分有序
因此絕大部分的有序需求是部分有序,部分有序我們就可以將Topic
內部劃分成我們需要的佇列數,把訊息通過特定的策略發往固定的佇列中,然後每個佇列對應一個單執行緒處理的消費者。這樣即完成了部分有序的需求,又可以通過佇列數量的併發來提高訊息處理效率。
訊息堆積處理
訊息的堆積往往是因為生產者的生產速度與消費者的消費速度不匹配。有可能是因為訊息消費失敗反覆重試造成的,也有可能就是消費者消費能力弱,漸漸地訊息就積壓了。
因此我們需要先定位消費慢的原因,如果是bug
則處理 bug
,如果是因為本身消費能力較弱,我們可以優化下消費邏輯,比如之前是一條一條訊息消費處理的,這次我們批量處理,比如資料庫的插入,一條一條插和批量插效率是不一樣的。
假如邏輯我們已經都優化了,但還是慢,那就得考慮水平擴容了,增加Topic
的佇列數和消費者數量,注意佇列數一定要增加,不然新增加的消費者是沒東西消費的。一個Topic中,一個佇列只會分配給一個消費者。
當然你消費者內部是單執行緒還是多執行緒消費那看具體場景。不過要注意上面提高的訊息丟失的問題,如果你是將接受到的訊息寫入記憶體佇列之後,然後就返回響應給Broker
,然後多執行緒向記憶體佇列消費訊息,假設此時消費者宕機了,記憶體佇列裡面還未消費的訊息也就丟了。