1. 程式人生 > 實用技巧 >訊息佇列學習(1)

訊息佇列學習(1)

轉載:https://mp.weixin.qq.com/s/sKaYMXLX7aUWKylcUX3j4g

訊息佇列分兩種模式:佇列模式和釋出訂閱模式
佇列模式:生產者往某個佇列裡面傳送訊息,一個佇列可以儲存多個生產者的訊息,一個佇列也可以有多個消費者, 但是消費者之間是競爭關係,即每條訊息只能被一個消費者消費。


釋出訂閱模式:為了解決一條訊息能被多個消費者消費的問題,釋出/訂閱模型就來了。該模型是將訊息發往一個Topic即主題中,所有訂閱了這個 Topic 的訂閱者都能消費這條訊息。
其實可以這麼理解,釋出/訂閱模型等於我們都加入了一個群聊中,我發一條訊息,加入了這個群聊的人都能收到這條訊息。那麼佇列模型就是一對一聊天,我發給你的訊息,只能在你的聊天視窗彈出,是不可能彈出到別人的聊天視窗中的。

講到這有人說,那我一對一聊天對每個人都發同樣的訊息不就也實現了一條訊息被多個人消費了嘛。
是的,通過多佇列全量儲存相同的訊息,即資料的冗餘可以實現一條訊息被多個消費者消費。RabbitMQ 就是採用佇列模型,通過 Exchange 模組來將訊息傳送至多個佇列,解決一條訊息需要被多個消費者消費問題。
這裡還能看到假設群聊裡除我之外只有一個人,那麼此時的釋出/訂閱模型和佇列模型其實就一樣了。

佇列模型每條訊息只能被一個消費者消費,而釋出/訂閱模型就是為讓一條訊息可以被多個消費者消費而生的,當然佇列模型也可以通過訊息全量儲存至多個佇列來解決一條訊息被多個消費者消費問題,但是會有資料的冗餘。
釋出/訂閱模型相容佇列模型,即只有一個消費者的情況下和佇列模型基本一致。

RabbitMQ 採用佇列模型,RocketMQ和Kafka 採用釋出/訂閱模型。

常用術語

一般我們稱傳送訊息方為生產者 Producer,接受消費訊息方為消費者Consumer,訊息佇列服務端為Broker。
訊息從Producer發往Broker,Broker將訊息儲存至本地,然後Consumer從Broker拉取訊息,或者Broker推送訊息至Consumer,最後消費。

為了提高併發度,往往釋出/訂閱模型還會引入佇列或者分割槽的概念。即訊息是發往一個主題下的某個佇列或者某個分割槽中。RocketMQ中叫佇列,Kafka叫分割槽,本質一樣。

例如某個主題下有 5 個佇列,那麼這個主題的併發度就提高為 5 ,同時可以有 5 個消費者並行消費

該主題的訊息。一般可以採用輪詢或者key hash取餘等策略來將同一個主題的訊息分配到不同的佇列中。

與之對應的消費者一般都有組的概念Consumer Group, 即消費者都是屬於某個消費組的。一條訊息會發往多個訂閱了這個主題的消費組。

假設現在有兩個消費組分別是Group 1Group 2,它們都訂閱了Topic-a。此時有一條訊息發往Topic-a,那麼這兩個消費組都能接收到這條訊息。

然後這條訊息實際是寫入Topic某個佇列中,消費組中的某個消費者對應消費一個佇列的訊息。

在物理上除了副本拷貝之外,一條訊息在Broker中只會有一份,每個消費組會有自己的offset即消費點位來標識消費到的位置。在消費點位之前的訊息表明已經消費過了。當然這個offset是佇列級別的。每個消費組都會維護訂閱的Topic下的每個佇列的offset

來個圖看看應該就很清晰了。

如何保證訊息不丟失

就我們市面上常見的訊息佇列而言,只要配置得當,我們的訊息就不會丟。

先來看看這個圖,

生產訊息

生產者傳送訊息至Broker,需要處理Broker的響應,不論是同步還是非同步傳送訊息,同步和非同步回撥都需要做好try-catch,妥善的處理響應,如果Broker返回寫入失敗等錯誤訊息,需要重試傳送。當多次傳送失敗需要作報警,日誌記錄等。

這樣就能保證在生產訊息階段訊息不會丟失。

儲存訊息

儲存訊息階段需要在訊息刷盤之後再給生產者響應,假設訊息寫入快取中就返回響應,那麼機器突然斷電這訊息就沒了,而生產者以為已經發送成功了。

如果Broker是叢集部署,有多副本機制,即訊息不僅僅要寫入當前Broker,還需要寫入副本機中。那配置成至少寫入兩臺機子後再給生產者響應。這樣基本上就能保證儲存的可靠了。一臺掛了還有一臺還在呢(假如怕兩臺都掛了..那就再多些)。

消費訊息

這裡經常會有同學犯錯,有些同學當消費者拿到訊息之後直接存入記憶體佇列中就直接返回給Broker消費成功,這是不對的。

