1. 程式人生 > 實用技巧 >RabbitMQ Go客戶端教程4——路由

RabbitMQ Go客戶端教程4——路由

本文翻譯自RabbitMQ官網的Go語言客戶端系列教程,本文首發於我的個人部落格:liwenzhou.com,教程共分為六篇,本文是第四篇——路由。

這些教程涵蓋了使用RabbitMQ建立訊息傳遞應用程式的基礎知識。 你需要安裝RabbitMQ伺服器才能完成這些教程,請參閱安裝指南或使用Docker映象。 這些教程的程式碼是開源的,官方網站也是如此。

先決條件

本教程假設RabbitMQ已安裝並執行在本機上的標準埠(5672)。如果你使用不同的主機、埠或憑據,則需要調整連線設定。

路由

(使用Go RabbitMQ客戶端)

上一教程中,我們構建了一個簡單的日誌記錄系統。我們能夠向許多接收者廣播日誌訊息。

在本教程中,我們將向它新增一個特性-我們將使它能夠只訂閱訊息的一個子集。例如,我們將只能將關鍵錯誤訊息定向到日誌檔案(以節省磁碟空間),同時仍然能夠在控制檯上列印所有日誌訊息。

繫結

在前面的示例中,我們已經在建立繫結。你可能會想起以下程式碼:

err=ch.QueueBind(
q.Name,//queuename
"",//routingkey
"logs",//exchange
false,
nil)

繫結是交換器和佇列之間的關係。這可以簡單地理解為:佇列對來自此交換器的訊息感興趣。

繫結可以採用額外的routing_key引數。為了避免與Channel.Publish引數混淆,我們將其稱為binding key

。這是我們如何使用鍵建立繫結的方法:

err=ch.QueueBind(
q.Name,//queuename
"black",//routingkey
"logs",//exchange
false,
nil)

繫結金鑰的含義取決於交換器的型別。我們以前使用的fanout交換器只是忽略了這個值。

直連交換器

我們上一個教程中的日誌系統將所有訊息廣播給所有消費者。我們希望擴充套件這一點,允許根據訊息的嚴重性過濾訊息。例如,我們可能希望將日誌訊息寫入磁碟的指令碼只接收嚴重錯誤,而不會在warning或info日誌訊息上浪費磁碟空間。

我們使用fanout交換器,這並沒有給我們很大的靈活性——它只能進行無腦廣播。

我們將使用direct

交換器。direct交換器背後的路由演算法很簡單——訊息進入其binding key與訊息的routing key完全匹配的佇列。

為了說明這一點,請考慮以下設定:

direct-exchang

在此設定中,我們可以看到綁定了兩個佇列的direct交換器X。第一個佇列繫結鍵為orange,第二個佇列繫結為兩個,一個繫結鍵為black,另一個為green

在這種設定中,使用orange路由鍵釋出到交換器的訊息將被路由到佇列Q1。路由鍵為blackgreen的訊息將轉到Q2。所有其他訊息將被丟棄。

多重繫結

direct-exchange-multiple

用相同的繫結鍵繫結多個佇列是完全合法的。在我們的示例中,我們可以使用繫結鍵blackXQ1之間新增繫結。在這種情況下,direct交換器的行為將類似fanout,並將訊息廣播到所有匹配的佇列。帶有black路由鍵的訊息將同時傳遞給Q1Q2

傳送日誌

我們將在日誌系統中使用這個模型。我們將傳送訊息到direct交換器,而不是fanout。我們將提供嚴重性(譯註:通常我們使用日誌級別劃分日誌資訊的嚴重性)作為路由鍵。這樣,接收指令碼將能夠選擇其想要接收的日誌級別。讓我們首先關注傳送日誌。

與往常一樣,我們需要首先建立一個交換器:

err=ch.ExchangeDeclare(
"logs_direct",//name
"direct",//type
true,//durable
false,//auto-deleted
false,//internal
false,//no-wait
nil,//arguments
)

我們已經準備好傳送一條訊息:

err=ch.ExchangeDeclare(
"logs_direct",//name
"direct",//type
true,//durable
false,//auto-deleted
false,//internal
false,//no-wait
nil,//arguments
)
failOnError(err,"Failedtodeclareanexchange")

body:=bodyFrom(os.Args)
err=ch.Publish(
"logs_direct",//exchange
severityFrom(os.Args),//routingkey
false,//mandatory
false,//immediate
amqp.Publishing{
ContentType:"text/plain",
Body:[]byte(body),
})

為了簡化問題,我們假設“嚴重性”可以是“info”、“warning”、“error”之一。

訂閱

接收訊息的工作方式與上一教程一樣,但有一個例外——我們將為感興趣的每種嚴重性(日誌級別)建立一個新的繫結。

