1. 程式人生 > 程式設計 >?冒著期末掛科的風險也要給你看的訊息佇列和RocketMQ入門總結

?冒著期末掛科的風險也要給你看的訊息佇列和RocketMQ入門總結

吐槽

文章很長,點贊再看,養成好習慣???

這真的是考前的最後一篇部落格,再寫真的要掛科了。

其實一直想吐槽現在很多大學的計算機教育的,我從大二開始入門 Java後端,其實我覺得我的學習效率很低,中間也走了很多歪路。但是,如今和同齡人比較,我發現大部分同學竟然連基本的程式碼都不會敲,尤其到了課程設計大家都是水水過的,讓他們做一個管理系統簡直要他們命。包括現在大三了,很多人希望本科就出來就業的開始報培訓班,培機構開始大把大把地撈金,學生在那瘋狂吐槽三四年裡沒有從學校學到任何東西,這可能是大部分大學計算機教育的通病吧。

當然還有很多師資力量的問題,不知道清北的,但在我認識的一些 211 或者某些 985 的同學,他們學校的老師或者所教的一些知識也已經落伍了好多年了(當然不是指一些作業系統類似的基礎學科,我覺得它們是幾十年都不變的)。如果一個資料庫老師會告訴你 MySQL

不支援事務,一個資料結構老師說 B+樹 是二叉樹,甚至在學生提到一些新的東西的時候沒有了解過並且不想去了解,你們會怎麼想。

但很慶幸,在大二的時候加入了我們學校最牛的技術組織 TECH F5VE ,這是一個學生組織的創業型技術團隊,成立五年目前成員四十人不到,但裡面不乏各種大廠的大牛,我在裡面獲得了很多很多。

當然,參加過一些小公司的實習或者和他們合作過做一些外包專案,但發現他們的技術選型也已經 out 了,我們團隊的成員戲稱 "考古式開發"。我想這也是大部分人擠破了頭皮想要進大廠的理由吧。

可能觀點有點偏激,希望大家理解???。

訊息佇列掃盲

訊息佇列顧名思義就是存放訊息的佇列,佇列我就不解釋了,別告訴我你連佇列都不知道似啥吧?

所以問題並不是訊息佇列是什麼,而是 訊息佇列為什麼會出現?訊息佇列能用來幹什麼?用它來幹這些事會帶來什麼好處?訊息佇列會帶來副作用嗎?

訊息佇列為什麼會出現?

訊息佇列算是作為後端程式設計師的一個必備技能吧,因為分散式應用必定涉及到各個系統之間的通訊問題,這個時候訊息佇列也應運而生了。可以說分散式的產生是訊息佇列的基礎,而分散式怕是一個很古老的概念了吧,所以訊息佇列也是一個很古老的中介軟體了。

訊息佇列能用來幹什麼?

非同步

你可能會反駁我,應用之間的通訊又不是隻能由訊息佇列解決,好好的通訊為什麼中間非要插一個訊息佇列呢?我不能直接進行通訊嗎?

很好?,你又提出了一個概念,同步通訊。就比如現在業界使用比較多的 Dubbo

就是一個適用於各個系統之間同步通訊的 RPC 框架。

我來舉個?吧,比如我們有一個購票系統,需求是使用者在購買完之後能接收到購買完成的簡訊。

我們省略中間的網路通訊時間消耗,假如購票系統處理需要 150ms ,簡訊系統處理需要 200ms ,那麼整個處理流程的時間消耗就是 150ms + 200ms = 350ms。

當然,乍看沒什麼問題。可是仔細一想你就感覺有點問題,我使用者購票在購票系統的時候其實就已經完成了購買,而我現在通過同步呼叫非要讓整個請求拉長時間,而短息系統這玩意又不是很有必要,它僅僅是一個輔助功能增強使用者體驗感而已。我現在整個呼叫流程就有點 頭重腳輕 的感覺了,購票是一個不太耗時的流程,而我現在因為同步呼叫,非要等待傳送簡訊這個比較耗時的操作才返回結果。那我如果再加一個傳送郵件呢?

這樣整個系統的呼叫鏈又變長了,整個時間就變成了550ms。

當我們在學生時代需要在食堂排隊的時候,我們和食堂大媽就是一個同步的模型。

我們需要告訴食堂大媽:“姐姐,給我加個雞腿,再加個酸辣土豆絲,幫我澆點汁上去,多打點飯哦???” 咦~~~ 為了多吃點,真噁心。

然後大媽幫我們打飯配菜,我們看著大媽那顫抖的手和掉落的土豆絲不禁嚥了咽口水。

最終我們從大媽手中接過飯菜然後去尋找座位了…

回想一下,我們在給大媽傳送需要的資訊之後我們是 同步等待大媽給我配好飯菜 的,上面我們只是加了雞腿和土豆絲,萬一我再加一個番茄牛腩,韭菜雞蛋,這樣是不是大媽打飯配菜的流程就會變長,我們等待的時間也會相應的變長。

