1. 程式人生 > >MQ學習筆記整理

MQ學習筆記整理

MQ學習筆記

一、為什麼要使用MQ?

其實這裡要講的就是使用MQ的好處,MQ的的使用場景有很多,但是比較核心的有3個:解耦、非同步、削峰

1. 解耦

例如:A系統要傳送資料到B、C、D三個系統,通過介面呼叫傳送。假如現在又添加了一個E系統,也要資料,A系統需要修改;B系統說我現在不需要這個資料了,A系統還是要修改。這種情況下,A系統的維護者肯定很崩潰。

解耦前

其實這個呼叫是不需要直接同步呼叫介面的,如果用MQ給他非同步化解耦,也是可以的

在這裡插入圖片描述

2. 非同步

A系統接收一個請求,需要在自己本地寫庫,還需要在BCD三個系統寫庫,自己本地寫庫要3ms,BCD三個系統分別寫庫要300ms、450ms、200ms。最終請求總延時是3 + 300 + 450 + 200 = 953ms,接近1s,使用者感覺搞個什麼東西,慢死了慢死了。

在這裡插入圖片描述

當使用 MQ進行非同步儲存的時候:
在這裡插入圖片描述

3. 削峰

每天0點到11點,A系統風平浪靜,每秒併發請求數量就100個。結果每次一到11點~1點,每秒併發請求數量突然會暴增到1萬條。但是系統最大的處理能力就只能是每秒鐘處理1000個請求啊。。。尷尬了,系統會死。。。

在這裡插入圖片描述

使用MQ進行非同步處理以後,可以很大程度上減輕MySQL的壓力:

在這裡插入圖片描述

二、使用MQ會帶來哪些缺點?

1. 系統可用性降低

系統引入的外部依賴越多,越容易掛掉,本來你就是A系統呼叫BCD三個系統的介面就好了,人ABCD四個系統好好的,沒啥問題,你偏加個MQ進來,萬一MQ掛了咋整?MQ掛了,整套系統崩潰了,你不就完了麼。

2. 系統複雜性提高

硬生生加個MQ進來,你怎麼保證訊息沒有重複消費?怎麼處理訊息丟失的情況?怎麼保證訊息傳遞的順序性?頭大頭大,問題一大堆,痛苦不已

3. 一致性問題

A系統處理完了直接返回成功了,人都以為你這個請求就成功了;但是問題是,要是BCD三個系統那裡,BD兩個系統寫庫成功了,結果C系統寫庫失敗了,咋整?你這資料就不一致了。

所以訊息佇列實際是一種非常複雜的架構,你引入它有很多好處,但是也得針對它帶來的壞處做各種額外的技術方案和架構來規避掉,最好之後,你會發現,媽呀,系統複雜度提升了一個數量級,也許是複雜了10倍。但是關鍵時刻,用,還是得用的。

在這裡插入圖片描述

三、kafka、activemq、rabbitmq、rocketmq都有什麼優點和缺點啊?

你起碼得知道各種mq的優點和缺點吧,咱們來畫個表格看看

