RabbitMQ Go客戶端教程4——路由
本文翻譯自RabbitMQ官網的Go語言客戶端系列教程,本文首發於我的個人部落格:liwenzhou.com,教程共分為六篇,本文是第四篇——路由。
這些教程涵蓋了使用RabbitMQ建立訊息傳遞應用程式的基礎知識。 你需要安裝RabbitMQ伺服器才能完成這些教程,請參閱安裝指南或使用Docker映象。 這些教程的程式碼是開源的,官方網站也是如此。
先決條件
本教程假設RabbitMQ已安裝並執行在本機上的標準埠(5672)。如果你使用不同的主機、埠或憑據,則需要調整連線設定。
路由
(使用Go RabbitMQ客戶端)
在上一教程中,我們構建了一個簡單的日誌記錄系統。我們能夠向許多接收者廣播日誌訊息。
在本教程中,我們將向它新增一個特性-我們將使它能夠只訂閱訊息的一個子集。例如,我們將只能將關鍵錯誤訊息定向到日誌檔案(以節省磁碟空間),同時仍然能夠在控制檯上列印所有日誌訊息。
繫結
在前面的示例中,我們已經在建立繫結。你可能會想起以下程式碼:
err=ch.QueueBind(
q.Name,//queuename
"",//routingkey
"logs",//exchange
false,
nil)
繫結是交換器和佇列之間的關係。這可以簡單地理解為:佇列對來自此交換器的訊息感興趣。
繫結可以採用額外的routing_key
引數。為了避免與Channel.Publish
引數混淆,我們將其稱為binding key
err=ch.QueueBind(
q.Name,//queuename
"black",//routingkey
"logs",//exchange
false,
nil)
繫結金鑰的含義取決於交換器的型別。我們以前使用的fanout
交換器只是忽略了這個值。
直連交換器
我們上一個教程中的日誌系統將所有訊息廣播給所有消費者。我們希望擴充套件這一點,允許根據訊息的嚴重性過濾訊息。例如,我們可能希望將日誌訊息寫入磁碟的指令碼只接收嚴重錯誤,而不會在warning或info日誌訊息上浪費磁碟空間。
我們使用fanout
交換器,這並沒有給我們很大的靈活性——它只能進行無腦廣播。
我們將使用direct
direct
交換器背後的路由演算法很簡單——訊息進入其binding key
與訊息的routing key
完全匹配的佇列。
為了說明這一點,請考慮以下設定:
direct-exchang
在此設定中,我們可以看到綁定了兩個佇列的direct
交換器X
。第一個佇列繫結鍵為orange
,第二個佇列繫結為兩個,一個繫結鍵為black
,另一個為green
。
在這種設定中,使用orange
路由鍵釋出到交換器的訊息將被路由到佇列Q1
。路由鍵為black
或green
的訊息將轉到Q2
。所有其他訊息將被丟棄。
多重繫結
direct-exchange-multiple
用相同的繫結鍵繫結多個佇列是完全合法的。在我們的示例中,我們可以使用繫結鍵black
在X
和Q1
之間新增繫結。在這種情況下,direct
交換器的行為將類似fanout
,並將訊息廣播到所有匹配的佇列。帶有black
路由鍵的訊息將同時傳遞給Q1
和Q2
。
傳送日誌
我們將在日誌系統中使用這個模型。我們將傳送訊息到direct
交換器,而不是fanout
。我們將提供嚴重性(譯註:通常我們使用日誌級別劃分日誌資訊的嚴重性)作為路由鍵。這樣,接收指令碼將能夠選擇其想要接收的日誌級別。讓我們首先關注傳送日誌。
與往常一樣,我們需要首先建立一個交換器:
err=ch.ExchangeDeclare(
"logs_direct",//name
"direct",//type
true,//durable
false,//auto-deleted
false,//internal
false,//no-wait
nil,//arguments
)
我們已經準備好傳送一條訊息:
err=ch.ExchangeDeclare(
"logs_direct",//name
"direct",//type
true,//durable
false,//auto-deleted
false,//internal
false,//no-wait
nil,//arguments
)
failOnError(err,"Failedtodeclareanexchange")
body:=bodyFrom(os.Args)
err=ch.Publish(
"logs_direct",//exchange
severityFrom(os.Args),//routingkey
false,//mandatory
false,//immediate
amqp.Publishing{
ContentType:"text/plain",
Body:[]byte(body),
})
為了簡化問題,我們假設“嚴重性”可以是“info”、“warning”、“error”之一。
訂閱
接收訊息的工作方式與上一教程一樣,但有一個例外——我們將為感興趣的每種嚴重性(日誌級別)建立一個新的繫結。
q,err:=ch.QueueDeclare(
"",//name
false,//durable
false,//deletewhenunused
true,//exclusive
false,//no-wait
nil,//arguments
)
failOnError(err,"Failedtodeclareaqueue")
iflen(os.Args)<2{
log.Printf("Usage:%s[info][warning][error]",os.Args[0])
os.Exit(0)
}
//建立多個繫結關係
for_,s:=rangeos.Args[1:]{
log.Printf("Bindingqueue%stoexchange%swithroutingkey%s",
q.Name,"logs_direct",s)
err=ch.QueueBind(
q.Name,//queuename
s,//routingkey
"logs_direct",//exchange
false,
nil)
failOnError(err,"Failedtobindaqueue")
}
完整示例
img
emit_log_direct.go
指令碼的程式碼:
packagemain
import(
"log"
"os"
"strings"
"github.com/streadway/amqp"
)
funcfailOnError(errerror,msgstring){
iferr!=nil{
log.Fatalf("%s:%s",msg,err)
}
}
funcmain(){
conn,err:=amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err,"FailedtoconnecttoRabbitMQ")
deferconn.Close()
ch,err:=conn.Channel()
failOnError(err,"Failedtoopenachannel")
deferch.Close()
err=ch.ExchangeDeclare(
"logs_direct",//name
"direct",//type
true,//durable
false,//auto-deleted
false,//internal
false,//no-wait
nil,//arguments
)
failOnError(err,"Failedtodeclareanexchange")
body:=bodyFrom(os.Args)
err=ch.Publish(
"logs_direct",//exchange
severityFrom(os.Args),//routingkey
false,//mandatory
false,//immediate
amqp.Publishing{
ContentType:"text/plain",
Body:[]byte(body),
})
failOnError(err,"Failedtopublishamessage")
log.Printf("[x]Sent%s",body)
}
funcbodyFrom(args[]string)string{
varsstring
if(len(args)<3)||os.Args[2]==""{
s="hello"
}else{
s=strings.Join(args[2:],"")
}
returns
}
funcseverityFrom(args[]string)string{
varsstring
if(len(args)<2)||os.Args[1]==""{
s="info"
}else{
s=os.Args[1]
}
returns
}
receive_logs_direct.go
的程式碼:
packagemain
import(
"log"
"os"
"github.com/streadway/amqp"
)
funcfailOnError(errerror,msgstring){
iferr!=nil{
log.Fatalf("%s:%s",msg,err)
}
}
funcmain(){
conn,err:=amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err,"FailedtoconnecttoRabbitMQ")
deferconn.Close()
ch,err:=conn.Channel()
failOnError(err,"Failedtoopenachannel")
deferch.Close()
err=ch.ExchangeDeclare(
"logs_direct",//name
"direct",//type
true,//durable
false,//auto-deleted
false,//internal
false,//no-wait
nil,//arguments
)
failOnError(err,"Failedtodeclareanexchange")
q,err:=ch.QueueDeclare(
"",//name
false,//durable
false,//deletewhenunused
true,//exclusive
false,//no-wait
nil,//arguments
)
failOnError(err,"Failedtodeclareaqueue")
iflen(os.Args)<2{
log.Printf("Usage:%s[info][warning][error]",os.Args[0])
os.Exit(0)
}
for_,s:=rangeos.Args[1:]{
log.Printf("Bindingqueue%stoexchange%swithroutingkey%s",
q.Name,"logs_direct",s)
err=ch.QueueBind(
q.Name,//queuename
s,//routingkey
"logs_direct",//exchange
false,
nil)
failOnError(err,"Failedtobindaqueue")
}
msgs,err:=ch.Consume(
q.Name,//queue
"",//consumer
true,//autoack
false,//exclusive
false,//nolocal
false,//nowait
nil,//args
)
failOnError(err,"Failedtoregisteraconsumer")
forever:=make(chanbool)
gofunc(){
ford:=rangemsgs{
log.Printf("[x]%s",d.Body)
}
}()
log.Printf("[*]Waitingforlogs.ToexitpressCTRL+C")
<-forever
}
如果你只想將“warning”和“err”(而不是“info”)級別的日誌訊息儲存到檔案中,只需開啟控制檯並輸入:
gorunreceive_logs_direct.gowarningerror>logs_from_rabbit.log
如果你想在螢幕上檢視所有日誌訊息,請開啟一個新終端並執行以下操作:
gorunreceive_logs_direct.goinfowarningerror
#=>[*]Waitingforlogs.ToexitpressCTRL+C
例如,要發出error
日誌訊息,只需輸入:
gorunemit_log_direct.goerror"Run.Run.Oritwillexplode."
#=>[x]Sent'error':'Run.Run.Oritwillexplode.'
(這裡是(emit_log_direct.go)和(receive_logs_direct.go)的完整原始碼)
繼續學習教程5,瞭解如何根據模式監聽訊息。