Rabbitmq詳解(基於go語言)
參考文件
RMQ的安裝和埠
手動安裝太麻煩,請自行百度。這裡只給出一種基於docker安裝的簡單形式。
docker run -d --hostname my-rabbit --name rmq -p 15672:15672 -p 5672:5672 -p 25672:25672 -e RABBITMQ_DEFAULT_USER=使用者名稱 -e RABBITMQ_DEFAULT_PASS=密碼 rabbitmq:3-management
通過命令可以看出,一共映射了三個埠,簡單說下這三個埠是幹什麼的。
5672:連線生產者、消費者的埠。
15672:WEB管理頁面的埠。
25672:分散式叢集的埠。
基礎概念
amqp:一種訊息中介軟體協議,RMQ是amqp協議的一個具體實現。RMQ使用Erlang語言實現的,具有很好的併發能力,具體歷史請百度,這裡主要關心怎麼用。
注:此圖不一定是最標準的,但比較形象,易於理解。
下面是圖中出現的單詞的詳細解釋:
Producer:生產者,負責生產訊息。
Connect:連線,生產者與RMQ Server之間建立的TCP連線。
Channel:通道,一條連線可包含多條通道,不同通道之間通訊互不干擾。考慮下多執行緒應用場景,每個執行緒對應一條通道,而不是對應一條連線,這樣可以提高效能。
body:訊息主體,要傳遞的資料。
exchange:交換器,負責把訊息轉發到對應的佇列。交換器本身沒有快取訊息的功能,訊息是在佇列中快取的,如果佇列不存在,則交換器會直接丟棄訊息。常用的有四種類型的交換器:direct、fanout、topic、headers。不同型別的交換器有不同的交換規則,交換器會根據交換規則把訊息轉發到對應的佇列。
exchangeName:交換器名稱,每個交換器對應一個名稱,傳送訊息時會附帶交換器名稱,根據交換器名稱選擇對應的交換器。
queue:佇列,用於快取訊息。
BandingKey:繫結鍵,一個佇列可以有一個到多個繫結鍵,通過繫結操作可以繫結交換器和佇列,交換器會根據繫結鍵的名稱找到對應的佇列。
RotingKey:路由鍵,傳送訊息時,需要附帶一條路由鍵,交換器會對路由鍵和繫結鍵進行匹配,如果匹配成功,則訊息會轉發到繫結鍵對應的佇列中。
Consumer:消費者,負責處理訊息。
交換器型別
注:這裡只關注fanout、direct、topic三種類型,header型別沒用過,不關注。
- fanout – 扇出型
用於支援釋出、訂閱模式(pub/sub)
交換器把訊息轉發到所有與之繫結的佇列中。
扇出型別交換器會遮蔽掉路由鍵、繫結鍵的作用。 direct – 直接匹配
用於支援路由模式(Routing)
直接匹配交換器會對比路由鍵和繫結鍵,如果路由鍵和繫結鍵完全相同,則把訊息轉發到繫結鍵所對應的佇列中。topic – 模式匹配
與直接匹配相對應,可以用一些模式來代替字串的完全匹配。
規則:
以 ‘.’ 來分割單詞。
‘#’ 表示一個或多個單詞。
‘*’ 表示一個單詞。
如:
RoutingKey為:
aaa.bbb.ccc
BindingKey可以為:
*.bbb.ccc
aaa.#
預設交換器
RMQ會自帶幾個交換器,簡單看下,這裡只介紹AMQP default。
當交換器名稱為空時,表示使用預設交換器。空的意思是空字串。
預設交換器是一個特殊的交換器,他無需進行繫結操作,可以以直接匹配的形式直接把訊息傳送到任何佇列中。
下圖兩種模式均採用了預設交換器:
兩個消費者從一個佇列取資料時,會產生競爭條件。此時訊息只能給其中的一個消費者。如果兩個消費者均沒有在收到訊息後做應答操作,則訊息會平均傳送給兩個消費者。如果收到訊息後做了應答操作,則會採取能者多勞的模式。
建立交換器
func (ch *Channel) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args Table) error
name:交換器的名稱,對應圖中exchangeName。
kind:也叫作type,表示交換器的型別。有四種常用型別:direct、fanout、topic、headers。
durable:是否持久化,true表示是。持久化表示會把交換器的配置存檔,當RMQ Server重啟後,會自動載入交換器。
autoDelete:是否自動刪除,true表示是。至少有一條繫結才可以觸發自動刪除,當所有繫結都與交換器解綁後,會自動刪除此交換器。
internal:是否為內部,true表示是。客戶端無法直接傳送msg到內部交換器,只有交換器可以傳送msg到內部交換器。
noWait:是否非阻塞,true表示是。阻塞:表示建立交換器的請求傳送後,阻塞等待RMQ Server返回資訊。非阻塞:不會阻塞等待RMQ Server的返回資訊,而RMQ Server也不會返回資訊。(不推薦使用)
args:直接寫nil,沒研究過,不解釋。
建立佇列
func (ch *Channel) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args Table) (Queue, error)
name:佇列名稱
durable:是否持久化,true為是。持久化會把佇列存檔,伺服器重啟後,不會丟失佇列以及佇列內的資訊。(注:1、不丟失是相對的,如果宕機時有訊息沒來得及存檔,還是會丟失的。2、存檔影響效能。)
autoDelete:是否自動刪除,true為是。至少有一個消費者連線到佇列時才可以觸發。當所有消費者都斷開時,佇列會自動刪除。
exclusive:是否設定排他,true為是。如果設定為排他,則佇列僅對首次宣告他的連線可見,並在連線斷開時自動刪除。(注意,這裡說的是連線不是通道,相同連線不同通道是可見的)。
nowait:是否非阻塞,true表示是。阻塞:表示建立交換器的請求傳送後,阻塞等待RMQ Server返回資訊。非阻塞:不會阻塞等待RMQ Server的返回資訊,而RMQ Server也不會返回資訊。(不推薦使用)
args:直接寫nil,沒研究過,不解釋。
佇列繫結
func (ch *Channel) QueueBind(name, key, exchange string, noWait bool, args Table) error
name:佇列名稱
key:對應圖中BandingKey,表示要繫結的鍵。
exchange:交換器名稱
nowait:是否非阻塞,true表示是。阻塞:表示建立交換器的請求傳送後,阻塞等待RMQ Server返回資訊。非阻塞:不會阻塞等待RMQ Server的返回資訊,而RMQ Server也不會返回資訊。(不推薦使用)
args:直接寫nil,沒研究過,不解釋。
交換器繫結
func (ch *Channel) ExchangeBind(destination, key, source string, noWait bool, args Table) error
源交換器根據路由鍵&繫結鍵把msg轉發到目的交換器。
destination:目的交換器,通常是內部交換器。
key:對應圖中BandingKey,表示要繫結的鍵。
source:源交換器。
nowait:是否非阻塞,true表示是。阻塞:表示建立交換器的請求傳送後,阻塞等待RMQ Server返回資訊。非阻塞:不會阻塞等待RMQ Server的返回資訊,而RMQ Server也不會返回資訊。(不推薦使用)
args:直接寫nil,沒研究過,不解釋。
傳送訊息
func (ch *Channel) Publish(exchange, key string, mandatory, immediate bool, msg Publishing) error
exchange:要傳送到的交換機名稱,對應圖中exchangeName。
key:路由鍵,對應圖中RoutingKey。
mandatory:直接false,不建議使用,後面有專門章節講解。
immediate :直接false,不建議使用,後面有專門章節講解。
msg:要傳送的訊息,msg對應一個Publishing結構,Publishing結構裡面有很多引數,這裡只強調幾個引數,其他引數暫時列出,但不解釋。
# cat $(find ./amqp) | grep -rin type.*Publishing
type Publishing struct {
Headers Table
// Properties
ContentType string //訊息的型別,通常為“text/plain”
ContentEncoding string //訊息的編碼,一般預設不用寫
DeliveryMode uint8 //訊息是否持久化,2表示持久化,0或1表示非持久化。
Body []byte //訊息主體
Priority uint8 //訊息的優先順序 0 to 9
CorrelationId string // correlation identifier
ReplyTo string // address to to reply to (ex: RPC)
Expiration string // message expiration spec
MessageId string // message identifier
Timestamp time.Time // message timestamp
Type string // message type name
UserId string // creating user id - ex: "guest"
AppId string // creating application id
}
接受訊息 – 推模式
RMQ Server主動把訊息推給消費者
func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error)
queue:佇列名稱。
consumer:消費者標籤,用於區分不同的消費者。
autoAck:是否自動回覆ACK,true為是,回覆ACK表示高速伺服器我收到訊息了。建議為false,手動回覆,這樣可控性強。
exclusive:設定是否排他,排他表示當前佇列只能給一個消費者使用。
noLocal:如果為true,表示生產者和消費者不能是同一個connect。
nowait:是否非阻塞,true表示是。阻塞:表示建立交換器的請求傳送後,阻塞等待RMQ Server返回資訊。非阻塞:不會阻塞等待RMQ Server的返回資訊,而RMQ Server也不會返回資訊。(不推薦使用)
args:直接寫nil,沒研究過,不解釋。
注意下返回值:返回一個<- chan Delivery型別,遍歷返回值,有訊息則往下走, 沒有則阻塞。
接受訊息 – 拉模式
消費者主動從RMQ Server拉訊息
func (ch *Channel) Get(queue string, autoAck bool) (msg Delivery, ok bool, err error)
queue:佇列名稱
autoAck:是否開啟自動回覆。
手動回覆訊息
func (ch *Channel) Ack(tag uint64, multiple bool) error
func (me Delivery) Ack(multiple bool) error {
if me.Acknowledger == nil {
return errDeliveryNotInitialized
}
return me.Acknowledger.Ack(me.DeliveryTag, multiple)
}
func (d Delivery) Reject(requeue bool) error
簡單看一眼,函式2呼叫了函式1,本質上兩個函式沒區別。
這裡推薦使用第二個,因為方便。
另外說一下multiple引數。true表示回覆當前通道所有未回覆的ack,用於批量確認。false表示回覆當前條目。
函式三:
拒絕本條訊息。如果requeue為true,則RMQ會把這條訊息重新加入佇列,如果requeue為false,則RMQ會丟棄本條訊息。
注:推薦手動回覆,儘量不要使用autoACK,因autoACK不可控。
關閉連線
func (ch *Channel) Close() error
func (c *Connection) Close() error
簡單看下,不解釋了,按照流程最好有關閉一下,其實不關閉也沒啥事。。
Publish – mandatory引數
false:當訊息無法通過交換器匹配到佇列時,會丟棄訊息。
true:當訊息無法通過交換器匹配到佇列時,會呼叫basic.return通知生產者。
注:不建議使用,因會使程式邏輯變得複雜,可以通過備用交換機來實現類似的功能。
Publish – immediate引數
true:當訊息到達Queue後,發現佇列上無消費者時,通過basic.Return返回給生產者。
false:訊息一直快取在佇列中,等待生產者。
注:不建議使用此引數,遇到這種情況,可用TTL和DLX方法代替(後面會介紹)。
備用交換機&TTL+DLX
藉助備用交換機、TTL+DLX代替mandatory、immediate方案:
1、P傳送msg給Ex,Ex無法把msg路由到Q,則會把路由轉發給ErrEx。
2、msg暫存在Q上之後,如果C不能及時消費msg,則msg會轉發到DlxEx。
3、TTL為msg在Q上的暫存時間,單位為毫秒。
通過設定引數,可以設定Ex的備用交換器ErrEx
建立Exchange時,指定Ex的Args – “alternate-exchange”:”ErrEx”。
其中ErrEx為備用交換器名稱通過設定引數,可以設定Q的DLX交換機DlxEX
建立Queue時,指定Q的Args引數:
“x-message-ttl”:0 //msg超時時間,單位毫秒
“x-dead-letter-exchange”:”dlxExchange” //DlxEx名稱
“x-dead-letter-routing-key”:”dlxQueue” //DlxEx路由鍵
持久化
持久化的作用是防止在重啟、關閉、宕機的情況下資料丟失,持久化是相對靠譜的,如果資料在記憶體中,沒來得及存檔就發生了重啟,那麼資料還是會丟失。
持久化可分為三類:
1、Exchange的持久化(建立時設定引數)
2、Queue的持久化(建立時設定引數)
3、Msg的持久化(傳送時設定Args)
Qos
func (ch *Channel) Qos(prefetchCount, prefetchSize int, global bool) error
注意:這個在推送模式下非常重要,通過設定Qos用來防止訊息堆積。
prefetchCount:消費者未確認訊息的個數。
prefetchSize :消費者未確認訊息的大小。
global :是否全域性生效,true表示是。全域性生效指的是針對當前connect裡的所有channel都生效。
封裝思路
簡單說下,封裝是因為這東西引數太多了,不夠好用。所以又包了一層,讓程式碼看起來更簡潔一些,思路上參考了spring-rmq庫,採用json或者xml配置檔案來描述rmq中的各個元件。這樣看起來會舒服些。
封裝的參考程式碼:
https://gitee.com/vrg0/go-rabbitmq.git
注:喵自己封裝的,沒上過生產環境,功能上也不算特別全,好不好用也有待考驗,姑且可當做一個參考吧。。。