特性 ActiveMQ RabbitMQ RocketMQ Kafka
單機吞吐量 萬級,吞吐量比RocketMQ和Kafka要低了一個數量級 萬級,吞吐量比RocketMQ和Kafka要低了一個數量級 10萬級,RocketMQ也是可以支撐高吞吐的一種MQ 10萬級別,這是kafka最大的優點,就是吞吐量高。 一般配合大資料類的系統來進行實時資料計算、日誌採集等場景
topic數量對吞吐量的影響 topic可以達到幾百,幾千個的級別,吞吐量會有較小幅度的下降 這是RocketMQ的一大優勢,在同等機器下,可以支撐大量的topic topic從幾十個到幾百個的時候,吞吐量會大幅度下降 所以在同等機器下,kafka儘量保證topic數量不要過多。如果要支撐大規模topic,需要增加更多的機器資源
時效性 ms級 微秒級,這是rabbitmq的一大特點,延遲是最低的 ms級 延遲在ms級以內
可用性 高,基於主從架構實現高可用性 高,基於主從架構實現高可用性 非常高,分散式架構 非常高,kafka是分散式的,一個數據多個副本,少數機器宕機,不會丟失資料,不會導致不可用
訊息可靠性 有較低的概率丟失資料 經過引數優化配置,可以做到0丟失 經過引數優化配置,訊息可以做到0丟失
功能支援 MQ領域的功能極其完備 基於erlang開發,所以併發能力很強,效能極其好,延時很低 MQ功能較為完善,還是分散式的,擴充套件性好 功能較為簡單,主要支援簡單的MQ功能,在大資料領域的實時計算以及日誌採集被大規模使用,是事實上的標準
優劣勢總結 非常成熟,功能強大,在業內大量的公司以及專案中都有應用 偶爾會有較低概率丟失訊息 而且現在社群以及國內應用都越來越少,官方社群現在對ActiveMQ 5.x維護越來越少,幾個月才釋出一個版本 而且確實主要是基於解耦和非同步來用的,較少在大規模吞吐的場景中使用 erlang語言開發,效能極其好,延時很低; 吞吐量到萬級,MQ功能比較完備 而且開源提供的管理介面非常棒,用起來很好用 社群相對比較活躍,幾乎每個月都發布幾個版本分 在國內一些網際網路公司近幾年用rabbitmq也比較多一些 但是問題也是顯而易見的,RabbitMQ確實吞吐量會低一些,這是因為他做的實現機制比較重。 而且erlang開發,國內有幾個公司有實力做erlang原始碼級別的研究和定製?如果說你沒這個實力的話,確實偶爾會有一些問題,你很難去看懂原始碼,你公司對這個東西的掌控很弱,基本職能依賴於開源社群的快速維護和修復bug。 而且rabbitmq叢集動態擴充套件會很麻煩,不過這個我覺得還好。其實主要是erlang語言本身帶來的問題。很難讀原始碼,很難定製和掌控。 介面簡單易用,而且畢竟在阿里大規模應用過,有阿里品牌保障 日處理訊息上百億之多,可以做到大規模吞吐,效能也非常好,分散式擴充套件也很方便,社群維護還可以,可靠性和可用性都是ok的,還可以支撐大規模的topic數量,支援複雜MQ業務場景 而且一個很大的優勢在於,阿里出品都是java系的,我們可以自己閱讀原始碼,定製自己公司的MQ,可以掌控 社群活躍度相對較為一般,不過也還可以,文件相對來說簡單一些,然後介面這塊不是按照標準JMS規範走的有些系統要遷移需要修改大量程式碼 還有就是阿里出臺的技術,你得做好這個技術萬一被拋棄,社群黃掉的風險,那如果你們公司有技術實力我覺得用RocketMQ挺好的 kafka的特點其實很明顯,就是僅僅提供較少的核心功能,但是提供超高的吞吐量,ms級的延遲,極高的可用性以及可靠性,而且分散式可以任意擴充套件 同時kafka最好是支撐較少的topic數量即可,保證其超高吞吐量 而且kafka唯一的一點劣勢是有可能訊息重複消費,那麼對資料準確性會造成極其輕微的影響,在大資料領域中以及日誌採集中,這點輕微影響可以忽略 這個特性天然適合大資料實時計算以及日誌收集

四、如何保證訊息佇列的高可用啊?

1. RabbitMQ的高可用性

rabbitmq有三種模式:單機模式,普通叢集模式,映象叢集模式

1.1 單機模式

就是demo級別的,一般就是你本地啟動了玩玩兒的,沒人生產用單機模式

1.2 普通叢集模式

意思就是在多臺機器上啟動多個rabbitmq例項,每個機器啟動一個。但是你建立的queue,只會放在一個rabbtimq例項上,但是每個例項都同步queue的元資料。完了你消費的時候,實際上如果連線到了另外一個例項,那麼那個例項會從queue所在例項上拉取資料過來。不過,這種方式確實很麻煩,也不怎麼好,沒做到所謂的分散式,就是個普通叢集。因為這導致你要麼消費者每次隨機連線一個例項然後拉取資料,要麼固定連線那個queue所在例項消費資料,前者有資料拉取的開銷,後者導致單例項效能瓶頸。而且如果那個放queue的例項宕機了,會導致接下來其他例項就無法從那個例項拉取,如果你開啟了訊息持久化,讓rabbitmq落地儲存訊息的話,訊息不一定會丟,得等這個例項恢復了,然後才可以繼續從這個queue拉取資料。所以這個事兒就比較尷尬了,這就沒有什麼所謂的高可用性可言了,這方案主要是提高吞吐量的,就是說讓叢集中多個節點來服務某個queue的讀寫操作。

