golang操作Rabbit
阿新 • • 發佈:2020-12-28
GOLANG操作rabbitmq
簡單模式
一個生產者對應一個消費者!!!
生產者
package main import ( "fmt" "github.com/streadway/amqp" "log" ) func main() { url := "amqp://guest:guest@localhost:5672/" // 建立連結 coon, err := amqp.Dial(url) if err != nil { log.Fatal(err) } defer coon.Close() // 獲取channel channel, err := coon.Channel() if err != nil { log.Fatal(err) } defer channel.Close() // 宣告佇列,沒有則建立 _, err = channel.QueueDeclare("hello", true, true, false, false, nil) /* name:queue的名稱 durable:是否持久化 autoDelete: 是否自動刪除(當沒有customer時會自動刪除) exclusive: 1.是否獨佔,只有一個消費者監聽這個佇列 2.當coon關閉的時候,是否刪除佇列 noWait: 是否等待 args: 額外的引數 */ // 釋出訊息 message := amqp.Publishing{ Body: []byte("hello world"), } if err := channel.Publish("", "hello", false, false, message); err != nil { fmt.Println(err) } /* exchange:交換機的名稱,簡單模式下使用預設的"" routerKey: 路由名稱, 簡單模式下使用和佇列名稱一樣 */ }
消費者
func main() { url := "amqp://guest:guest@localhost:5672/" // 建立連結 coon, err := amqp.Dial(url) if err != nil { log.Fatal(err) } defer coon.Close() // 獲取channel channel, err := coon.Channel() if err != nil { log.Fatal(err) } defer channel.Close() // 宣告佇列,沒有則建立 _, err = channel.QueueDeclare("hello", true, true, false, false, nil) delivery, err := channel.Consume("hello", "", false, false, false, false, nil) if err != nil { log.Fatal(err) } for message := range delivery { fmt.Println(string(message.Body)) } }
工作模式
一個生產者對應多個消費者,但是隻能有一個消費者獲得訊息!!!
package main import ( "fmt" "github.com/streadway/amqp" "log" "strconv" ) const ( url = "amqp://guest:guest@localhost:5672/" queue = "workMode" ) func Consume(name string) { // 建立連結 coon, err := amqp.Dial(url) if err != nil { log.Fatal(err) } defer coon.Close() // 獲取channel channel, err := coon.Channel() if err != nil { log.Fatal(err) } defer channel.Close() // 宣告佇列,沒有則建立 _, err = channel.QueueDeclare(queue, true, true, false, false, nil) delivery, err := channel.Consume(queue, "", false, false, false, false, nil) if err != nil { log.Fatal(err) } for message := range delivery { fmt.Printf("name:%s body:%s\n", name, string(message.Body)) } } func Publish() { coon, err := amqp.Dial(url) if err != nil { log.Fatal(err) } defer coon.Close() channel, err := coon.Channel() if err != nil { log.Fatal(err) } defer channel.Close() // 宣告佇列,沒有則建立 _, err = channel.QueueDeclare(queue, true, true, false, false, nil) for i := 0; i < 10; i++ { message := amqp.Publishing{ Body: []byte("hello world" + strconv.Itoa(i)), } if err := channel.Publish("", queue, false, false, message); err != nil { fmt.Println(err) } } } func main() { // 開啟兩個消費者 for i := 0; i < 2; i++ { go Consume(fmt.Sprintf("name%d", i)) } // 開啟一個生產者 go Publish() // 阻塞主goroutine <-make(chan int) }
測試結果
name:name1 body:hello world0
name:name1 body:hello world2
name:name1 body:hello world4
name:name1 body:hello world6
name:name1 body:hello world8
name:name0 body:hello world1
name:name0 body:hello world3
name:name0 body:hello world5
name:name0 body:hello world7
name:name0 body:hello world9
釋出訂閱模式
package main
import (
"fmt"
"github.com/streadway/amqp"
"log"
)
const (
url = "amqp://guest:guest@localhost:5672/"
exchangeName = "TestFanout"
queue1 = "TestFanoutQueue1"
queue2 = "TestFanoutQueue2"
)
func Consume(queue string, action string) {
// 建立連結
coon, err := amqp.Dial(url)
if err != nil {
log.Fatal(err)
}
defer coon.Close()
// 獲取channel
channel, err := coon.Channel()
if err != nil {
log.Fatal(err)
}
defer channel.Close()
// 注意:這裡一定要寫上,否則有異常,ioException
_, err = channel.QueueDeclare(queue, true, true, false, false, nil)
delivery, err := channel.Consume(queue, "", false, false, false, false, nil)
if err != nil {
log.Fatal(err)
}
for message := range delivery {
fmt.Printf("name:%s action:%s body:%s\n", queue, action, string(message.Body))
}
}
func Publish() {
coon, err := amqp.Dial(url)
if err != nil {
log.Fatal(err)
}
defer coon.Close()
channel, err := coon.Channel()
if err != nil {
log.Fatal(err)
}
defer channel.Close()
// 建立交換機
if err := channel.ExchangeDeclare(
exchangeName, // name
amqp.ExchangeFanout, // kind
false, // durable
true, // autoDelete
false, // internal 是否rabbitmq內部使用
true, // noWait
nil, // args
); err != nil {
log.Fatal(err)
}
// 宣告佇列,沒有則建立
_, err = channel.QueueDeclare(queue1, true, true, false, false, nil)
_, err = channel.QueueDeclare(queue2, true, true, false, false, nil)
if err := channel.QueueBind(queue1, "", exchangeName, true, nil); err != nil {
log.Fatal(err)
}
if err := channel.QueueBind(queue2, "", exchangeName, true, nil); err != nil {
log.Fatal(err)
}
message := amqp.Publishing{
Body: []byte("ExchangeFanout"),
}
// 使用FANOUT時,routeingKey設定為空字串
if err := channel.Publish(exchangeName, "", false, false, message); err != nil {
log.Fatal(err)
}
}
func main() {
// 開啟兩個消費者
go Consume(queue1, "記錄日誌")
go Consume(queue2, "儲存資訊")
// 開啟一個生產者
go Publish()
// 阻塞主goroutine
<-make(chan int)
}
路由模式
package main
import (
"fmt"
"github.com/streadway/amqp"
"log"
)
const (
url = "amqp://guest:guest@localhost:5672/"
exchangeName = "direct"
errorRouteKey = "error"
infoRouteKey = "info"
warningRouteKey = "warning"
queue1 = "directQueue1"
queue2 = "directQueue2"
)
func Consume(queue string, action string) {
// 建立連結
coon, err := amqp.Dial(url)
if err != nil {
log.Fatal(err)
}
defer coon.Close()
// 獲取channel
channel, err := coon.Channel()
if err != nil {
log.Fatal(err)
}
defer channel.Close()
// 注意:這裡一定要寫上,否則有異常,ioException
_, err = channel.QueueDeclare(queue, true, true, false, false, nil)
delivery, err := channel.Consume(queue, "", false, false, false, false, nil)
if err != nil {
log.Fatal(err)
}
for message := range delivery {
fmt.Printf("name:%s action:%s body:%s\n", queue, action, string(message.Body))
}
}
func Publish() {
coon, err := amqp.Dial(url)
if err != nil {
log.Fatal(err)
}
defer coon.Close()
channel, err := coon.Channel()
if err != nil {
log.Fatal(err)
}
defer channel.Close()
// 建立交換機
if err := channel.ExchangeDeclare(
exchangeName, // name
amqp.ExchangeDirect, // kind
false, // durable
true, // autoDelete
false, // internal 是否rabbitmq內部使用
true, // noWait
nil, // args
); err != nil {
log.Fatal(err)
}
// 宣告佇列,沒有則建立
_, err = channel.QueueDeclare(queue1, true, true, false, false, nil)
_, err = channel.QueueDeclare(queue2, true, true, false, false, nil)
// 佇列1繫結error
if err := channel.QueueBind(queue1, errorRouteKey, exchangeName, true, nil); err != nil {
log.Fatal(err)
}
// 佇列2繫結error,info,warning
if err := channel.QueueBind(queue2, errorRouteKey, exchangeName, true, nil); err != nil {
log.Fatal(err)
}
if err := channel.QueueBind(queue2, infoRouteKey, exchangeName, true, nil); err != nil {
log.Fatal(err)
}
if err := channel.QueueBind(queue2, warningRouteKey, exchangeName, true, nil); err != nil {
log.Fatal(err)
}
message := amqp.Publishing{
Body: []byte("ExchangeDirect"),
}
// 傳送訊息指定交換機和路由key
if err := channel.Publish(exchangeName, infoRouteKey, false, false, message); err != nil {
log.Fatal(err)
}
}
func main() {
// 開啟兩個消費者
go Consume(queue1, "記錄日誌")
go Consume(queue2, "儲存資訊")
// 開啟一個生產者
go Publish()
// 阻塞主goroutine
<-make(chan int)
}
此時生產者傳送的路由key為info,只有queue2才能收到,如果把路由key改為error,則兩個佇列都能收到。
萬用字元模式
package main
import (
"fmt"
"github.com/streadway/amqp"
"log"
)
const (
url = "amqp://guest:guest@localhost:5672/"
exchangeName = "topic"
queue1 = "directQueue1"
queue2 = "directQueue2"
)
func Consume(queue string, action string) {
// 建立連結
coon, err := amqp.Dial(url)
if err != nil {
log.Fatal(err)
}
defer coon.Close()
// 獲取channel
channel, err := coon.Channel()
if err != nil {
log.Fatal(err)
}
defer channel.Close()
// 注意:這裡一定要寫上,否則有異常,ioException
_, err = channel.QueueDeclare(queue, true, true, false, false, nil)
delivery, err := channel.Consume(queue, "", false, false, false, false, nil)
if err != nil {
log.Fatal(err)
}
for message := range delivery {
fmt.Printf("name:%s action:%s body:%s\n", queue, action, string(message.Body))
}
}
func Publish() {
coon, err := amqp.Dial(url)
if err != nil {
log.Fatal(err)
}
defer coon.Close()
channel, err := coon.Channel()
if err != nil {
log.Fatal(err)
}
defer channel.Close()
// 建立交換機
if err := channel.ExchangeDeclare(
exchangeName, // name
amqp.ExchangeTopic, // kind
false, // durable
true, // autoDelete
false, // internal 是否rabbitmq內部使用
true, // noWait
nil, // args
); err != nil {
log.Fatal(err)
}
// 宣告佇列,沒有則建立
_, err = channel.QueueDeclare(queue1, true, true, false, false, nil)
_, err = channel.QueueDeclare(queue2, true, true, false, false, nil)
// 佇列1繫結error
if err := channel.QueueBind(queue1, "*.*", exchangeName, true, nil); err != nil {
log.Fatal(err)
}
// 佇列2繫結error,info,warning
if err := channel.QueueBind(queue2, "*.error", exchangeName, true, nil); err != nil {
log.Fatal(err)
}
if err := channel.QueueBind(queue2, "#.info", exchangeName, true, nil); err != nil {
log.Fatal(err)
}
message := amqp.Publishing{
Body: []byte("ExchangeDirect"),
}
// 傳送訊息指定交換機和路由key
if err := channel.Publish(exchangeName, "test.info", false, false, message); err != nil {
log.Fatal(err)
}
}
func main() {
// 開啟兩個消費者
go Consume(queue1, "記錄日誌")
go Consume(queue2, "儲存資訊")
// 開啟一個生產者
go Publish()
// 阻塞主goroutine
<-make(chan int)
}
萬用字元模式是路由模式的進階版,它能夠讓一個佇列監聽動態的路由規則。