你需要考慮拿到訊息放在記憶體之後消費者就宕機了怎麼辦。所以我們應該在消費者真正執行完業務邏輯之後,再發送給Broker消費成功,這才是真正的消費了。

所以只要我們在訊息業務邏輯處理完成之後再給Broker響應,那麼消費階段訊息就不會丟失。

小結一下

可以看出,保證訊息的可靠性需要三方配合

生產者需要處理好Broker的響應,出錯情況下利用重試、報警等手段。

Broker需要控制響應的時機,單機情況下是訊息刷盤後返回響應,叢集多副本情況下,即傳送至兩個副本及以上的情況下再返回響應。

消費者需要在執行完真正的業務邏輯之後再返回響應給Broker

但是要注意訊息可靠性增強了,效能就下降了,等待訊息刷盤、多副本同步後返回都會影響效能。因此還是看業務,例如日誌的傳輸可能丟那麼一兩條關係不大,因此沒必要等訊息刷盤再響應。

如何處理重複訊息

我們先來看看能不能避免訊息的重複。

假設我們傳送訊息,就管發,不管Broker的響應,那麼我們發往Broker是不會重複的。

但是一般情況我們是不允許這樣的,這樣訊息就完全不可靠了,我們的基本需求是訊息至少得發到Broker上,那就得等Broker的響應,那麼就可能存在Broker已經寫入了,當時響應由於網路原因生產者沒有收到,然後生產者又重發了一次,此時訊息就重複了。

再看消費者消費的時候,假設我們消費者拿到訊息消費了,業務邏輯已經走完了,事務提交了,此時需要更新Consumer offset了,然後這個消費者掛了,另一個消費者頂上,此時Consumer offset還沒更新,於是又拿到剛才那條訊息,業務又被執行了一遍。於是訊息又重複了。

可以看到正常業務而言訊息重複是不可避免的,因此我們只能從另一個角度來解決重複訊息的問題。

關鍵點就是冪等。既然我們不能防止重複訊息的產生,那麼我們只能在業務上處理重複訊息所帶來的影響。

冪等處理重複訊息

冪等是數學上的概念,我們就理解為同樣的引數多次呼叫同一個介面和呼叫一次產生的結果是一致的。

例如這條 SQLupdate t1 set money = 150 where id = 1 and money = 100;執行多少遍money都是150,這就叫冪等。

因此需要改造業務處理邏輯,使得在重複訊息的情況下也不會影響最終的結果。

可以通過上面我那條 SQL 一樣,做了個前置條件判斷,即money = 100情況,並且直接修改,更通用的是做個version即版本號控制,對比訊息中的版本號和資料庫中的版本號。

或者通過資料庫的約束例如唯一鍵,例如insert into update on duplicate key...

或者記錄關鍵的key,比如處理訂單這種,記錄訂單ID,假如有重複的訊息過來,先判斷下這個ID是否已經被處理過了,如果沒處理再進行下一步。當然也可以用全域性唯一ID等等。

基本上就這麼幾個套路,真正應用到實際中還是得看具體業務細節

如何保證訊息的有序性

有序性分:全域性有序和部分有序

全域性有序

如果要保證訊息的全域性有序,首先只能由一個生產者往Topic傳送訊息,並且一個Topic內部只能有一個佇列(分割槽)。消費者也必須是單執行緒消費這個佇列。這樣的訊息就是全域性有序的!

不過一般情況下我們都不需要全域性有序,即使是同步MySQL Binlog也只需要保證單表訊息有序即可。

部分有序

因此絕大部分的有序需求是部分有序,部分有序我們就可以將Topic內部劃分成我們需要的佇列數,把訊息通過特定的策略發往固定的佇列中,然後每個佇列對應一個單執行緒處理的消費者。這樣即完成了部分有序的需求,又可以通過佇列數量的併發來提高訊息處理效率。

如果處理訊息堆積

訊息的堆積往往是因為生產者的生產速度與消費者的消費速度不匹配。有可能是因為訊息消費失敗反覆重試造成的,也有可能就是消費者消費能力弱,漸漸地訊息就積壓了。

因此我們需要先定位消費慢的原因,如果是bug則處理bug,如果是因為本身消費能力較弱,我們可以優化下消費邏輯,比如之前是一條一條訊息消費處理的,這次我們批量處理,比如資料庫的插入,一條一條插和批量插效率是不一樣的。

假如邏輯我們已經都優化了,但還是慢,那就得考慮水平擴容了,增加Topic的佇列數和消費者數量,注意佇列數一定要增加,不然新增加的消費者是沒東西消費的。一個Topic中,一個佇列只會分配給一個消費者

當然你消費者內部是單執行緒還是多執行緒消費那看具體場景。不過要注意上面提高的訊息丟失的問題,如果你是將接受到的訊息寫入記憶體佇列之後,然後就返回響應給Broker,然後多執行緒向記憶體佇列消費訊息,假設此時消費者宕機了,記憶體佇列裡面還未消費的訊息也就丟了。