go 操作RabbitMQ
阿新 • • 發佈:2020-08-11
1.RMQ的安裝
docker run -d --hostname my-rabbit --name rmq -p 15672:15672 -p 5672:5672 -p 25672:25672 -e RABBITMQ_DEFAULT_USER=使用者名稱 -e RABBITMQ_DEFAULT_PASS=密碼 rabbitmq:3-management
- 三個埠對映,分別表示
5672:連線生產者、消費者的埠
15672:WEB管理頁面的埠
25672:分散式叢集的埠
2.基本概念
- amqp:高階訊息佇列協議,即一種訊息中介軟體協議,RMQ是amqp協議的一個具體實現。RMQ使用Erlang語言實現的,具有很好的併發能力,具體歷史請百度,這裡主要關心怎麼用。
- 生產者將訊息傳送至交換器;交換器再發送至佇列,最後傳送至消費者
- 交換器有四種類型,fanout、direct、topic三種類型,header型別沒用過,不關注。
fanout
一對多,根據繫結傳送到每一個佇列,
常用於釋出訂閱
direct
預設模式,一對一關係,根據routingkey與bindingjkey
一一對應匹配,傳送訊息
關於topic模式
以 ‘.’ 來分割單詞。
‘#’ 表示一個或多個單詞。
‘*’ 表示一個單詞。
如:
RoutingKey為:
aaa.bbb.ccc
BindingKey可以為:
*.bbb.ccc
aaa.#
3.庫中重要的方法
- 建立交換器
func (ch *Channel) ExchangeDeclare( name string, //交換器的名稱 kind string, //表示交換器的型別。有四種常用型別:direct、fanout、topic、headers durable bool, //是否持久化,true表示是。持久化表示會把交換器的配置存檔,當RMQ Server重啟後,會自動載入交換器 autoDelete bool, //是否自動刪除,true表示,當所有繫結都與交換器解綁後,會自動刪除此交換器。 internal bool, //是否為內部,true表示是。客戶端無法直接傳送msg到內部交換器,只有交換器可以傳送msg到內部交換器。 noWait bool, //是否非阻塞, 阻塞:表示建立交換器的請求傳送後,阻塞等待RMQ Server返回資訊。非阻塞:不會阻塞等待RMQ args Table ) error
- 建立佇列
func (ch *Channel) QueueDeclare(
name string, //佇列名稱
durable bool, //是否持久化,true為是。持久化會把佇列存檔,伺服器重啟後,不會丟失佇列以及佇列內的資訊
autoDelete bool, //是否刪除,當所有消費者都斷開時,佇列會自動刪除。
exclusive bool, //是否排他,true為是。如果設定為排他,則佇列僅對首次宣告他的連線可見,並在連線斷開時自動刪除。
noWait bool, //是否非阻塞
args Table) (Queue, error)
- 佇列與交換器繫結,key,表示要繫結的鍵,交換器以此來分發
func (ch *Channel) QueueBind(
name, //佇列名字,確定哪個佇列
key, // 對應圖中BandingKey,表示要繫結的鍵。
exchange string, //交換器的名字
noWait bool, //是否非阻塞
args Table) error
- 交換器之間的繫結
func (ch *Channel) ExchangeBind(
destination, //目的交換器,通常是內部交換器。
key, //對應BandingKey,表示要繫結的鍵。
source string, //源交換器
noWait bool, //是否非阻塞
args Table) error
- 傳送訊息
func (ch *Channel) Publish(
exchange, //要傳送的交換機
key string, //路由鍵,與之相關的繫結鍵對應
mandatory,
immediate bool,
msg Publishing //要傳送的訊息,msg對應一個Publishing結構
) error
//Publishing 結構體
type Publishing struct {
Headers Table
// Properties
ContentType string //訊息的型別,通常為“text/plain”
ContentEncoding string //訊息的編碼,一般預設不用寫
DeliveryMode uint8 //訊息是否持久化,2表示持久化,0或1表示非持久化。
Body []byte //訊息主體
Priority uint8 //訊息的優先順序 0 to 9
CorrelationId string // correlation identifier
ReplyTo string // address to to reply to (ex: RPC)
Expiration string // message expiration spec
MessageId string // message identifier
Timestamp time.Time // message timestamp
Type string // message type name
UserId string // creating user id - ex: "guest"
AppId string // creating application id
}
- 消費者接收訊息--推模式
func (ch *Channel) Consume(
queue string, //佇列名稱
consumer string, //消費者標籤,用於區分不同的消費者
autoAck string, //是否自動回覆ACK,true為是,回覆ACK表示高速伺服器我收到訊息了。建議為false,手動回覆,這樣可控性強
exclusive bool, //設定是否排他,排他表示當前佇列只能給一個消費者使用
noLocal bool, //如果為true,表示生產者和消費者不能是同一個connect
noWait bool, //是否非阻塞
args Table) (<-chan Delivery, error)
- 消費者接收訊息--拉模式
func (ch *Channel) Get(
queue string,
autoAck bool) (msg Delivery, ok bool, err error)
- 手動回覆訊息
func (ch *Channel) Ack(tag uint64, multiple bool) error
func (me Delivery) Ack(multiple bool) error {
if me.Acknowledger == nil {
return errDeliveryNotInitialized
}
return me.Acknowledger.Ack(me.DeliveryTag, multiple)
}
func (d Delivery) Reject(requeue bool) error
Publish – mandatory引數
- false:當訊息無法通過交換器匹配到佇列時,會丟棄訊息。
- true:當訊息無法通過交換器匹配到佇列時,會呼叫basic.return通知生產者。
- 注:不建議使用,因會使程式邏輯變得複雜,可以通過備用交換機來實現類似的功能。
Publish – immediate引數
-
true:當訊息到達Queue後,發現佇列上無消費者時,通過basic.Return返回給生產者。
-
false:訊息一直快取在佇列中,等待生產者。
-
注:不建議使用此引數,遇到這種情況,可用TTL和DLX方法代替(後面會介紹
-
Qos
func (ch *Channel) Qos(prefetchCount, prefetchSize int, global bool) error
-
注意:這個在推送模式下非常重要,通過設定Qos用來防止訊息堆積。
-
prefetchCount:消費者未確認訊息的個數。
-
prefetchSize :消費者未確認訊息的大小。
-
global :是否全域性生效,true表示是。全域性生效指的是針對當前connect裡的所有channel都生效
4.程式碼示例
生產者
package main
import (
"fmt"
"log"
"os"
"strings"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
err = ch.ExchangeDeclare(
"logs", // name
"fanout", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")
body := bodyFrom(os.Args)
err = ch.Publish(
"logs", // exchange
"", // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
}
func bodyFrom(args []string) string {
var s string
if (len(args) < 2) || os.Args[1] == "" {
s = "hello"
} else {
s = strings.Join(args[1:], " ")
}
return s
}
消費者
package main
import (
"github.com/streadway/amqp"
"log"
)
func main() {
conn,err := amqp.Dial("amqp://guest:guest@localhost:5672/")
DealWithError(err,"Failed to connect to RabbitMQ")
defer conn.Close()
ch,err := conn.Channel()
DealWithError(err,"Failed to open a channel")
defer ch.Close()
//宣告交換器
ch.ExchangeDeclare(
"logs",
"fanout",
true,
false,
false,
false,
nil,
)
DealWithError(err,"Failed to declare an exchange")
//聲明瞭佇列
q,err := ch.QueueDeclare(
"", //佇列名字為rabbitMQ自動生成
false,
false,
true,
false,
nil,
)
DealWithError(err,"Failed to declare an exchange")
//交換器跟佇列進行繫結,交換器將接收到的訊息放進佇列中
err = ch.QueueBind(
q.Name,
"",
"logs",
false,
nil,
)
DealWithError(err,"Failed to bind a queue")
msgs,err := ch.Consume(
q.Name,
"",
true,
false,
false,
false,
nil,
)
DealWithError(err,"Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs{
log.Printf(" [x] %s",d.Body)
}
}()
log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
<-forever
}
func DealWithError(err error,msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}