q,err:=ch.QueueDeclare(
"",//name
false,//durable
false,//deletewhenunused
true,//exclusive
false,//no-wait
nil,//arguments
)
failOnError(err,"Failedtodeclareaqueue")

iflen(os.Args)<2{
log.Printf("Usage:%s[info][warning][error]",os.Args[0])
os.Exit(0)
}
//建立多個繫結關係
for_,s:=rangeos.Args[1:]{
log.Printf("Bindingqueue%stoexchange%swithroutingkey%s",
q.Name,"logs_direct",s)
err=ch.QueueBind(
q.Name,//queuename
s,//routingkey
"logs_direct",//exchange
false,
nil)
failOnError(err,"Failedtobindaqueue")
}

完整示例

img

emit_log_direct.go指令碼的程式碼:

packagemain

import(
"log"
"os"
"strings"

"github.com/streadway/amqp"
)

funcfailOnError(errerror,msgstring){
iferr!=nil{
log.Fatalf("%s:%s",msg,err)
}
}

funcmain(){
conn,err:=amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err,"FailedtoconnecttoRabbitMQ")
deferconn.Close()

ch,err:=conn.Channel()
failOnError(err,"Failedtoopenachannel")
deferch.Close()

err=ch.ExchangeDeclare(
"logs_direct",//name
"direct",//type
true,//durable
false,//auto-deleted
false,//internal
false,//no-wait
nil,//arguments
)
failOnError(err,"Failedtodeclareanexchange")

body:=bodyFrom(os.Args)
err=ch.Publish(
"logs_direct",//exchange
severityFrom(os.Args),//routingkey
false,//mandatory
false,//immediate
amqp.Publishing{
ContentType:"text/plain",
Body:[]byte(body),
})
failOnError(err,"Failedtopublishamessage")

log.Printf("[x]Sent%s",body)
}

funcbodyFrom(args[]string)string{
varsstring
if(len(args)<3)||os.Args[2]==""{
s="hello"
}else{
s=strings.Join(args[2:],"")
}
returns
}

funcseverityFrom(args[]string)string{
varsstring
if(len(args)<2)||os.Args[1]==""{
s="info"
}else{
s=os.Args[1]
}
returns
}

receive_logs_direct.go的程式碼:

packagemain

import(
"log"
"os"

"github.com/streadway/amqp"
)

funcfailOnError(errerror,msgstring){
iferr!=nil{
log.Fatalf("%s:%s",msg,err)
}
}

funcmain(){
conn,err:=amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err,"FailedtoconnecttoRabbitMQ")
deferconn.Close()

ch,err:=conn.Channel()
failOnError(err,"Failedtoopenachannel")
deferch.Close()

err=ch.ExchangeDeclare(
"logs_direct",//name
"direct",//type
true,//durable
false,//auto-deleted
false,//internal
false,//no-wait
nil,//arguments
)
failOnError(err,"Failedtodeclareanexchange")

q,err:=ch.QueueDeclare(
"",//name
false,//durable
false,//deletewhenunused
true,//exclusive
false,//no-wait
nil,//arguments
)
failOnError(err,"Failedtodeclareaqueue")

iflen(os.Args)<2{
log.Printf("Usage:%s[info][warning][error]",os.Args[0])
os.Exit(0)
}
for_,s:=rangeos.Args[1:]{
log.Printf("Bindingqueue%stoexchange%swithroutingkey%s",
q.Name,"logs_direct",s)
err=ch.QueueBind(
q.Name,//queuename
s,//routingkey
"logs_direct",//exchange
false,
nil)
failOnError(err,"Failedtobindaqueue")
}

msgs,err:=ch.Consume(
q.Name,//queue
"",//consumer
true,//autoack
false,//exclusive
false,//nolocal
false,//nowait
nil,//args
)
failOnError(err,"Failedtoregisteraconsumer")

forever:=make(chanbool)

gofunc(){
ford:=rangemsgs{
log.Printf("[x]%s",d.Body)
}
}()

log.Printf("[*]Waitingforlogs.ToexitpressCTRL+C")
<-forever
}

如果你只想將“warning”和“err”(而不是“info”)級別的日誌訊息儲存到檔案中,只需開啟控制檯並輸入:

gorunreceive_logs_direct.gowarningerror>logs_from_rabbit.log

如果你想在螢幕上檢視所有日誌訊息,請開啟一個新終端並執行以下操作:

gorunreceive_logs_direct.goinfowarningerror
#=>[*]Waitingforlogs.ToexitpressCTRL+C

例如,要發出error日誌訊息,只需輸入:

gorunemit_log_direct.goerror"Run.Run.Oritwillexplode."
#=>[x]Sent'error':'Run.Run.Oritwillexplode.'

(這裡是(emit_log_direct.go)和(receive_logs_direct.go)的完整原始碼)

繼續學習教程5,瞭解如何根據模式監聽訊息。