在這裡插入圖片描述

1.3 映象叢集模式

這種模式,才是所謂的rabbitmq的高可用模式,跟普通叢集模式不一樣的是,你建立的queue,無論元資料還是queue裡的訊息都會存在於多個例項上,然後每次你寫訊息到queue的時候,都會自動把訊息到多個例項的queue裡進行訊息同步。這樣的話,好處在於,你任何一個機器宕機了,沒事兒,別的機器都可以用。壞處在於,第一,這個效能開銷也太大了吧,訊息同步所有機器,導致網路頻寬壓力和消耗很重!第二,這麼玩兒,就沒有擴充套件性可言了,如果某個queue負載很重,你加機器,新增的機器也包含了這個queue的所有資料,並沒有辦法線性擴充套件你的queue

在這裡插入圖片描述

2. kafka的高可用性

kafka一個最基本的架構認識:多個broker組成,每個broker是一個節點;你建立一個topic,這個topic可以劃分為多個partition,每個partition可以存在於不同的broker上,每個partition就放一部分資料。這就是天然的分散式訊息佇列,就是說一個topic的資料,是分散放在多個機器上的,每個機器就放一部分資料。

實際上rabbitmq之類的,並不是分散式訊息佇列,他就是傳統的訊息佇列,只不過提供了一些叢集、HA的機制而已,因為無論怎麼玩兒,rabbitmq一個queue的資料都是放在一個節點裡的,映象叢集下,也是每個節點都放這個queue的完整資料。

kafka 0.8以前,是沒有HA機制的,就是任何一個broker宕機了,那個broker上的partition就廢了,沒法寫也沒法讀,沒有什麼高可用性可言。

kafka 0.8以後,提供了HA機制,就是replica副本機制。每個partition的資料都會同步到吉他機器上,形成自己的多個replica副本。然後所有replica會選舉一個leader出來,那麼生產和消費都跟這個leader打交道,然後其他replica就是follower。寫的時候,leader會負責把資料同步到所有follower上去,讀的時候就直接讀leader上資料即可。只能讀寫leader?很簡單,要是你可以隨意讀寫每個follower,那麼就要care資料一致性的問題,系統複雜度太高,很容易出問題。kafka會均勻的將一個partition的所有replica分佈在不同的機器上,這樣才可以提高容錯性。這麼搞,就有所謂的高可用性了,因為如果某個broker宕機了,沒事兒,那個broker上面的partition在其他機器上都有副本的,如果這上面有某個partition的leader,那麼此時會重新選舉一個新的leader出來,大家繼續讀寫那個新的leader即可。這就有所謂的高可用性了。

寫資料的時候,生產者就寫leader,然後leader將資料落地寫本地磁碟,接著其他follower自己主動從leader來pull資料。一旦所有follower同步好資料了,就會發送ack給leader,leader收到所有follower的ack之後,就會返回寫成功的訊息給生產者。(當然,這只是其中一種模式,還可以適當調整這個行為)

消費的時候,只會從leader去讀,但是隻有一個訊息已經被所有follower都同步成功返回ack的時候,這個訊息才會被消費者讀到。

在這裡插入圖片描述

3. 避免消費重複資料

3.1 為什麼會產生重複資料

首先就是比如rabbitmq、rocketmq、kafka,都有可能會出現消費重複消費的問題,正常。因為這問題通常不是mq自己保證的,是給你保證的。然後我們挑一個kafka來舉個例子,說說怎麼重複消費吧。

kafka實際上有個offset的概念,就是每個訊息寫進去,都有一個offset,代表他的序號,然後consumer消費了資料之後,每隔一段時間,會把自己消費過的訊息的offset提交一下,代表我已經消費過了,下次我要是重啟啥的,你就讓我繼續從上次消費到的offset來繼續消費吧。