那後來,我們工作賺錢了有錢去飯店吃飯了,我們告訴服務員來一碗牛肉麵加個荷包蛋 (傳達一個訊息) ,然後我們就可以在飯桌上安心的玩手機了 (幹自己其他事情) ,等到我們的牛肉麵上了我們就可以吃了。這其中我們也就傳達了一個訊息,然後我們又轉過頭幹其他事情了。這其中雖然做面的時間沒有變短,但是我們只需要傳達一個訊息就可以看其他事情了,這是一個 非同步 的概念。

所以,為瞭解決這一個問題,聰明的程式設計師在中間也加了個類似於服務員的中介軟體——訊息佇列。這個時候我們就可以把模型給改造了。

這樣,我們在將訊息存入訊息佇列之後我們就可以直接返回了(我們告訴服務員我們要吃什麼然後玩手機),所以整個耗時只是 150ms + 10ms = 160ms。

但是你需要注意的是,整個流程的時長是沒變的,就像你僅僅告訴服務員要吃什麼是不會影響到做面的速度的。

解耦

回到最初同步呼叫的過程,我們寫個虛擬碼簡單概括一下。

那麼第二步,我們又添加了一個傳送郵件,我們就得重新去修改程式碼,如果我們又加一個需求:使用者購買完還需要給他加積分,這個時候我們是不是又得改程式碼?

如果你覺得還行,那麼我這個時候不要發郵件這個服務了呢,我是不是又得改程式碼,又得重啟應用?

這樣改來改去是不是很麻煩,那麼 此時我們就用一個訊息佇列在中間進行解耦 。你需要注意的是,我們後面的傳送簡訊、傳送郵件、新增積分等一些操作都依賴於上面的 result ,這東西抽象出來就是購票的處理結果呀,比如訂單號,使用者賬號等等,也就是說我們後面的一系列服務都是需要同樣的訊息來進行處理。既然這樣,我們是不是可以通過 “廣播訊息” 來實現。

我上面所講的“廣播”並不是真正的廣播,而是接下來的系統作為消費者去 訂閱 特定的主題。比如我們這裡的主題就可以叫做 訂票 ,我們購買系統作為一個生產者去生產這條訊息放入訊息佇列,然後消費者訂閱了這個主題,會從訊息佇列中拉取訊息並消費。就比如我們剛剛畫的那張圖,你會發現,在生產者這邊我們只需要關注 生產訊息到指定主題中 ,而 消費者只需要關注從指定主題中拉取訊息 就行了。

如果沒有訊息佇列,每當一個新的業務接入,我們都要在主系統呼叫新介面、或者當我們取消某些業務,我們也得在主系統刪除某些介面呼叫。有了訊息佇列,我們只需要關心訊息是否送達了佇列,至於誰希望訂閱,接下來收到訊息如何處理,是下游的事情,無疑極大地減少了開發和聯調的工作量。

削峰

我們再次回到一開始我們使用同步呼叫系統的情況,並且思考一下,如果此時有大量使用者請求購票整個系統會變成什麼樣?

如果,此時有一萬的請求進入購票系統,我們知道執行我們主業務的伺服器配置一般會比較好,所以這裡我們假設購票系統能承受這一萬的使用者請求,那麼也就意味著我們同時也會出現一萬呼叫發簡訊服務的請求。而對於簡訊系統來說並不是我們的主要業務,所以我們配備的硬體資源並不會太高,那麼你覺得現在這個簡訊系統能承受這一萬的峰值麼,且不說能不能承受,系統會不會 直接崩潰 了?

簡訊業務又不是我們的主業務,我們能不能 折中處理 呢?如果我們把購買完成的資訊傳送到訊息佇列中,而簡訊系統 儘自己所能地去訊息佇列中取訊息和消費訊息 ,即使處理速度慢一點也無所謂,只要我們的系統沒有崩潰就行了。

留得江山在,還怕沒柴燒?你敢說每次傳送驗證碼的時候是一發你就收到了的麼?

訊息佇列能帶來什麼好處?

其實上面我已經說了。非同步、解耦、削峰。 哪怕你上面的都沒看懂也千萬要記住這六個字,因為他不僅是訊息佇列的精華,更是程式設計和架構的精華。

訊息佇列會帶來副作用嗎?

沒有哪一門技術是“銀彈”,訊息佇列也有它的副作用。

比如,本來好好的兩個系統之間的呼叫,我中間加了個訊息佇列,如果訊息佇列掛了怎麼辦呢?是不是 降低了系統的可用性

那這樣是不是要保證HA(高可用)?是不是要搞叢集?那麼我 整個系統的複雜度是不是上升了

拋開上面的問題不講,萬一我傳送方傳送失敗了,然後執行重試,這樣就可能產生重複的訊息。

或者我消費端處理失敗了,請求重發,這樣也會產生重複的訊息。

對於一些微服務來說,消費重複訊息會帶來更大的麻煩,比如增加積分,這個時候我加了多次是不是對其他使用者不公平?

那麼,又 如何解決重複消費訊息的問題 呢?

如果我們此時的訊息需要保證嚴格的順序性怎麼辦呢?比如生產者生產了一系列的有序訊息(對一個id為1的記錄進行刪除增加修改),但是我們知道在釋出訂閱模型中,對於主題是無順序的,那麼這個時候就會導致對於消費者消費訊息的時候沒有按照生產者的傳送順序消費,比如這個時候我們消費的順序為修改刪除增加,如果該記錄涉及到金額的話是不是會出大事情?

