全網最全RabbitMQ總結,別再說你不會RabbitMQ
RabbitMQ入門教程
當初我學RabbitMQ的時候,第一時間就上GitHub找相應的教程,但是令我很失望的是沒有找到,Spring,Mybatis之類的教程很多,而RabbitMQ的教程幾乎找不到,看的最多的就是朱小廝大佬的部落格。後來想著索性自己總結一下吧,有不恰當的地方歡迎小夥伴指出。
這篇文章主要是對著我在GitHub上的原始碼解釋的,因此本文並沒有太多的原始碼。寫了挺長時間的,為了防止迷路,歡迎大家star和fork
github地址:https://github.com/erlieStar/rabbitmq-examples
前言
我們先來看一下一條訊息在RabbitMQ中的流轉過程
圖示的主要流程如下
- 生產者傳送訊息的時候指定RoutingKey,然後訊息被髮送到Exchange
- Exchange根據一些列規則將訊息路由到指定的佇列中
- 消費者從佇列中消費訊息
整個流程主要就4個參與者message,exchange,queue,consumer,我們就來認識一下這4個參與者
Message
訊息可以設定一些列屬性,每種屬性的作用可以參考《深入RabbitMQ》一書
屬性名 | 用處 |
---|---|
contentType | 訊息體的MIME型別,如application/json |
contentEncoding | 訊息的編碼型別,如是否壓縮 |
messageId | 訊息的唯一性標識,由應用進行設定 |
correlationId | 一般用作關聯訊息的message-id,常用於訊息的響應 |
timestamp | 訊息的建立時刻,整型,精確到秒 |
expiration | 訊息的過期時刻,字串,但是呈現格式為整型,精確到秒 |
deliveryMode | 訊息的持久化型別 ,1為非持久化,2為持久化,效能影響巨大 |
appId | 應用程式的型別和版本號 |
userId | 標識已登入使用者,極少使用 |
type | 訊息型別名稱,完全由應用決定如何使用該欄位 |
replyTo | 構建回覆訊息的私有響應佇列 |
headers | 鍵/值對錶,使用者自定義任意的鍵和值 |
priority | 指定佇列中訊息的優先順序 |
Exchange
接收訊息,並根據路由鍵轉發訊息到所繫結的佇列,常用的屬性如下
交換機屬性 | 型別 |
---|---|
name | 交換器名稱 |
type | 交換器型別,有如下四種,direct,topic,fanout,headers |
durability | 是否需要持久化,true為持久化。持久化可以將交換器存檔,在伺服器重啟的時候不會丟失相關資訊 |
autoDelete | 與這個Exchange繫結的Queue或Exchange都與此解綁時,會刪除本交換器 |
internal | 設定是否內建,true為內建。如果是內建交換器,客戶端無法傳送訊息到這個交換器中,只能通過交換器路由到交換器這種方式 |
argument | 其他一些結構化引數 |
我們最常使用的就是type屬性,下面就詳細解釋type屬性
Fanout Exchange
傳送到該交換機的訊息都會路由到與該交換機繫結的所有佇列上,可以用來做廣播
不處理路由鍵,只需要簡單的將佇列繫結到交換機上
Fanout交換機轉發訊息是最快的
Direct Exchage
把訊息路由到BindingKey和RoutingKey完全匹配的佇列中
Topic Exchange
前面說到,direct型別的交換器路由規則是完全匹配RoutingKey和BindingKey。topic和direct類似,也是將訊息傳送到RoutingKey和BindingKey相匹配的佇列中,只不過可以模糊匹配。
- RoutinKey為一個被“.”號分割的字串(如com.rabbitmq.client)
- BindingKey和RoutingKey也是“.”號分割的字串
- BindKey中可以存在兩種特殊字串“*”和“#”,用於做模糊匹配,其中“*”用於匹配不多不少一個詞,“#”用於匹配多個單詞(包含0個,1個)
BindIngKey | 能夠匹配到的RoutingKey |
---|---|
java.# | java.lang,java.util, java.util.concurrent |
java.* | java.lang,java.util |
*.*.uti | com.javashitang.util,org.spring.util |
假如現在有2個RoutingKey為java.lang和java.util.concurrent的訊息,java.lang會被路由到Consumer1和Consumer2,java.util.concurrent會被路由到Consumer2。
Headers Exchange
headers型別的交換器不依賴於路由鍵的匹配規則來路由訊息,而是根據傳送訊息內容中的headers屬性進行匹配。headers型別的交換器效能差,不實用,基本上不會使用。
Queue
佇列的常見屬性如下
引數名 | 用處 |
---|---|
queue | 佇列的名稱 |
durable | 是否持久化,true為持久化。持久化的佇列會存檔,在伺服器重啟的時候可以保證不丟失相關資訊 |
exclusive | 設定是否排他,true為排他。如果一個佇列被宣告為排他佇列,該佇列僅對首次宣告他它的連線可見,並在連線斷開時自動刪除(即一個佇列只能有一個消費者) |
autoDelete | 設定是否自動刪除,true為自動刪除,自動刪除的前提是,至少一個消費者連線到這個佇列,之後所有與這個連線的消費者都斷開時,才會自動刪除 |
arguments | 設定佇列的其他引數,如x-message-ttl,x-max-length |
arguments中可以設定的佇列的常見引數如下
引數名 | 目的 |
---|---|
x-dead-letter-exchange | 死信交換器 |
x-dead-letter-routing-key | 死信訊息的可選路由鍵 |
x-expires | 佇列在指定毫秒數後被刪除 |
x-ha-policy | 建立HA佇列 |
x-ha-nodes | HA佇列的分佈節點 |
x-max-length | 佇列的最大訊息數 |
x-message-ttl | 毫秒為單位的訊息過期時間,佇列級別 |
x-max-priority | 最大優先值為255的佇列優先排序功能 |
rabbitmq-api(rabbitmq api的使用)
chapter_1: 快速開始,手寫一個RabbitMQ的生產者和消費者
chapter_2: 演示了各種exchange的使用
來回顧一下上面說的各種exchange機器路由規則
交換器型別 | 路由規則 |
---|---|
fanout | 傳送到該交換機的訊息都會路由到與該交換機繫結的所有佇列上,可以用來做廣播 |
direct | 把訊息路由到BindingKey和RoutingKey完全匹配的佇列中 |
topic | topic和direct類似,也是將訊息傳送到RoutingKey和BindingKey相匹配的佇列中,只不過可以模糊匹配 |
headers | 效能差,基本不會使用 |
chapter_3: 拉取訊息
訊息的獲得方式有2種
拉取訊息(get message)
推送訊息(consume message)
那我們應該拉取訊息還是推送訊息?get是一個輪詢模型,而consumer是一個推送模型。get模型會導致每條訊息都會產生與RabbitMQ同步通訊的開銷,這一個請求由傳送請求幀的客戶端應用程式和傳送應答的RabbitMQ組成。所以推送訊息,避免拉取
chapter_4: 手動ack
訊息的確認方式有2種
- 自動確認(autoAck=true)
- 手動確認(autoAck=false)
消費者在消費訊息的時候,可以指定autoAck引數
String basicConsume(String queue, boolean autoAck, Consumer callback)
autoAck=false: RabbitMQ會等待消費者顯示回覆確認訊息後才從記憶體(或者磁碟)中移出訊息
autoAck=true: RabbitMQ會自動把傳送出去的訊息置為確認,然後從記憶體(或者磁碟)中刪除,而不管消費者是否真正的消費了這些訊息
手動確認的方法如下,有2個引數
basicAck(long deliveryTag, boolean multiple)
deliveryTag: 用來標識通道中投遞的訊息。RabbitMQ 推送訊息給Consumer時,會附帶一個deliveryTag,以便Consumer可以在訊息確認時告訴RabbitMQ到底是哪條訊息被確認了。
RabbitMQ保證在每個通道中,每條訊息的deliveryTag從1開始遞增
multiple=true: 訊息id<=deliveryTag的訊息,都會被確認
myltiple=false: 訊息id=deliveryTag的訊息,都會被確認
訊息一直不確認會發生啥?
如果佇列中的訊息傳送到消費者後,消費者不對訊息進行確認,那麼訊息會一直留在佇列中,直到確認才會刪除。
如果傳送到A消費者的訊息一直不確認,只有等到A消費者與rabbitmq的連線中斷,rabbitmq才會考慮將A消費者未確認的訊息重新投遞給另一個消費者
chapter_5: 拒絕訊息的兩種方式
確認訊息只有一種方法
- basicAck(long deliveryTag, boolean multiple)
而拒絕訊息有兩種方式
basicNack(long deliveryTag, boolean multiple, boolean requeue)
basicReject(long deliveryTag, boolean requeue)
basicNack和basicReject的區別只有一個,basicNack支援批量拒絕
deliveryTag和multiple引數前面已經說過。
requeue=true: 訊息會被再次傳送到佇列中
requeue=false: 訊息會被直接丟失
chapter_6: 失敗通知
chapter_6到chapter_10主要簡述了訊息釋出時的權衡
我們最常用的就是失敗通知和釋出者確認
當訊息不能被路由到某個queue時,我們如何獲取到不能正確路由的訊息呢?
- 在傳送訊息時設定mandatory為true
- 生產者可以通過呼叫channel.addReturnListener來新增ReturnListener監聽器獲取沒有被路由到佇列中的訊息
mandatory是channel.basicPublish()方法中的引數
mandatory=true: 交換器無法根據路由鍵找到一個符合條件的佇列,那麼RabbitMQ會呼叫Basic.Return命令將訊息返回給生產者
mandatory=false: 出現上述情形,則訊息直接被丟棄
chapter_7: 釋出者確認
當訊息被髮送後,訊息到底有沒有到達exchange呢?預設情況下生產者是不知道訊息有沒有到達exchange
RabbitMQ針對這個問題,提供了兩種解決方式
- 事務(後面會講到)
- 釋出者確認(publisher confirm)
而釋出者確認有三種程式設計方式
- 普通confirm模式:每傳送一條訊息後,呼叫waitForConfirms()方法,等待伺服器端confirm。實際上是一種序列confirm了。
- 批量confirm模式:每傳送一批訊息後,呼叫waitForConfirms()方法,等待伺服器端confirm。
- 非同步confirm模式:提供一個回撥方法,服務端confirm了一條或者多條訊息後Client端會回撥這個方法。
非同步confirm模式的效能最高,因此經常使用,我想把這個分享的細一下
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
log.info("handleAck, deliveryTag: {}, multiple: {}", deliveryTag, multiple);
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
log.info("handleNack, deliveryTag: {}, multiple: {}", deliveryTag, multiple);
}
});
寫過非同步confirm程式碼的小夥伴應該對這段程式碼不陌生,可以看到這裡也有deliveryTag和multiple。但是我要說的是這裡的deliveryTag和multiple和訊息的ack沒有一點關係。
confirmListener中的ack: rabbitmq控制的,用來確認訊息是否到達exchange
訊息的ack: 上面說到可以自動確認,也可以手動確認,用來確認queue中的訊息是否被consumer消費
chapter_8: 備用交換器
生產者在傳送訊息的時候如果不設定 mandatory 引數那麼訊息在未被路由到queue的情況下將會丟失,如果設定了 mandatory 引數,那麼需要新增 ReturnListener 的程式設計邏輯,生產者的程式碼將變得複雜。如果既不想複雜化生產者的程式設計邏輯,又不想訊息丟失,那麼可以使用備用交換器,這樣可以將未被路由到queue的訊息儲存在RabbitMQ 中,在需要的時候去處理這些訊息
chapter_9: 事務
RabbitMQ中與事務機制相關的方法有3個
方法 | 解釋 |
---|---|
channel.txSelect() | 將當前的通道設定成事務模式 |
channel.txCommit() | 提交事務 |
channel.txRollback() | 回滾事務 |
訊息成功被髮送到RabbitMQ的exchange上,事務才能提交成功,否則便可在捕獲異常之後進行事務回滾,與此同時可以進行訊息重發
因為事務會榨乾RabbitMQ的效能,所以一般使用釋出者確認代替事務
chapter_10: 訊息持久化
訊息做持久化,只需要將訊息屬性的delivery-mode設定為2即可
RabbitMQ給我們封裝了這個屬性,即MessageProperties.PERSISTENT_TEXT_PLAIN,
詳細使用可以參考github的程式碼
當我們想做訊息的持久化時,最好同時設定佇列和訊息的持久化,因為只設置佇列的持久化,重啟之後訊息會丟失。只設置佇列的持久化,重啟後佇列消失,繼而訊息也丟失
chapter_11: 死信佇列
DLX,全稱為Dead-Letter-Exchange,稱之為死信交換器。當一個訊息在佇列中變成死信(dead message)之後,它能被重新發送到另一個交換器中,這個交換器就是DLX,繫結DLX的佇列就稱之為死信佇列。
DLX也是一個正常的交換器,和一般的交換器沒有區別,實際上就是設定某個佇列的屬性
訊息變成死信一般是由於以下幾種情況
- 訊息被拒絕(Basic.Reject/Basic.Nack)且不重新投遞(requeue=false)
- 訊息過期
- 佇列達到最大長度
死信交換器和備用交換器的區別
備用交換器: 1.訊息無法路由時轉到備用交換器 2.備用交換器是在宣告主交換器的時候定義的
死信交換器: 1.訊息已經到達佇列,但是被消費者拒絕等的訊息會轉到死信交換器。2.死信交換器是在宣告佇列的時候定義的
chapter_12: 流量控制(服務質量保證)
qos即服務端限流,qos對於拉模式的消費方式無效
使用qos只要進行如下2個步驟即可
autoAck設定為false(autoAck=true的時候不生效)
呼叫basicConsume方法前先呼叫basicQos方法,這個方法有3個引數
basicQos(int prefetchSize, int prefetchCount, boolean global)
引數名 | 含義 |
---|---|
prefetchSize | 批量取的訊息的總大小,0為不限制 |
prefetchCount | 消費完prefetchCount條(prefetchCount條訊息被ack)才再次推送 |
global | global為true表示對channel進行限制,否則對每個消費者進行限制,因為一個channel允許有多個消費者 |
為什麼要使用qos?
提高服務穩定性。假設消費端有一段時間不可用,導致佇列中有上萬條未處理的訊息,如果開啟客戶端,
巨量的訊息推送過來,可能會導致消費端變卡,也有可能直接不可用,所以服務端限流很重要提高吞吐量。當佇列有多個消費者時,佇列收到的訊息以輪詢的方式傳送給消費者。但由於機器效能等的原因,每個消費者的消費能力不一樣,
這就會導致一些消費者處理完了消費的訊息,而另一些則還堆積了一些訊息,會造成整體應用吞吐量的下降
springboot-rabbitmq(springboot整合rabbitmq)
未完待續
聯絡我
email: [email protected]
歡迎大家和我交流,關注公眾號Java識堂獲取我的聯絡方式
相關推薦
全網最全RabbitMQ總結,別再說你不會RabbitMQ
RabbitMQ入門教程 當初我學RabbitMQ的時候,第一時間就上GitHub找相應的教程,但是令我很失望的是沒有找到,Spring,Mybatis之類的教程很多,而RabbitMQ的教程幾乎找不到,看的最多的就是朱小廝大佬的部落格。後來想著索性自己總結一下吧,有不恰當的地方歡迎小夥伴指出。 這篇文章
最全的汽車儀表指示燈解讀,別再說你不認識了!
ref 認識 jpeg 駕駛 class 技術 車主 pan 方便 最全的汽車儀表指示燈解讀,別再說你不認識了! 你汽車上的汽車故障警報燈可能有上百種之多,越是高端的車型故障燈的種類更多。車主在平常駕駛,有時會發現車內有些燈突然亮了,頓時不知所措。尤其對於新手司
Java最經典知識點總結,看完你都記住的了嗎?
1實現多執行緒的方式有幾種? 其實這個問題並不難,只是在這裡做一個總結。一共有三種。 實現Runnable介面,並實現該介面的run()方法 繼承Thread類,重寫run()方法 實現Callable介面,實現call()方法。 大家可能對前兩種已經很清楚了,重點
Office打開慢,怕是你不會設置吧?
鏈接 nbsp office 性能 問題 mage com 原因 content Office軟件大家每天都要使用,不過在使用的過程中,最尷尬的莫過於打開慢,用著卡,想砸電腦。一般來講,Office問題集中在以下幾點原因: 1、Office加載項過多 2、Office新
全網把Map中的hash()分析的最透徹的文章,別無二家。
nbsp -i lin 相等 分布 原因 單向鏈表 mas ret 你知道HashMap中hash方法的具體實現嗎?你知道HashTable、ConcurrentHashMap中hash方法的實現以及原因嗎?你知道為什麽要這麽實現嗎?你知道為什麽JDK 7和JDK 8中ha
Python3 視頻教程,全網最全的視頻教程,爬蟲,從入門到實戰
python3 python基礎 入門到 分享 視頻 pst https size 分布式爬蟲 需要聯系我:QQ:1844912514 最新Python基礎班+就業班視頻教程 鏈接: python分布式爬蟲打造搜索引擎鏈接: https://pan.baidu.com/
Android 劉海屏來襲,全網最全適配技巧
Apple 一直在引領設計的潮流,自從 iPhone X 釋出之後,"劉海屏" 就一直存在爭議。不過不管你怎樣,Android 也要躋入 "劉海屏" 的行列,尤其是 Android P 釋出之後,也從系統級支援頂部凹槽螢幕設計。 很多廠商也在逐漸推出 “劉海屏” 設計的手機,在國內比較常見的
作業系統最全知識點總結(找工作,考研必備)
作為非科班出身的計算機愛好者,拜讀了作業系統書,刷了王道考研作業系統,收益頗豐,現將知識點總結如下,供找工作、考研的小夥伴們拿來作為作業系統複習提綱。 程序 記憶體 檔案 IO Xmind內容:http://pan.baidu.com/s/1i38bvzJ 密碼:b
2017已來,最全面試總結——這些Android面試題你一定需要
三金四銀,又到了一年一度的跳槽季。也許有不少Android程式設計師開始摩拳擦掌蠢蠢欲動了。結合以往自己的經歷,今天給大家總結下Android面試題,希望有幫助。 本文轉自:(http://www.jianshu.com/p/a22450882af2) 1:Activ
[乾貨]2017已來,最全面試總結——這些Android面試題你一定需要
前言 來年發完年終獎。也許有不少Android程式設計師開始摩拳擦掌蠢蠢欲動了。結合以往自己的經歷,今天給大家總結下Android面試題,希望有幫助。 01Activity生命週期? 這幾乎是個老少咸宜,永遠不會過
React Native 效能優化指南【全網最全,值得收藏】
2020 年談 React Native,在日新月異的前端圈,可能算比較另類了。文章動筆之前我也猶豫過,但是想到寫技術文章又不是趕時髦,啥新潮寫啥,所以還是動筆寫了這篇 React Native 效能優化的文章。 本文談到的 React Native 效能優化,還沒到修改 React Native 原始碼
NodeJS——大彙總(一)(只需要使用這些東西,就能處理80%以上業務需求,全網最全node解決方案,吐血整理)
# 一、前言 ## 本文目標 > 本文是博主總結了之前的自己在做的很多個專案的一些知識點,當然我在這裡不會過多的講解業務的流程,而是建立一個小demon,旨在幫助大家去更加高效 更加便捷的生成自己的node後臺介面專案,本文底部提供了一個 藍圖,歡迎大家下載,start,實際上,這樣的一套思路打下來,基本上就
教師節快樂,全網最全程式設計學習網站彙總來了,還不趕快收藏
教師節快樂! 程式設計師是一個需要不斷學習的職業。幸運的是,在這個網際網路時代,知識就在那裡,等著我們去獲取。 作為一個“收藏從未停止,學習從未開始”的博主,秉承著好東西不能獨享的態度,把收藏的學習網站整理分享出來,希望大家不要學我,一定要好好學習,天天進步,升職加薪
多目標跟蹤全解析,全網最全
與多目標跟蹤(Multiple Object Tracking簡稱MOT)對應的是單目標跟蹤(Single Object Tracking簡稱SOT),按照字面意思來理解,前者是對連續視訊畫面中多個目標進行跟蹤,後者是對連續視訊畫面中單個目標進行跟蹤。由於大部分應用場景都涉及到多個目標的跟蹤,因此多目標跟蹤也
【軟體測試 Python自動化】全網最全大廠面試題,看完以後你就是面試官!
前言 為了讓大家更好的理解和學習投入到Python自動化來找到一份好的資料也是學習過程中,非常重要的一個點。你的檢索能力越強,你就會越容易找到最合適你的資料。 有需要的小夥伴可以複製群號 313782132 這裡可免費領取! 暗號:部落格。 一、什麼是相容性測試?相容性測試側重哪些方面? 參考答案:
解決有關flask-socketio中服務端和客戶端回調函數callback參數的問題(全網最全)
分享圖片 ready 發現 ted doc 客戶端 event return 建立 由於工作當中需要用的flask_socketio,所以自己學習了一下如何使用,查閱了有關文檔,當看到回調函數callback的時候,發現文檔裏都描述的不太清楚,最後終於琢磨出來了,分享給有
Ubuntu 搭建Web服務器(MySQL+PHP+Apache)詳細教程 (全網最全)
rpm php7 grep host 存儲 需要 目錄 所有 提示 Ubuntu 搭建Web服務器(MySQL+PHP+Apache)詳細教程 (全網最全) 看了好多人的博客,有的不全 or 有問題,整理了一下,適合小白 新手先整理幾個小問題 1.為啥使用 Linux 搭建
[資源分享]不吹牛 全網!最全!!最新!!!最優質!!!!
好久沒有分享資源乾貨了,近期多途徑收集了很多優質的學習資源,包括Java、Python、Linux、前端、人工智慧等優質學習資源,來回饋所有的讀者朋友。保證全網最優質!!! 資源領取方式 關注公眾號「苦逼的碼農」回覆相應關鍵詞即可。 長按二維碼即可關注。
Ubuntu 搭建Web伺服器(MySQL+PHP+Apache)詳細教程 (全網最全)
Ubuntu 搭建Web伺服器(MySQL+PHP+Apache)詳細教程 (全網最全) 看了好多人的部落格,有的不全 or 有問題,整理了一下,適合小白 新手先整理幾個小問題 1.為啥使用 Linux 搭建伺服器? 一個是因為主流的雲伺服器太貴,買個低配的容易崩,聽說 Linux 比較穩定,之後 2.
[譯文]過猶不及,別再在程式設計中高射炮打蚊子
原文連結:Anyway,stop recommending bazookas to kill flies in programming. 眾成翻譯地址:過猶不及,別再在程式設計中高射炮打蚊子 譯者注:翻譯這篇吐槽的文章,主要是為了自省~日常工作中確實會犯類似的錯誤,不單是解答別人的時候,