1. 程式人生 > >RabbitMQ Go客戶端教程5——topic

RabbitMQ Go客戶端教程5——topic

本文翻譯自[RabbitMQ官網的Go語言客戶端系列教程](https://www.rabbitmq.com/getstarted.html),本文首發於我的個人部落格:[liwenzhou.com](https://liwenzhou.com),教程共分為六篇,本文是第五篇——Topic。 這些教程涵蓋了使用RabbitMQ建立訊息傳遞應用程式的基礎知識。 你需要安裝RabbitMQ伺服器才能完成這些教程,請參閱[安裝指南](https://www.rabbitmq.com/download.html)或使用[Docker映象](https://registry.hub.docker.com/_/rabbitmq/)。 這些教程的程式碼是[開源](https://github.com/rabbitmq/rabbitmq-tutorials)的,[官方網站](https://github.com/rabbitmq/rabbitmq-website)也是如此。 ## 先決條件 本教程假設RabbitMQ已安裝並執行在本機上的標準埠(5672)。如果你使用不同的主機、埠或憑據,則需要調整連線設定。 ### topic交換器(主題交換器) 傳送到`topic`交換器的訊息不能具有隨意的`routing_key`——它必須是單詞列表,以點分隔。這些詞可以是任何東西,但通常它們指定與訊息相關的某些功能。一些有效的`routing_key`示例:“`stock.usd.nyse`”,“`nyse.vmw`”,“`quick.orange.rabbit`”。`routing_key`中可以包含任意多個單詞,最多255個位元組。 繫結鍵也必須採用相同的形式。`topic`交換器背後的邏輯類似於`direct`交換器——用特定路由鍵傳送的訊息將傳遞到所有匹配繫結鍵繫結的佇列。但是,繫結鍵有兩個重要的特殊情況: - `*`(星號)可以代替一個單詞。 - `#`(井號)可以替代零個或多個單詞。 通過下面這個示例可以很容易看明白這一點: ![img](https://liwenzhou.com/images/Go/rabbitmq/tutorials05/python-five.png) 在這個例子中,我們將傳送一些都是描述動物的資訊。將使用包含三個詞(兩個點)的路由金鑰傳送訊息。路由鍵中的第一個單詞將描述速度,第二個是顏色,第三個是種類:“`..`”。 我們建立了三個繫結關係:Q1與繫結鍵“`*.orange.*`”繫結,Q2與“`*.*.rabbit`”和“`lazy.#`”繫結。 這些繫結可以總結為: - Q1對所有橙色動物都感興趣。 - Q2想接收有關兔子(rabbit)的一切訊息,以及有關懶惰(lazy)動物的一切訊息。 路由鍵設定為“`quick.orange.rabbit`”的訊息將傳遞到兩個佇列。訊息“`lazy.orange.elephant`”也將傳送給他們兩個。另一方面,“`quick.orange.fox`”將僅進入第一個佇列,而“`lazy.brown.fox`”將僅進入第二個佇列。即使“`lazy.pink.rabbit`”與兩個繫結匹配(匹配Q2的兩個繫結),也只會傳遞到第二個佇列一次。 “`quick.brown.fox`”與任何繫結都不匹配,因此將被丟棄。 如果我們打破約定併發送一個或四個單詞的訊息,例如“`orange`”或“`quick.orange.male.rabbit`”,會發生什麼?好吧,這些訊息將不匹配任何繫結,並且將會丟失。 另外,“`lazy.orange.male.rabbit`”即使有四個單詞,也將匹配最後一個繫結,並將其傳送到第二個佇列。 > topic交換器 > > topic交換器功能強大,可以像其他交換器一樣執行。 > > 當佇列用“`#`”(井號)繫結鍵繫結時,它將接收所有訊息,而與路由鍵無關,就像在`fanout`交換器中一樣。 > > 當在繫結中不使用特殊字元“`*`”(星號)和“`#`”(井號)時,topic交換器的行為就像`direct`交換器一樣。 ### 完整示例 我們將在日誌記錄系統中使用`topic`交換器。我們將從一個可行的假設開始,即日誌的路由鍵將包含兩個詞:“`.`”。 該程式碼與[上一教程](https://www.liwenzhou.com/posts/Go/go_rabbitmq_tutorials_04/)中的程式碼幾乎相同。 `emit_log_topic.go`的程式碼: ```go package main import ( "log" "os" "strings" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } } func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() err = ch.ExchangeDeclare( "logs_topic", // name "topic", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare an exchange") body := bodyFrom(os.Args) err = ch.Publish( "logs_topic", // exchange severityFrom(os.Args), // routing key false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), }) failOnError(err, "Failed to publish a message") log.Printf(" [x] Sent %s", body) } func bodyFrom(args []string) string { var s string if (len(args) < 3) || os.Args[2] == "" { s = "hello" } else { s = strings.Join(args[2:], " ") } return s } func severityFrom(args []string) string { var s string if (len(args) < 2) || os.Args[1] == "" { s = "anonymous.info" } else { s = os.Args[1] } return s } ``` `receive_logs_topic.go`的程式碼: ```go package main import ( "log" "os" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } } func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() err = ch.ExchangeDeclare( "logs_topic", // name "topic", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare an exchange") q, err := ch.QueueDeclare( "", // name false, // durable false, // delete when unused true, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") if len(os.Args) < 2 { log.Printf("Usage: %s [binding_key]...", os.Args[0]) os.Exit(0) } // 繫結topic for _, s := range os.Args[1:] { log.Printf("Binding queue %s to exchange %s with routing key %s", q.Name, "logs_topic", s) err = ch.QueueBind( q.Name, // queue name s, // routing key "logs_topic", // exchange false, nil) failOnError(err, "Failed to bind a queue") } msgs, err := ch.Consume( q.Name, // queue "", // consumer true, // auto ack false, // exclusive false, // no local false, // no wait nil, // args ) failOnError(err, "Failed to register a consumer") forever := make(chan bool) go func() { for d := range msgs { log.Printf(" [x] %s", d.Body) } }() log.Printf(" [*] Waiting for logs. To exit press CTRL+C") <-forever } ``` 想要接收所有的日誌: ```bash go run receive_logs_topic.go "#" ``` 要從“`kern`”接收所有日誌: ```bahs go run receive_logs_topic.go "kern.*" ``` 或者,如果你只想接收“`critical`”日誌: ```bash go run receive_logs_topic.go "*.critical" ``` 你可以建立多個繫結: ```bash go run receive_logs_topic.go "kern.*" "*.critical" ``` 併發出帶有路由鍵“`kern.critical`”的日誌: ```bash go run emit_log_topic.go "kern.critical" "A critical kernel error" ``` 你可以自己嘗試玩一下這個程式。請注意,程式碼沒有對路由鍵或繫結鍵進行任何假設,你可能希望使用兩個以上的路由鍵引數。 (關於[emit_log_topic.go](https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/go/emit_log_topic.go)和[receive_logs_topic.go](https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/go/receive_logs_topic.go)的完整原始碼) 接下來,我們將在[教程6](https://www.liwenzhou.com/posts/Go/go_rabbitmq_tutorials_06/)中瞭解如何將往返訊息用作遠端過程