那麼,又 如何解決訊息的順序消費問題 呢?

就拿我們上面所講的分散式系統來說,使用者購票完成之後是不是需要增加賬戶積分?在同一個系統中我們一般會使用事務來進行解決,如果用 Spring 的話我們在上面虛擬碼中加入 @Transactional 註解就好了。但是在不同系統中如何保證事務呢?總不能這個系統我扣錢成功了你那積分系統積分沒加吧?或者說我這扣錢明明失敗了,你那積分系統給我加了積分。

那麼,又如何 解決分散式事務問題 呢?

我們剛剛說了,訊息佇列可以進行削峰操作,那如果我的消費者如果消費很慢或者生產者生產訊息很快,這樣是不是會將訊息堆積在訊息佇列中?

那麼,又如何 解決訊息堆積的問題 呢?

可用性降低,複雜度上升,又帶來一系列的重複消費,順序消費,分散式事務,訊息堆積的問題,這訊息佇列還怎麼用啊??

別急,辦法總是有的。

RocketMQ是什麼?

哇,你個混蛋!上面給我丟擲那麼多問題,你現在又講 RocketMQ ,還讓不讓人活了?!?

別急別急,話說你現在清楚 MQ 的構造嗎,我還沒講呢,我們先搞明白 MQ 的內部構造,再來看看如何解決上面的一系列問題吧,不過你最好帶著問題去閱讀和了解喔。

RocketMQ 是一個 佇列模型 的訊息中介軟體,具有高效能、高可靠、高實時、分散式 的特點。它是一個採用 Java 語言開發的分散式的訊息系統,由阿里巴巴團隊開發,在2016年底貢獻給 Apache,成為了 Apache 的一個頂級專案。 在阿里內部,RocketMQ 很好地服務了集團大大小小上千個應用,在每年的雙十一當天,更有不可思議的萬億級訊息通過 RocketMQ 流轉。

廢話不多說,想要了解 RocketMQ 歷史的同學可以自己去搜尋資料。聽完上面的介紹,你只要知道 RocketMQ 很快、很牛、而且經歷過雙十一的實踐就行了!

佇列模型和主題模型

在談 RocketMQ 的技術架構之前,我們先來瞭解一下兩個名詞概念——佇列模型主題模型

首先我問一個問題,訊息佇列為什麼要叫訊息佇列?

你可能覺得很弱智,這玩意不就是存放訊息的佇列嘛?不叫訊息佇列叫什麼?

的確,早期的訊息中介軟體是通過 佇列 這一模型來實現的,可能是歷史原因,我們都習慣把訊息中介軟體成為訊息佇列。

但是,如今例如 RocketMQKafka 這些優秀的訊息中介軟體不僅僅是通過一個 佇列 來實現訊息儲存的。

佇列模型

就像我們理解佇列一樣,訊息中介軟體的佇列模型就真的只是一個佇列。。。我畫一張圖給大家理解。

在一開始我跟你提到了一個 “廣播” 的概念,也就是說如果我們此時我們需要將一個訊息傳送給多個消費者(比如此時我需要將資訊傳送給簡訊系統和郵件系統),這個時候單個佇列即不能滿足需求了。

當然你可以讓 Producer 生產訊息放入多個佇列中,然後每個佇列去對應每一個消費者。問題是可以解決,建立多個佇列並且複製多份訊息是會很影響資源和效能的。而且,這樣子就會導致生產者需要知道具體消費者個數然後去複製對應數量的訊息佇列,這就違揹我們訊息中介軟體的 解耦 這一原則。

主題模型

那麼有沒有好的方法去解決這一個問題呢?有,那就是 主題模型 或者可以稱為 釋出訂閱模型

感興趣的同學可以去了解一下設計模式裡面的觀察者模式並且手動實現一下,我相信你會有所收穫的。

在主題模型中,訊息的生產者稱為 釋出者(Publisher) ,訊息的消費者稱為 訂閱者(Subscriber) ,存放訊息的容器稱為 主題(Topic)

其中,釋出者將訊息傳送到指定主題中,訂閱者需要 提前訂閱主題 才能接受特定主題的訊息。

RocketMQ中的訊息模型

RockerMQ 中的訊息模型就是按照 主題模型 所實現的。你可能會好奇這個 主題 到底是怎麼實現的呢?你上面也沒有講到呀!

其實對於主題模型的實現來說每個訊息中介軟體的底層設計都是不一樣的,就比如 Kafka 中的 分割槽RocketMQ 中的 佇列RabbitMQ 中的 Exchange 。我們可以理解為 主題模型/釋出訂閱模型 就是一個標準,那些中介軟體只不過照著這個標準去實現而已。

所以,主題模型 到底是如何實現的呢?首先我畫一張圖,大家嘗試著去理解一下。

我們可以看到在整個圖中有 Producer GroupTopicConsumer Group 三個角色,我來分別介紹一下他們。

  • Producer Group 生產者組: 代表某一類的生產者,比如我們有多個秒殺系統作為生產者,這多個合在一起就是一個 Producer Group 生產者組,它們一般生產相同的訊息。
  • Consumer Group 消費者組: 代表某一類的消費者,比如我們有多個簡訊系統作為消費者,這多個合在一起就是一個 Consumer Group 消費者組,它們一般消費相同的訊息。
  • Topic 主題: 代表一類訊息,比如訂單訊息,物流訊息等等。