但是凡事總有意外,比如我們之前生產經常遇到的,就是你有時候重啟系統,看你怎麼重啟了,如果碰到點著急的,直接kill程序了,再重啟。這會導致consumer有些訊息處理了,但是沒來得及提交offset,尷尬了。重啟之後,少數訊息會再次消費一次。

其實重複消費不可怕,可怕的是你沒考慮到重複消費之後,怎麼保證冪等性。

在這裡插入圖片描述

3.2 怎麼保證訊息佇列消費的冪等性?

(1)比如你拿個資料要寫庫,你先根據主鍵查一下,如果這資料都有了,你就別插入了,update一下好吧

(2)比如你是寫redis,那沒問題了,反正每次都是set,天然冪等性

(3)比如你不是上面兩個場景,那做的稍微複雜一點,你需要讓生產者傳送每條資料的時候,裡面加一個全域性唯一的id,類似訂單id之類的東西,然後你這裡消費到了之後,先根據這個id去比如redis裡查一下,之前消費過嗎?如果沒有消費過,你就處理,然後這個id寫redis。如果消費過了,那你就別處理了,保證別重複處理相同的訊息即可。

還有比如基於資料庫的唯一鍵來保證重複資料不會重複插入多條,我們之前線上系統就有這個問題,就是拿到資料的時候,每次重啟可能會有重複,因為kafka消費者還沒來得及提交offset,重複資料拿到了以後我們插入的時候,因為有唯一鍵約束了,所以重複資料只會插入報錯,不會導致資料庫中出現髒資料

在這裡插入圖片描述

4. MQ資料丟失問題

丟資料,mq一般分為兩種,要麼是mq自己弄丟了,要麼是我們生產或者消費的時候弄丟了。咱們從rabbitmq和kafka分別來分析一下

4.1 rabbitmq

1)生產者弄丟了資料

生產者將資料傳送到rabbitmq的時候,可能資料就在半路給搞丟了,因為網路啥的問題,都有可能。

此時可以選擇用rabbitmq提供的事務功能,就是生產者傳送資料之前開啟rabbitmq事務(channel.txSelect),然後傳送訊息,如果訊息沒有成功被rabbitmq接收到,那麼生產者會收到異常報錯,此時就可以回滾事務(channel.txRollback),然後重試傳送訊息;如果收到了訊息,那麼可以提交事務(channel.txCommit)。但是問題是,rabbitmq事務機制一搞,基本上吞吐量會下來,因為太耗效能。

所以一般來說,如果你要確保說寫rabbitmq的訊息別丟,可以開啟confirm模式,在生產者那裡設定開啟confirm模式之後,你每次寫的訊息都會分配一個唯一的id,然後如果寫入了rabbitmq中,rabbitmq會給你回傳一個ack訊息,告訴你說這個訊息ok了。如果rabbitmq沒能處理這個訊息,會回撥你一個nack介面,告訴你這個訊息接收失敗,你可以重試。而且你可以結合這個機制自己在記憶體裡維護每個訊息id的狀態,如果超過一定時間還沒接收到這個訊息的回撥,那麼你可以重發。

事務機制和cnofirm機制最大的不同在於,事務機制是同步的,你提交一個事務之後會阻塞在那兒,但是confirm機制是非同步的,你傳送個訊息之後就可以傳送下一個訊息,然後那個訊息rabbitmq接收了之後會非同步回撥你一個介面通知你這個訊息接收到了。

所以一般在生產者這塊避免資料丟失,都是用confirm機制的。

2)rabbitmq弄丟了資料

就是rabbitmq自己弄丟了資料,這個你必須開啟rabbitmq的持久化,就是訊息寫入之後會持久化到磁碟,哪怕是rabbitmq自己掛了,恢復之後會自動讀取之前儲存的資料,一般資料不會丟。除非極其罕見的是,rabbitmq還沒持久化,自己就掛了,可能導致少量資料會丟失的,但是這個概率較小。

