1. 程式人生 > 實用技巧 >go 操作RabbitMQ

go 操作RabbitMQ

1.RMQ的安裝

docker run -d --hostname my-rabbit --name rmq -p 15672:15672 -p 5672:5672 -p 25672:25672 -e RABBITMQ_DEFAULT_USER=使用者名稱 -e RABBITMQ_DEFAULT_PASS=密碼 rabbitmq:3-management

  • 三個埠對映,分別表示
5672:連線生產者、消費者的埠
15672:WEB管理頁面的埠
25672:分散式叢集的埠

2.基本概念

  • amqp:高階訊息佇列協議,即一種訊息中介軟體協議,RMQ是amqp協議的一個具體實現。RMQ使用Erlang語言實現的,具有很好的併發能力,具體歷史請百度,這裡主要關心怎麼用。
  • 生產者將訊息傳送至交換器;交換器再發送至佇列,最後傳送至消費者
  • 交換器有四種類型,fanout、direct、topic三種類型,header型別沒用過,不關注。
fanout
一對多,根據繫結傳送到每一個佇列,
常用於釋出訂閱

direct
預設模式,一對一關係,根據routingkey與bindingjkey
一一對應匹配,傳送訊息

關於topic模式
以 ‘.’ 來分割單詞。
‘#’ 表示一個或多個單詞。
‘*’ 表示一個單詞。
如:
RoutingKey為:
aaa.bbb.ccc
BindingKey可以為:
*.bbb.ccc
aaa.#

3.庫中重要的方法

  • 建立交換器
func (ch *Channel) ExchangeDeclare(
	name string,  //交換器的名稱
	kind string, //表示交換器的型別。有四種常用型別:direct、fanout、topic、headers
	durable bool, //是否持久化,true表示是。持久化表示會把交換器的配置存檔,當RMQ Server重啟後,會自動載入交換器
	autoDelete bool, //是否自動刪除,true表示,當所有繫結都與交換器解綁後,會自動刪除此交換器。
	internal bool,  //是否為內部,true表示是。客戶端無法直接傳送msg到內部交換器,只有交換器可以傳送msg到內部交換器。
	noWait bool, //是否非阻塞, 阻塞:表示建立交換器的請求傳送後,阻塞等待RMQ Server返回資訊。非阻塞:不會阻塞等待RMQ
	args Table
) error
  • 建立佇列
func (ch *Channel) QueueDeclare(
	name string,  //佇列名稱
	durable bool,  //是否持久化,true為是。持久化會把佇列存檔,伺服器重啟後,不會丟失佇列以及佇列內的資訊
	autoDelete bool,  //是否刪除,當所有消費者都斷開時,佇列會自動刪除。
	exclusive bool,   //是否排他,true為是。如果設定為排他,則佇列僅對首次宣告他的連線可見,並在連線斷開時自動刪除。
	noWait bool, //是否非阻塞
	args Table) (Queue, error)
  • 佇列與交換器繫結,key,表示要繫結的鍵,交換器以此來分發
func (ch *Channel) QueueBind(
	name,  //佇列名字,確定哪個佇列
	key, // 對應圖中BandingKey,表示要繫結的鍵。
	exchange string,  //交換器的名字
	noWait bool,  //是否非阻塞
	args Table) error
  • 交換器之間的繫結
func (ch *Channel) ExchangeBind(
	destination,  //目的交換器,通常是內部交換器。
	key,    //對應BandingKey,表示要繫結的鍵。
	source string,  //源交換器
	noWait bool,   //是否非阻塞
	args Table) error
  • 傳送訊息
func (ch *Channel) Publish(
		exchange,  //要傳送的交換機
		key string,  //路由鍵,與之相關的繫結鍵對應
		mandatory, 
		immediate bool, 
		msg Publishing   //要傳送的訊息,msg對應一個Publishing結構
		) error
		
//Publishing 結構體
type Publishing struct {
        Headers Table
        // Properties
        ContentType     string  //訊息的型別,通常為“text/plain”
        ContentEncoding string  //訊息的編碼,一般預設不用寫
        DeliveryMode    uint8   //訊息是否持久化,2表示持久化,0或1表示非持久化。
        Body []byte  //訊息主體
        Priority        uint8  //訊息的優先順序 0 to 9
        CorrelationId   string    // correlation identifier
        ReplyTo         string    // address to to reply to (ex: RPC)
        Expiration      string    // message expiration spec
        MessageId       string    // message identifier
        Timestamp       time.Time // message timestamp
        Type            string    // message type name
        UserId          string    // creating user id - ex: "guest"
        AppId           string    // creating application id
}
		
  • 消費者接收訊息--推模式