你可以看到圖中生產者組中的生產者會向主題傳送訊息,而 主題中存在多個佇列,生產者每次生產訊息之後是指定主題中的某個佇列傳送訊息的。

每個主題中都有多個佇列(這裡還不涉及到 Broker),叢集消費模式下,一個消費者叢集多臺機器共同消費一個 topic 的多個佇列,一個佇列只會被一個消費者消費。如果某個消費者掛掉,分組內其它消費者會接替掛掉的消費者繼續消費。就像上圖中 Consumer1Consumer2 分別對應著兩個佇列,而 Consuer3 是沒有佇列對應的,所以一般來講要控制 消費者組中的消費者個數和主題中佇列個數相同

當然也可以消費者個數小於佇列個數,只不過不太建議。如下圖。

每個消費組在每個佇列上維護一個消費位置 ,為什麼呢?

因為我們剛剛畫的僅僅是一個消費者組,我們知道在釋出訂閱模式中一般會涉及到多個消費者組,而每個消費者組在每個佇列中的消費位置都是不同的。如果此時有多個消費者組,那麼訊息被一個消費者組消費完之後是不會刪除的(因為其它消費者組也需要呀),它僅僅是為每個消費者組維護一個 消費位移(offset) ,每次消費者組消費完會返回一個成功的響應,然後佇列再把維護的消費位移加一,這樣就不會出現剛剛消費過的訊息再一次被消費了。

可能你還有一個問題,為什麼一個主題中需要維護多個佇列

答案是 提高併發能力 。的確,每個主題中只存在一個佇列也是可行的。你想一下,如果每個主題中只存在一個佇列,這個佇列中也維護著每個消費者組的消費位置,這樣也可以做到 釋出訂閱模式 。如下圖。

但是,這樣我生產者是不是隻能向一個佇列傳送訊息?又因為需要維護消費位置所以一個佇列只能對應一個消費者組中的消費者,這樣是不是其他的 Consumer 就沒有用武之地了?從這兩個角度來講,併發度一下子就小了很多。

所以總結來說,RocketMQ 通過使用在一個 Topic 中配置多個佇列並且每個佇列維護每個消費者組的消費位置 實現了 主題模式/釋出訂閱模式

RocketMQ的架構圖

講完了訊息模型,我們理解起 RocketMQ 的技術架構起來就容易多了。

RocketMQ 技術架構中有四大角色 NameServerBrokerProducerConsumer 。我來向大家分別解釋一下這四個角色是幹啥的。

  • Broker: 主要負責訊息的儲存、投遞和查詢以及服務高可用保證。說白了就是訊息佇列伺服器嘛,生產者生產訊息到 Broker ,消費者從 Broker 拉取訊息並消費。

    這裡,我還得普及一下關於 Topic 和 佇列的關係。上面我講解了 Topic 和佇列的關係——一個 Topic 中存在多個佇列,那麼這個 Topic 和佇列存放在哪呢?

    一個 Topic 分佈在多個 Broker上,一個 Broker 可以配置多個 Topic ,它們是多對多的關係

    如果某個 Topic 訊息量很大,應該給它多配置幾個佇列(上文中提到了提高併發能力),並且 儘量多分佈在不同 Broker 上,以減輕某個 Broker 的壓力

    Topic 訊息量都比較均勻的情況下,如果某個 broker 上的佇列越多,則該 broker 壓力越大。

所以說我們需要配置多個Broker。

  • NameServer: 不知道你們有沒有接觸過 ZooKeeperSpring Cloud 中的 Eureka ,它其實也是一個 註冊中心 ,主要提供兩個功能:Broker管理路由資訊管理 。說白了就是 Broker 會將自己的資訊註冊到 NameServer 中,此時 NameServer 就存放了很多 Broker 的資訊(Broker的路由表),消費者和生產者就從 NameServer 中獲取路由表然後照著路由表的資訊和對應的 Broker 進行通訊(生產者和消費者定期會向 NameServer 去查詢相關的 Broker 的資訊)。

  • Producer: 訊息釋出的角色,支援分散式叢集方式部署。說白了就是生產者。

  • Consumer: 訊息消費的角色,支援分散式叢集方式部署。支援以push推,pull拉兩種模式對訊息進行消費。同時也支援叢集方式和廣播方式的消費,它提供實時訊息訂閱機制。說白了就是消費者。

聽完了上面的解釋你可能會覺得,這玩意好簡單。不就是這樣的麼?

嗯?你可能會發現一個問題,這老傢伙 NameServer 幹啥用的,這不多餘嗎?直接 ConsumerBroker 直接進行生產訊息,消費訊息不就好了麼?

但是,我們上文提到過 Broker 是需要保證高可用的,如果整個系統僅僅靠著一個 Broker 來維持的話,那麼這個 Broker 的壓力會不會很大?所以我們需要使用多個 Broker 來保證 負載均衡