設定持久化有兩個步驟,第一個是建立queue的時候將其設定為持久化的,這樣就可以保證rabbitmq持久化queue的元資料,但是不會持久化queue裡的資料;第二個是傳送訊息的時候將訊息的deliveryMode設定為2,就是將訊息設定為持久化的,此時rabbitmq就會將訊息持久化到磁碟上去。必須要同時設定這兩個持久化才行,rabbitmq哪怕是掛了,再次重啟,也會從磁碟上重啟恢復queue,恢復這個queue裡的資料。

而且持久化可以跟生產者那邊的confirm機制配合起來,只有訊息被持久化到磁碟之後,才會通知生產者ack了,所以哪怕是在持久化到磁碟之前,rabbitmq掛了,資料丟了,生產者收不到ack,你也是可以自己重發的。

哪怕是你給rabbitmq開啟了持久化機制,也有一種可能,就是這個訊息寫到了rabbitmq中,但是還沒來得及持久化到磁碟上,結果不巧,此時rabbitmq掛了,就會導致記憶體裡的一點點資料會丟失。

3)消費端弄丟了資料

rabbitmq如果丟失了資料,主要是因為你消費的時候,剛消費到,還沒處理,結果程序掛了,比如重啟了,那麼就尷尬了,rabbitmq認為你都消費了,這資料就丟了。

這個時候得用rabbitmq提供的ack機制,簡單來說,就是你關閉rabbitmq自動ack,可以通過一個api來呼叫就行,然後每次你自己程式碼裡確保處理完的時候,再程式裡ack一把。這樣的話,如果你還沒處理完,不就沒有ack?那rabbitmq就認為你還沒處理完,這個時候rabbitmq會把這個消費分配給別的consumer去處理,訊息是不會丟的。

在這裡插入圖片描述

4.2 kafka

1)消費端弄丟了資料

唯一可能導致消費者弄丟資料的情況,就是說,你那個消費到了這個訊息,然後消費者那邊自動提交了offset,讓kafka以為你已經消費好了這個訊息,其實你剛準備處理這個訊息,你還沒處理,你自己就掛了,此時這條訊息就丟咯。

這不是一樣麼,大家都知道kafka會自動提交offset,那麼只要關閉自動提交offset,在處理完之後自己手動提交offset,就可以保證資料不會丟。但是此時確實還是會重複消費,比如你剛處理完,還沒提交offset,結果自己掛了,此時肯定會重複消費一次,自己保證冪等性就好了。

生產環境碰到的一個問題,就是說我們的kafka消費者消費到了資料之後是寫到一個記憶體的queue裡先緩衝一下,結果有的時候,你剛把訊息寫入記憶體queue,然後消費者會自動提交offset。

然後此時我們重啟了系統,就會導致記憶體queue裡還沒來得及處理的資料就丟失了

2)kafka弄丟了資料

這塊比較常見的一個場景,就是kafka某個broker宕機,然後重新選舉partiton的leader時。大家想想,要是此時其他的follower剛好還有些資料沒有同步,結果此時leader掛了,然後選舉某個follower成leader之後,他不就少了一些資料?這就丟了一些資料啊。

生產環境也遇到過,我們也是,之前kafka的leader機器宕機了,將follower切換為leader之後,就會發現說這個資料就丟了

所以此時一般是要求起碼設定如下4個引數:

給這個topic設定replication.factor引數:這個值必須大於1,要求每個partition必須有至少2個副本

在kafka服務端設定min.insync.replicas引數:這個值必須大於1,這個是要求一個leader至少感知到有至少一個follower還跟自己保持聯絡,沒掉隊,這樣才能確保leader掛了還有一個follower吧

在producer端設定acks=all:這個是要求每條資料,必須是寫入所有replica之後,才能認為是寫成功了

在producer端設定retries=MAX(很大很大很大的一個值,無限次重試的意思):這個是要求一旦寫入失敗,就無限重試,卡在這裡了

我們生產環境就是按照上述要求配置的,這樣配置之後,至少在kafka broker端就可以保證在leader所在broker發生故障,進行leader切換時,資料不會丟失

3)生產者會不會弄丟資料

如果按照上述的思路設定了ack=all,一定不會丟,要求是,你的leader接收到訊息,所有的follower都同步到了訊息之後,才認為本次寫成功了。如果沒滿足這個條件,生產者會自動不斷的重試,重試無限次。