func (ch *Channel) Consume(
	queue string,  //佇列名稱 
	consumer string,  //消費者標籤,用於區分不同的消費者
	autoAck string,  //是否自動回覆ACK,true為是,回覆ACK表示高速伺服器我收到訊息了。建議為false,手動回覆,這樣可控性強
	exclusive bool,  //設定是否排他,排他表示當前佇列只能給一個消費者使用
	noLocal bool, //如果為true,表示生產者和消費者不能是同一個connect
	noWait bool, //是否非阻塞
	args Table) (<-chan Delivery, error)
  • 消費者接收訊息--拉模式
func (ch *Channel) Get(
	queue string, 
	autoAck bool) (msg Delivery, ok bool, err error)
  • 手動回覆訊息
func (ch *Channel) Ack(tag uint64, multiple bool) error

func (me Delivery) Ack(multiple bool) error {
        if me.Acknowledger == nil {
                return errDeliveryNotInitialized
        }
        return me.Acknowledger.Ack(me.DeliveryTag, multiple)
}

func (d Delivery) Reject(requeue bool) error
Publish – mandatory引數
  • false:當訊息無法通過交換器匹配到佇列時,會丟棄訊息。
  • true:當訊息無法通過交換器匹配到佇列時,會呼叫basic.return通知生產者。
  • 注:不建議使用,因會使程式邏輯變得複雜,可以通過備用交換機來實現類似的功能。
Publish – immediate引數
  • true:當訊息到達Queue後,發現佇列上無消費者時,通過basic.Return返回給生產者。

  • false:訊息一直快取在佇列中,等待生產者。

  • 注:不建議使用此引數,遇到這種情況,可用TTL和DLX方法代替(後面會介紹

  • Qos
    func (ch *Channel) Qos(prefetchCount, prefetchSize int, global bool) error

  • 注意:這個在推送模式下非常重要,通過設定Qos用來防止訊息堆積。

  • prefetchCount:消費者未確認訊息的個數。

  • prefetchSize :消費者未確認訊息的大小。

  • global :是否全域性生效,true表示是。全域性生效指的是針對當前connect裡的所有channel都生效

4.程式碼示例

生產者

package main

import (
        "fmt"
        "log"
        "os"
        "strings"

        "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
        if err != nil {
                log.Fatalf("%s: %s", msg, err)
        }
}

func main() {
        conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
        failOnError(err, "Failed to connect to RabbitMQ")
        defer conn.Close()

        ch, err := conn.Channel()
        failOnError(err, "Failed to open a channel")
        defer ch.Close()

        err = ch.ExchangeDeclare(
                "logs",   // name
                "fanout", // type
                true,     // durable
                false,    // auto-deleted
                false,    // internal
                false,    // no-wait
                nil,      // arguments
        )
        failOnError(err, "Failed to declare an exchange")

        body := bodyFrom(os.Args)
        err = ch.Publish(
                "logs", // exchange
                "",     // routing key
                false,  // mandatory
                false,  // immediate
                amqp.Publishing{
                        ContentType: "text/plain",
                        Body:        []byte(body),
                })
        failOnError(err, "Failed to publish a message")

        log.Printf(" [x] Sent %s", body)
}

func bodyFrom(args []string) string {
        var s string
        if (len(args) < 2) || os.Args[1] == "" {
                s = "hello"
        } else {
                s = strings.Join(args[1:], " ")
        }
        return s
}

消費者

package main

import (
	"github.com/streadway/amqp"
	"log"
)

func main() {
	conn,err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	DealWithError(err,"Failed to connect to RabbitMQ")
	defer conn.Close()

	ch,err := conn.Channel()
	DealWithError(err,"Failed to open a channel")
	defer ch.Close()
	//宣告交換器
	ch.ExchangeDeclare(
		"logs",
		"fanout",
		true,
		false,
		false,
		false,
		nil,
		)
	DealWithError(err,"Failed to declare an exchange")
	//聲明瞭佇列
	q,err := ch.QueueDeclare(
		"", //佇列名字為rabbitMQ自動生成
		false,
		false,
		true,
		false,
		nil,
		)
	DealWithError(err,"Failed to declare an exchange")
	//交換器跟佇列進行繫結,交換器將接收到的訊息放進佇列中
	err = ch.QueueBind(
		q.Name,
		"",
		"logs",
		false,
		nil,
		)
	DealWithError(err,"Failed to bind a queue")
	msgs,err := ch.Consume(
		q.Name,
		"",
		true,
		false,
		false,
		false,
		nil,
		)
	DealWithError(err,"Failed to register a consumer")
	forever := make(chan bool)
	go func() {
		for d := range msgs{
			log.Printf(" [x] %s",d.Body)
		}
	}()
	log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
	<-forever
}

func DealWithError(err error,msg string)  {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}