如果說,我們的消費者和生產者直接和多個 Broker 相連,那麼當 Broker 修改的時候必定會牽連著每個生產者和消費者,這樣就會產生耦合問題,而 NameServer 註冊中心就是用來解決這個問題的。

如果還不是很理解的話,可以去看我介紹 Spring Cloud 的那篇文章,其中介紹了 Eureka 註冊中心。

當然,RocketMQ 中的技術架構肯定不止前面那麼簡單,因為上面圖中的四個角色都是需要做叢集的。我給出一張官網的架構圖,大家嘗試理解一下。

其實和我們最開始畫的那張乞丐版的架構圖也沒什麼區別,主要是一些細節上的差別。聽我細細道來?。

第一、我們的 Broker 做了叢集並且還進行了主從部署 ,由於訊息分佈在各個 Broker 上,一旦某個 Broker 宕機,則該Broker 上的訊息讀寫都會受到影響。所以 Rocketmq 提供了 master/slave 的結構,salve 定時從 master 同步資料(同步刷盤或者非同步刷盤),如果 master 宕機,slave 提供消費服務,但是不能寫入訊息 (後面我還會提到哦)。

第二、為了保證 HA ,我們的 NameServer 也做了叢集部署,但是請注意它是 去中心化 的。也就意味著它沒有主節點,你可以很明顯地看出 NameServer 的所有節點是沒有進行 Info Replicate 的,在 RocketMQ 中是通過 單個Broker和所有NameServer保持長連線 ,並且在每隔30秒 Broker 會向所有 Nameserver 傳送心跳,心跳包含了自身的 Topic 配置資訊,這個步驟就對應這上面的 Routing Info

第三、在生產者需要向 Broker 傳送訊息的時候,需要先從 NameServer 獲取關於 Broker 的路由資訊,然後通過 輪詢 的方法去向每個佇列中生產資料以達到 負載均衡 的效果。

第四、消費者通過 NameServer 獲取所有 Broker 的路由資訊後,向 Broker 傳送 Pull 請求來獲取訊息資料。Consumer 可以以兩種模式啟動—— 廣播(Broadcast)和叢集(Cluster)。廣播模式下,一條訊息會傳送給 同一個消費組中的所有消費者 ,叢集模式下訊息只會傳送給一個消費者。

如何解決 順序消費、重複消費

其實,這些東西都是我在介紹訊息佇列帶來的一些副作用的時候提到的,也就是說,這些問題不僅僅掛鉤於 RocketMQ ,而是應該每個訊息中介軟體都需要去解決的。

在上面我介紹 RocketMQ 的技術架構的時候我已經向你展示了 它是如何保證高可用的 ,這裡不涉及運維方面的搭建,如果你感興趣可以自己去官網上照著例子搭建屬於你自己的 RocketMQ 叢集。

其實 Kafka 的架構基本和 RocketMQ 類似,只是它註冊中心使用了 Zookeeper 、它的 分割槽 就相當於 佇列 。還有一些小細節不同會在後面提到。

順序消費

在上面的技術架構介紹中,我們已經知道了 RocketMQ 在主題上是無序的、它只有在佇列層面才是保證有序 的。

這又扯到兩個概念——普通順序嚴格順序

所謂普通順序是指 消費者通過 同一個消費佇列收到的訊息是有順序的 ,不同訊息佇列收到的訊息則可能是無順序的。普通順序訊息在 重啟情況下不會保證訊息順序性 (短暫時間) 。

所謂嚴格順序是指 消費者收到的 所有訊息 均是有順序的。嚴格順序訊息 即使在異常情況下也會保證訊息的順序性

但是,嚴格順序看起來雖好,實現它可會付出巨大的代價。如果你使用嚴格順序模式,Broker 叢集中只要有一臺機器不可用,則整個叢集都不可用。你還用啥?現在主要場景也就在 binlog 同步。

一般而言,我們的 MQ 都是能容忍短暫的亂序,所以推薦使用普通順序模式。

那麼,我們現在使用了 普通順序模式 ,我們從上面學習知道了在 Producer 生產訊息的時候會進行輪詢(取決你的負載均衡策略)來向同一主題的不同訊息佇列傳送訊息。那麼如果此時我有幾個訊息分別是同一個訂單的建立、支付、發貨,在輪詢的策略下這 三個訊息會被髮送到不同佇列 ,因為在不同的佇列此時就無法使用 RocketMQ 帶來的佇列有序特性來保證訊息有序性了。

那麼,怎麼解決呢?

其實很簡單,我們需要處理的僅僅是將同一語義下的訊息放入同一個佇列(比如這裡是同一個訂單),那我們就可以使用 Hash取模法 來保證同一個訂單在同一個佇列中就行了。

重複消費

emmm,就兩個字—— 冪等 。在程式設計中一個冪等 操作的特點是其任意多次執行所產生的影響均與一次執行的影響相同。比如說,這個時候我們有一個訂單的處理積分的系統,每當來一個訊息的時候它就負責為建立這個訂單的使用者的積分加上相應的數值。可是有一次,訊息佇列傳送給訂單系統 FrancisQ 的訂單資訊,其要求是給 FrancisQ 的積分加上 500。但是積分系統在收到 FrancisQ 的訂單資訊處理完成之後返回給訊息佇列處理成功的資訊的時候出現了網路波動(當然還有很多種情況,比如Broker意外重啟等等),這條迴應沒有傳送成功。