5. 如何保證MQ訊息的順序性

5.1 順序會錯亂的倆場景:

(1)rabbitmq:一個queue,多個consumer,這不明顯亂了
(2)kafka:一個topic,一個partition,一個consumer,內部多執行緒,這不也明顯亂了

在這裡插入圖片描述
在這裡插入圖片描述

5.2 那如何保證訊息的順序性呢?

(1)rabbitmq:拆分多個queue,每個queue一個consumer,就是多一些queue而已,確實是麻煩點;或者就一個queue但是對應一個consumer,然後這個consumer內部用記憶體佇列做排隊,然後分發給底層不同的worker來處理
(2)kafka:一個topic,一個partition,一個consumer,內部單執行緒消費,寫N個記憶體queue,然後N個執行緒分別消費一個記憶體queue即可。

​![](C:\Users\chengli\Desktop\新建資料夾\images2\002.png)
在這裡插入圖片描述

6. 如何解決訊息佇列的延時以及過期失效問題?訊息佇列滿了以後該怎麼處理?有幾百萬訊息持續積壓幾小時,說說怎麼解決?

6.1 大量訊息在mq裡積壓了幾個小時了還沒解決

幾千萬條資料在MQ裡積壓了七八個小時,這個時候第一反應就是修復consumer的問題,讓他恢復消費速度,然後傻傻的等待幾個小時消費完畢。但是,假如一個消費者一秒是1000條,一秒3個消費者是3000條,一分鐘是18萬條,1000多萬條,所以如果你積壓了幾百萬到上千萬的資料,即使消費者恢復了,也需要大概1小時的時間才能恢復過來。一般這個時候,只能操作臨時緊急擴容了,具體操作步驟和思路如下:

1)先修復consumer的問題,確保其恢復消費速度,然後將現有cnosumer都停掉
2)新建一個topic,queue是原來的10倍,臨時建立好原先10倍或者20倍的queue數量
3)然後寫一個臨時的分發資料的consumer程式,這個程式部署上去消費積壓的資料,消費之後不做耗時的處理,直接均勻輪詢寫入臨時建立好的10倍數量的queue
4)接著臨時徵用10倍的機器來部署consumer,每一批consumer消費一個臨時queue的資料
5)這種做法相當於是臨時將queue資源和consumer資源擴大10倍,以正常的10倍速度來消費資料
6)等快速消費完積壓資料之後,得恢復原先部署架構,重新用原先的consumer機器來消費訊息

6.2 訊息堆積導致過期失效

假設你用的是rabbitmq,rabbitmq是可以設定過期時間的,就是TTL,如果訊息在queue中積壓超過一定的時間就會被rabbitmq給清理掉,這個資料就沒了。那這就是第二個坑了。這就不是說資料會大量積壓在mq裡,而是大量的資料會直接搞丟。
這個情況下,就不是說要增加consumer消費積壓的訊息,因為實際上沒啥積壓,而是丟了大量的訊息。我們可以採取一個方案,就是批量重導,這個我們之前線上也有類似的場景幹過。就是大量積壓的時候,我們當時就直接丟棄資料了,然後等過了高峰期以後,比如大家一起喝咖啡熬夜到晚上12點以後,使用者都睡覺了。
這個時候我們就開始寫程式,將丟失的那批資料,寫個臨時程式,一點一點的查出來,然後重新灌入mq裡面去,把白天丟的資料給他補回來。也只能是這樣了。
假設1萬個訂單積壓在mq裡面,沒有處理,其中1000個訂單都丟了,你只能手動寫程式把那1000個訂單給查出來,手動發到mq裡去再補一次

6.3 訊息堆積導致磁碟滿了

如果走的方式是訊息積壓在mq裡,那麼如果你很長時間都沒處理掉,此時導致mq都快寫滿了,咋辦?這個還有別的辦法嗎?沒有,誰讓你第一個方案執行的太慢了,你臨時寫程式,接入資料來消費,消費一個丟棄一個,都不要了,快速消費掉所有的訊息。然後走第二個方案,到了晚上再補資料吧。
在這裡插入圖片描述