那麼,訊息佇列沒收到積分系統的迴應會不會嘗試重發這個訊息?問題就來了,我再發這個訊息,萬一它又給 FrancisQ 的賬戶加上 500 積分怎麼辦呢?

所以我們需要給我們的消費者實現 冪等 ,也就是對同一個訊息的處理結果,執行多少次都不變。

那麼如何給業務實現冪等呢?這個還是需要結合具體的業務的。你可以使用 寫入 Redis 來保證,因為 Rediskeyvalue 就是天然支援冪等的。當然還有使用 資料庫插入法 ,基於資料庫的唯一鍵來保證重複資料不會被插入多條。

不過最主要的還是需要 根據特定場景使用特定的解決方案 ,你要知道你的訊息消費是否是完全不可重複消費還是可以忍受重複消費的,然後再選擇強校驗和弱校驗的方式。畢竟在 CS 領域還是很少有技術銀彈的說法。

而在整個網際網路領域,冪等不僅僅適用於訊息佇列的重複消費問題,這些實現冪等的方法,也同樣適用於,在其他場景中來解決重複請求或者重複呼叫的問題 。比如將HTTP服務設計成冪等的,解決前端或者APP重複提交表單資料的問題 ,也可以將一個微服務設計成冪等的,解決 RPC 框架自動重試導致的 重複呼叫問題

分散式事務

如何解釋分散式事務呢?事務大家都知道吧?要麼都執行要麼都不執行 。在同一個系統中我們可以輕鬆地實現事務,但是在分散式架構中,我們有很多服務是部署在不同系統之間的,而不同服務之間又需要進行呼叫。比如此時我下訂單然後增加積分,如果保證不了分散式事務的話,就會出現A系統下了訂單,但是B系統增加積分失敗或者A系統沒有下訂單,B系統卻增加了積分。前者對使用者不友好,後者對運營商不利,這是我們都不願意見到的。

那麼,如何去解決這個問題呢?

如今比較常見的分散式事務實現有 2PC、TCC 和事務訊息(half 半訊息機制)。每一種實現都有其特定的使用場景,但是也有各自的問題,都不是完美的解決方案

RocketMQ 中使用的是 事務訊息加上事務反查機制 來解決分散式事務問題的。我畫了張圖,大家可以對照著圖進行理解。

在第一步傳送的 half 訊息 ,它的意思是 在事務提交之前,對於消費者來說,這個訊息是不可見的

那麼,如何做到寫入訊息但是對使用者不可見呢?RocketMQ事務訊息的做法是:如果訊息是half訊息,將備份原訊息的主題與訊息消費佇列,然後 改變主題 為RMQ_SYS_TRANS_HALF_TOPIC。由於消費組未訂閱該主題,故消費端無法消費half型別的訊息,然後RocketMQ會開啟一個定時任務,從Topic為RMQ_SYS_TRANS_HALF_TOPIC中拉取訊息進行消費,根據生產者組獲取一個服務提供者傳送回查事務狀態請求,根據事務狀態來決定是提交或回滾訊息。

你可以試想一下,如果沒有從第5步開始的 事務反查機制 ,如果出現網路波動第4步沒有傳送成功,這樣就會產生 MQ 不知道是不是需要給消費者消費的問題,他就像一個無頭蒼蠅一樣。在 RocketMQ 中就是使用的上述的事務反查來解決的,而在 Kafka 中通常是直接丟擲一個異常讓使用者來自行解決。

你還需要注意的是,在 MQ Server 指向系統B的操作已經和系統A不相關了,也就是說在訊息佇列中的分散式事務是——本地事務和儲存訊息到訊息佇列才是同一個事務。這樣也就產生了事務的最終一致性,因為整個過程是非同步的,每個系統只要保證它自己那一部分的事務就行了

訊息堆積問題

在上面我們提到了訊息佇列一個很重要的功能——削峰 。那麼如果這個峰值太大了導致訊息堆積在佇列中怎麼辦呢?

其實這個問題可以將它廣義化,因為產生訊息堆積的根源其實就只有兩個——生產者生產太快或者消費者消費太慢。

我們可以從多個角度去思考解決這個問題,當流量到峰值的時候是因為生產者生產太快,我們可以使用一些 限流降級 的方法,當然你也可以增加多個消費者例項去水平擴充套件增加消費能力來匹配生產的激增。如果消費者消費過慢的話,我們可以先檢查 是否是消費者出現了大量的消費錯誤 ,或者列印一下日誌檢視是否是哪一個執行緒卡死,出現了鎖資源不釋放等等的問題。

當然,最快速解決訊息堆積問題的方法還是增加消費者例項,不過 同時你還需要增加每個主題的佇列數量

別忘了在 RocketMQ 中,一個佇列只會被一個消費者消費 ,如果你僅僅是增加消費者例項就會出現我一開始給你畫架構圖的那種情況。

回溯消費

回溯消費是指 Consumer 已經消費成功的訊息,由於業務上需求需要重新消費,在RocketMQ 中, Broker 在向Consumer 投遞成功訊息後,訊息仍然需要保留 。並且重新消費一般是按照時間維度,例如由於 Consumer 系統故障,恢復後需要重新消費1小時前的資料,那麼 Broker 要提供一種機制,可以按照時間維度來回退消費進度。RocketMQ 支援按照時間回溯消費,時間維度精確到毫秒。

這是官方檔案的解釋,我直接照搬過來就當科普了???。

RocketMQ 的刷盤機制

上面我講了那麼多的 RocketMQ 的架構和設計原理,你有沒有好奇

Topic 中的 佇列是以什麼樣的形式存在的?

佇列中的訊息又是如何進行儲存持久化的呢?

我在上文中提到的 同步刷盤非同步刷盤 又是什麼呢?它們會給持久化帶來什麼樣的影響呢?

下面我將給你們一一解釋。

同步刷盤和非同步刷盤

如上圖所示,在同步刷盤中需要等待一個刷盤成功的 ACK ,同步刷盤對 MQ 訊息可靠性來說是一種不錯的保障,但是 效能上會有較大影響 ,一般地適用於金融等特定業務場景。

而非同步刷盤往往是開啟一個執行緒去非同步地執行刷盤操作。訊息刷盤採用後臺非同步執行緒提交的方式進行, 降低了讀寫延遲 ,提高了 MQ 的效能和吞吐量,一般適用於如發驗證碼等對於訊息保證要求不太高的業務場景。

一般地,非同步刷盤只有在 Broker 意外宕機的時候會丟失部分資料,你可以設定 Broker 的引數 FlushDiskType 來調整你的刷盤策略(ASYNC_FLUSH 或者 SYNC_FLUSH)。

同步複製和非同步複製

上面的同步刷盤和非同步刷盤是在單個結點層面的,而同步複製和非同步複製主要是指的 Borker 主從模式下,主節點返回訊息給客戶端的時候是否需要同步從節點。

  • 同步複製: 也叫 “同步雙寫”,也就是說,只有訊息同步雙寫到主從結點上時才返回寫入成功
  • 非同步複製: 訊息寫入主節點之後就直接返回寫入成功

然而,很多事情是沒有完美的方案的,就比如我們進行訊息寫入的節點越多就更能保證訊息的可靠性,但是隨之的效能也會下降,所以需要程式設計師根據特定業務場景去選擇適應的主從複製方案。

那麼,非同步複製會不會也像非同步刷盤那樣影響訊息的可靠性呢?

答案是不會的,因為兩者就是不同的概念,對於訊息可靠性是通過不同的刷盤策略保證的,而像非同步同步複製策略僅僅是影響到了 可用性 。為什麼呢?其主要原因RocketMQ 是不支援自動主從切換的,當主節點掛掉之後,生產者就不能再給這個主節點生產訊息了

比如這個時候採用非同步複製的方式,在主節點還未傳送完需要同步的訊息的時候主節點掛掉了,這個時候從節點就少了一部分訊息。但是此時生產者無法再給主節點生產訊息了,消費者可以自動切換到從節點進行消費(僅僅是消費),所以在主節點掛掉的時間只會產生主從結點短暫的訊息不一致的情況,降低了可用性,而當主節點重啟之後,從節點那部分未來得及複製的訊息還會繼續複製。

在單主從架構中,如果一個主節點掛掉了,那麼也就意味著整個系統不能再生產了。那麼這個可用性的問題能否解決呢?一個主從不行那就多個主從的唄,別忘了在我們最初的架構圖中,每個 Topic 是分佈在不同 Broker 中的。

但是這種複製方式同樣也會帶來一個問題,那就是無法保證 嚴格順序 。在上文中我們提到了如何保證的訊息順序性是通過將一個語義的訊息傳送在同一個佇列中,使用 Topic 下的佇列來保證順序性的。如果此時我們主節點A負責的是訂單A的一系列語義訊息,然後它掛了,這樣其他節點是無法代替主節點A的,如果我們任意節點都可以存入任何訊息,那就沒有順序性可言了。

而在 RocketMQ 中採用了 Dledger 解決這個問題。他要求在寫入訊息的時候,要求至少訊息複製到半數以上的節點之後,才給客⼾端返回寫⼊成功,並且它是⽀持通過選舉來動態切換主節點的。這裡我就不展開說明瞭,讀者可以自己去了解。

也不是說 Dledger 是個完美的方案,至少在 Dledger 選舉過程中是無法提供服務的,而且他必須要使用三個節點或以上,如果多數節點同時掛掉他也是無法保證可用性的,而且要求訊息複製板書以上節點的效率和直接非同步複製還是有一定的差距的。

儲存機制

還記得上面我們一開始的三個問題嗎?到這裡第三個問題已經解決了。

但是,在 佇列是以什麼樣的形式存在的?佇列中的訊息又是如何進行儲存持久化的呢? 還未解決,其實這裡涉及到了 RocketMQ 是如何設計它的儲存結構了。我首先想大家介紹 RocketMQ 訊息儲存架構中的三大角色——CommitLogConsumeQueueIndexFile

  • CommitLog訊息主體以及元資料的儲存主體,儲存 Producer 端寫入的訊息主體內容,訊息內容不是定長的。單個檔案大小預設1G ,檔名長度為20位,左邊補零,剩餘為起始偏移量,比如00000000000000000000代表了第一個檔案,起始偏移量為0,檔案大小為1G=1073741824;當第一個檔案寫滿了,第二個檔案為00000000001073741824,起始偏移量為1073741824,以此類推。訊息主要是順序寫入日誌檔案,當檔案滿了,寫入下一個檔案。
  • ConsumeQueue: 訊息消費佇列,引入的目的主要是提高訊息消費的效能(我們再前面也講了),由於RocketMQ 是基於主題 Topic 的訂閱模式,訊息消費是針對主題進行的,如果要遍歷 commitlog 檔案中根據 Topic 檢索訊息是非常低效的。Consumer 即可根據 ConsumeQueue 來查詢待消費的訊息。其中,ConsumeQueue(邏輯消費佇列)作為消費訊息的索引,儲存了指定 Topic 下的佇列訊息在 CommitLog 中的 起始物理偏移量 offset訊息大小size 和訊息 TagHashCode 值。consumequeue檔案 可以看成是基於topic的 commitlog索引檔案 ,故 consumequeue 資料夾的組織方式如下:topic/queue/file三層組織結構,具體儲存路徑為:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同樣 consumequeue 檔案採取定長設計,每一個條目共20個位元組,分別為8位元組的 commitlog 物理偏移量、4位元組的訊息長度、8位元組tag hashcode,單個檔案由30W個條目組成,可以像陣列一樣隨機訪問每一個條目,每個 ConsumeQueue檔案大小約5.72M;
  • IndexFileIndexFile(索引檔案)提供了一種可以通過key或時間區間來查詢訊息的方法。這裡只做科普不做詳細介紹。

總結來說,整個訊息儲存的結構,最主要的就是 CommitLoqConsumeQueue 。而 ConsumeQueue 你可以大概理解為 Topic 中的佇列。

RocketMQ 採用的是 混合型的儲存結構 ,即為 Broker 單個例項下所有的佇列共用一個日誌資料檔案來儲存訊息。有意思的是在同樣高併發的 Kafka 中會為每個 Topic 分配一個儲存檔案。這就有點類似於我們有一大堆書需要裝上書架,RockeMQ 是不分書的種類直接成批的塞上去的,而 Kafka 是將書本放入指定的分類區域的。

RocketMQ 為什麼要這麼做呢?原因是 提高資料的寫入效率 ,不分 Topic 意味著我們有更大的機率獲取 成批 的訊息進行資料寫入,但也會帶來一個麻煩就是讀取訊息的時候需要遍歷整個大檔案,這是非常耗時的。

所以,在 RocketMQ 中又使用了 ConsumeQueue 作為每個佇列的索引檔案來 提升讀取訊息的效率。我們可以直接根據佇列的訊息序號,計算出索引的全域性位置(索引序號*索引固定⻓度20),然後直接讀取這條索引,再根據索引中記錄的訊息的全域性位置,找到訊息。

講到這裡,你可能對 RockeMQ 的儲存架構還有些模糊,沒事,我們結合著圖來理解一下。

emmm,是不是有一點複雜?,看英文圖片和英文檔案的時候就不要慫,硬著頭皮往下看就行。

如果上面沒看懂的讀者一定要認真看下面的流程分析!

首先,在最上面的那一塊就是我剛剛講的你現在可以直接 ConsumerQueue 理解為 Queue

在圖中最左邊說明瞭 紅色方塊 代表被寫入的訊息,虛線方塊代表等待被寫入的。左邊的生產者傳送訊息會指定 QueueId 和具體訊息內容,而在 Broker 中管你是哪門子訊息,他直接 **全部順序儲存到了 CommitLog **。而根據生產者指定的 TopicQueueId 將這條訊息本身在 CommitLog 的偏移(offset),訊息本身大小,和tag的hash值存入對應的 ConsumeQueue 索引檔案中。而在每個佇列中都儲存了 ConsumeOffset 即每個消費者組的消費位置(我在架構那裡提到了,忘了的同學可以回去看一下),而消費者拉取訊息進行消費的時候只需要根據 ConsumeOffset 獲取下一個未被消費的訊息就行了。

上述就是我對於整個訊息儲存架構的大概理解(這裡不涉及到一些細節討論,比如稀疏索引等等問題),希望對你有幫助。

因為有一個知識點因為寫嗨了忘講了,想想在哪裡加也不好,所以我留給大家去思考??一下吧。

為什麼 CommitLog 檔案要設計成固定大小的長度呢?提醒:記憶體對映機制

總結

總算把這篇部落格寫完了。我講的你們還記得嗎??

這篇文章中我主要想大家介紹了

訊息隊列出現的原因

訊息佇列的作用(非同步,解耦,削峰)

訊息佇列帶來的一系列問題(訊息堆積、重複消費、順序消費、分散式事務等等)

訊息佇列的兩種訊息模型——佇列和主題模式

分析了 RocketMQ 的技術架構(Comsumer)

結合著 RocketMQ 回答了訊息佇列副作用的解決方案

介紹了 RocketMQ 的儲存機制和刷盤策略。

等等。。。

如果喜歡可以點贊喲???。