1. 程式人生 > 其它 >RabbitMQ實戰指南

RabbitMQ實戰指南

RbbitMQ

作用:

  1. 解耦
  2. 冗餘(儲存)
  3. 擴充套件性
  4. 削峰
  5. 可恢復性
  6. 順序保證
  7. 緩衝
  8. 非同步通訊

RabbitMQ從最初就實現了一個特性:使用協議本身就可以對佇列和交換器(Exchange)這樣的資源進行配置。

工作模型

  • Exchange
    交換機,將訊息路由到一個或多個佇列種,若路由不到則返回給producer或丟棄。有4種類型
    • direct:訊息傳送到BindingKey和RoutingKey完全匹配的佇列
    • fanout:訊息傳送到所有繫結佇列
    • topic:BindingKey和RoutingKey模糊匹配,使用點分單詞列表,*代表一個單詞,#代表零個或多個單詞
    • headers:使用訊息的headers鍵值對匹配(不實用,基本看不見)
  • RoutingKey
    Producer將訊息發給交換機時攜帶的label,用於指定路由規則。要與Exchange的BIndingKey聯合使用才能生效
  • Binding
    將Exchange和Queue關聯起來,同時會指定一個繫結鍵

生產者流程

  1. Producer連線到mq,建立Connection,建立Channel
  2. 指定Exchange
  3. 指定Queue
  4. 通過BindingKey繫結Exchange和Queue
  5. 傳送訊息
  6. Exchange根據RoutingKey匹配Queue
  7. 找到則傳送,沒有則丟棄或回退
  8. 關閉Channel
  9. 關閉Connection

消費者流程

  1. Consumer連線到mq,建立Connection,建立Channel
  2. 向mq請求相應佇列種訊息
  3. 接收訊息並處理。java使用回撥函式,go使用channel
  4. 自動或手動回覆ack
  5. mq從Queue種刪除被確認訊息
  6. 關閉Channel
  7. 關閉Connection

一個Connection對應一個TCP連線,執行緒安全;多個Channel複用一個Connection,執行緒不安全。當資料量大時需要使用多個Connection調優

Exchange用於路由,傳送訊息時只要指定它即可,無需指定Queue

Queue用於暫存資料,有Queue存在MQ才能保持資料;雙方都能夠建立和繫結。

Producer沒有建立Queue而直接向Exchange傳送訊息,如果此時沒有Consumer則會丟失訊息

推薦Producer和Consumer都使用ExchangeDeclare和QueueDeclare顯式指定,如果已存在同名且屬性不同的Exchange和Queue則會建立失敗,若名稱和屬性都相同則使用已有元件

AMQP協議

是一種位於TCP之上的應用層協議,類似HTTP,涉及到 Connection.Start  Connection.Start-Ok等命令

生產者

消費者

5種模式

官網展示7種模式,有5種比較重要

  1. Simple

    simple
    package main
    
    import (
    	"context"
    	"log"
    	"time"
    
    	"github.com/rabbitmq/amqp091-go"
    )
    
    func failOnError(err error, msg string) {
    	if err != nil {
    		log.Fatalf("%s: %s", msg, err)
    	}
    }
    
    func main() {
    	var err error
    	conn, err := amqp091.Dial("amqp://guest:[email protected]:5672/vhost")
    	failOnError(err, "Failed to connet RabbitMQ")
    	defer conn.Close()
    
    	ch, err := conn.Channel()
    	failOnError(err, "Failed to open channel")
    	defer ch.Close()
    
    	q, err := ch.QueueDeclare(
    		"hello", // name
    		false,   // durable
    		false,   // delete when unused
    		false,   // exclusive
    		false,   // no-wait
    		nil,     // arguments
    	)
    	failOnError(err, "Failed to declare a queue")
    
    	body := "Hello world"
    
    	/**************** 獲取接收訊息的Delivery通道 *******************/
    	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    	defer cancel()
    
    	err = ch.PublishWithContext(
    		ctx,
    		"",     // exchange
    		q.Name, // routing key
    		false,  // mandatory
    		false,  // immediate
    		amqp091.Publishing{
    			ContentType: "text/plain",
    			Body:        []byte(body),
    		})
    	failOnError(err, "Failed to publish a message")
    	log.Printf(" [x] Sent: %s\n", body)
    
    	/**************** 獲取接收訊息的Delivery通道 *******************/
    	msgs, err := ch.Consume(
    		q.Name, // queue
    		"",     // consumer
    		true,   // auto-ack
    		false,  // exclusive
    		false,  // no-local
    		false,  // no-wait
    		nil,    // args
    	)
    	failOnError(err, "Failed to register a consumer")
    
    	d := <-msgs
    	log.Printf("%s", d.Body)
    }
  2. Work Queues
    多個消費者

    producer
    package main
    
    import (
    	"context"
    	"log"
    	"strconv"
    	"time"
    
    	"github.com/rabbitmq/amqp091-go"
    )
    
    func failOnError(err error, msg string) {
    	if err != nil {
    		log.Fatalf("%s: %s", msg, err)
    	}
    }
    func main() {
    	var err error
    	conn, err := amqp091.Dial("amqp://guest:[email protected]:5672/vhost")
    	failOnError(err, "Failed to connet RabbitMQ")
    	defer conn.Close()
    
    	ch, err := conn.Channel()
    	failOnError(err, "open a channel failed")
    	defer ch.Close()
    
    	q, err := ch.QueueDeclare(
    		"task_queue", // name
    		true,         // 持久的
    		false,        // delete when unused
    		false,        // 獨有的
    		false,        // no-wait
    		nil,          // arguments
    	)
    	failOnError(err, "delare queue feailed")
    
    	msg := "message"
    	for i := 0; i < 10; i++ {
    		body := msg + strconv.Itoa(i)
    		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    		defer cancel()
    		err = ch.PublishWithContext(
    			ctx,
    			"",
    			q.Name,
    			false,
    			false,
    			amqp091.Publishing{
    				DeliveryMode: amqp091.Persistent,
    				ContentType:  "text/plain",
    				Body:         []byte(body),
    			})
    		failOnError(err, "publish message failed")
    		log.Printf("Sent: %s", body)
    	}
    }
    consumer
     package main
    
    import (
    	"log"
    	"time"
    
    	"github.com/rabbitmq/amqp091-go"
    )
    
    func failOnError(err error, msg string) {
    	if err != nil {
    		log.Fatalf("%s: %s", msg, err)
    	}
    }
    
    func main() {
    	var err error
    	conn, err := amqp091.Dial("amqp://guest:[email protected]:5672/vhost")
    	failOnError(err, "Failed to connet RabbitMQ")
    	defer conn.Close()
    
    	ch, err := conn.Channel()
    	failOnError(err, "open a channel failed")
    	defer ch.Close()
    
    	q, err := ch.QueueDeclare(
    		"task_queue", // name
    		true,         // 持久的
    		false,        // delete when unused
    		false,        // 獨有的
    		false,        // no-wait
    		nil,          // arguments
    	)
    	failOnError(err, "delare queue feailed")
    
    	//將預取計數設定為1,在處理並確認一條訊息前,不要向worker傳送新訊息
    	err = ch.Qos(
    		1,     // prefetch count
    		0,     // prefetch size
    		false, // global
    	)
    	failOnError(err, "ch.Qos() failed")
    
    	// 立即返回一個Delivery的通道
    	msgs, err := ch.Consume(
    		q.Name, // queue
    		"",     // consumer
    		false,  // 注意這裡傳false,關閉自動訊息確認
    		false,  // exclusive
    		false,  // no-local
    		false,  // no-wait
    		nil,    // args
    	)
    	failOnError(err, "consume failed")
    
    	go func() {
    		for d := range msgs {
    			log.Printf("Received message: %s", d.Body)
    			d.Ack(false)
    			time.Sleep(time.Second)
    		}
    	}()
    	<-make(chan bool)
    }
  3. Publish/Sublish
    多個消費者接收到相同內容

    publish
    package main
    
    import (
    	"log"
    	"strconv"
    	"time"
    
    	"github.com/rabbitmq/amqp091-go"
    )
    
    func failOnError(err error, msg string) {
    	if err != nil {
    		log.Fatalf("%s: %s", msg, err)
    	}
    }
    
    func main() {
    	var err error
    	conn, err := amqp091.Dial("amqp://guest:[email protected]:5672/vhost")
    	failOnError(err, "Failed to connet RabbitMQ")
    	defer conn.Close()
    
    	ch, err := conn.Channel()
    	failOnError(err, "Failed to open channel")
    	defer ch.Close()
    
    	err = ch.ExchangeDeclare(
    		"logs",   // 使用命名的交換器
    		"fanout", // 交換器型別
    		true,     // durable
    		false,    // auto-deleted
    		false,    // internal
    		false,    // no-wait
    		nil,      // arguments
    	)
    	failOnError(err, "Failed to declare an exchange")
    
    	msg := "message"
    	for i := 0; i < 10; i++ {
    		body := msg + strconv.Itoa(i)
    
    		err = ch.Publish(
    			"logs", // exchange
    			"",     // routing key
    			false,  // mandatory
    			false,  // immediate
    			amqp091.Publishing{
    				ContentType: "text/plain",
    				Body:        []byte(body),
    			})
    		log.Printf("Sent: %s", body)
    		time.Sleep(time.Second)
    	}
    }
    subscribe
     package main
    
    import (
    	"log"
    	"time"
    
    	"github.com/rabbitmq/amqp091-go"
    )
    
    func failOnError(err error, msg string) {
    	if err != nil {
    		log.Fatalf("%s: %s", msg, err)
    	}
    }
    
    func main() {
    	var err error
    	conn, err := amqp091.Dial("amqp://guest:[email protected]:5672/vhost")
    	failOnError(err, "Failed to connet RabbitMQ")
    	defer conn.Close()
    
    	ch, err := conn.Channel()
    	failOnError(err, "Failed to open channel")
    	defer ch.Close()
    
    	err = ch.ExchangeDeclare(
    		"logs",   // 使用命名的交換器
    		"fanout", // 交換器型別
    		true,     // durable
    		false,    // auto-deleted
    		false,    // internal
    		false,    // no-wait
    		nil,      // arguments
    	)
    	failOnError(err, "Failed to declare an exchange")
    
    	q, err := ch.QueueDeclare(
    		"",    // 空字串作為佇列名稱
    		false, // 非持久佇列
    		false, // delete when unused
    		true,  // 獨佔佇列(當前宣告佇列的連線關閉後即被刪除)
    		false, // no-wait
    		nil,   // arguments
    	)
    	failOnError(err, "queue declare failed")
    
    	err = ch.QueueBind(
    		q.Name, // queue name
    		"",     // routing key
    		"logs", // exchange
    		false,  // no-wait
    		nil,
    	)
    	failOnError(err, "bind failed")
    
    	msgs, err := ch.Consume(
    		q.Name, // queue
    		"",     // consumer
    		true,   // auto-ack
    		false,  // exclusive
    		false,  // no-local
    		false,  // no-wait
    		nil,    // args
    	)
    	failOnError(err, "Failed to register a consumer")
    
    	go func() {
    		for d := range msgs {
    			log.Printf("receve: %s", d.Body)
    			time.Sleep(500 * time.Microsecond)
    		}
    	}()
    	<-make(chan bool)
    }
  4. Routing
    根據字串選擇訊息


    pub
    package main
    
    import (
    	"context"
    	"log"
    	"strconv"
    	"time"
    
    	"github.com/rabbitmq/amqp091-go"
    )
    
    func failOnError(err error, msg string) {
    	if err != nil {
    		log.Fatalf("%s: %s", msg, err)
    	}
    }
    
    func main() {
    	var err error
    	conn, err := amqp091.Dial("amqp://guest:[email protected]:5672/vhost")
    	failOnError(err, "Failed to connet RabbitMQ")
    	defer conn.Close()
    
    	ch, err := conn.Channel()
    	failOnError(err, "Failed to open channel")
    	defer ch.Close()
    
    	msg := "message"
    	for i := 0; i < 10; i++ {
    		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    		defer cancel()
    		body := msg + strconv.Itoa(i)
    		err = ch.PublishWithContext(
    			ctx,
    			"amq.direct",      // 指定exchange型別,此處使用預設交換機
    			strconv.Itoa(i%2), // routing key
    			false,             // mandatory
    			false,             // immediate
    			amqp091.Publishing{
    				ContentType: "text/plain",
    				Body:        []byte(body),
    			})
    		failOnError(err, "Failed to publish")
    		log.Printf("Sent %s", body)
    	}
    }
    sub
     package main
    
    import (
    	"log"
    	"os"
    
    	"github.com/rabbitmq/amqp091-go"
    )
    
    func failOnError(err error, msg string) {
    	if err != nil {
    		log.Fatalf("%s: %s", msg, err)
    	}
    }
    
    func main() {
    	var err error
    	conn, err := amqp091.Dial("amqp://guest:[email protected]:5672/vhost")
    	failOnError(err, "Failed to connet RabbitMQ")
    	defer conn.Close()
    
    	ch, err := conn.Channel()
    	failOnError(err, "Failed to open channel")
    	defer ch.Close()
    
    	q, err := ch.QueueDeclare(
    		"",
    		false,
    		true,
    		true,
    		false,
    		nil,
    	)
    	failOnError(err, "Failed to declare queue")
    
    	if len(os.Args) < 2 {
    		panic("必須傳入引數 routing-key")
    	}
    	routingKey := os.Args[1]
    	if routingKey != "0" && routingKey != "1" {
    		panic("必須傳入 0 或 1")
    	}
    	err = ch.QueueBind(
    		q.Name,
    		routingKey,
    		"amq.direct", // 繫結預設交換機
    		false,
    		nil)
    	failOnError(err, "Failed to bind queue")
    
    	msgs, err := ch.Consume(
    		q.Name, // queue
    		"",     // consumer
    		true,   // auto ack
    		false,  // exclusive
    		false,  // no local
    		false,  // no wait
    		nil,    // args
    	)
    	failOnError(err, "Failed to consume")
    
    	go func() {
    		for d := range msgs {
    			log.Printf("Receve: %s", d.Body)
    		}
    	}()
    	<-make(chan bool)
    }
  5. Topics
    根據正則表示式選擇訊息

    使用點分的單詞列表作為 routing_key ,* 代表一個單詞,# 代表零個或多個單詞
    使用#作為key時交換機和fanout型別一致
    不使用*#時與direct一致
    pub
    package main
    
    import (
    	"context"
    	"fmt"
    	"log"
    	"math/rand"
    	"strconv"
    	"time"
    
    	"github.com/rabbitmq/amqp091-go"
    )
    
    func failOnError(err error, msg string) {
    	if err != nil {
    		log.Fatalf("%s: %s", msg, err)
    	}
    }
    
    var local = [...]string{"beijing", "shanghai"}
    var halfDay = [...]string{"AM", "PM"}
    
    func main() {
    	rand.Seed(time.Now().Unix())
    	var err error
    	conn, err := amqp091.Dial("amqp://guest:[email protected]:5672/vhost")
    	failOnError(err, "Failed to connet RabbitMQ")
    	defer conn.Close()
    
    	ch, err := conn.Channel()
    	failOnError(err, "Failed to open channel")
    	defer ch.Close()
    
    	msg := "message"
    	for i := 0; i < 10; i++ {
    		body := msg + strconv.Itoa(i)
    		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    		defer cancel()
    		topicKey := fmt.Sprintf("%s.%s", local[rand.Intn(len(local))], halfDay[rand.Intn(len(halfDay))])
    		err = ch.PublishWithContext(
    			ctx,
    			"amq.topic",
    			topicKey,
    			false,
    			false,
    			amqp091.Publishing{
    				ContentType: "text/plain",
    				Body:        []byte(body),
    			})
    		failOnError(err, "Failed to publish")
    		log.Printf("with topic [%s] Sent: %s", topicKey, body)
    	}
    }
    sub
    package main
    
    import (
    	"log"
    	"os"
    
    	"github.com/rabbitmq/amqp091-go"
    )
    
    func failOnError(err error, msg string) {
    	if err != nil {
    		log.Fatalf("%s: %s", msg, err)
    	}
    }
    
    func main() {
    	var err error
    	conn, err := amqp091.Dial("amqp://guest:[email protected]:5672/vhost")
    	failOnError(err, "Failed to connet RabbitMQ")
    	defer conn.Close()
    
    	if len(os.Args) < 2 {
    		panic("必須有引數 topic_key")
    	}
    	for _, topicKey := range os.Args[1:] {
    		ch, err := conn.Channel()
    		failOnError(err, "Failed to open channel")
    		defer ch.Close()
    
    		q, err := ch.QueueDeclare(
    			"",    // name
    			false, // durable
    			false, // delete when unused
    			true,  // exclusive
    			false, // no-wait
    			nil,   // arguments
    		)
    		failOnError(err, "Failed to declare a queue")
    
    		err = ch.QueueBind(
    			q.Name,
    			topicKey,
    			"amq.topic",
    			false,
    			nil)
    		failOnError(err, "Failed to bind queue")
    
    		msgs, err := ch.Consume(
    			q.Name, // queue
    			"",     // consumer
    			true,   // auto ack
    			false,  // exclusive
    			false,  // no local
    			false,  // no wait
    			nil,    // args
    		)
    		failOnError(err, "Failed to register a consumer")
    
    		key := topicKey
    		go func() {
    			for d := range msgs {
    				log.Printf("channel [%10s] receve: %s", key, d.Body)
    			}
    		}()
    	}
    	<-make(chan bool)
    }
    // .\sub.exe  beijing.* shanghai.* *.AM *.PM
  6. RPC
    就是使用兩個Queue,一個接收引數,一個返回結果

API詳解

建立

// ExchangeDeclarePassive 用於判斷Exchange是否存在
func (ch *Channel) ExchangeDeclare(
    name,         // 交換機名稱
    kind string,  // 型別,fanout direct topic
    durable,      // 是否持久化到硬碟
    autoDelete,   // 在有至少一個Exchange或Queue與這個Exchange繫結,解綁後自動刪除
    internal,     // 是否位內建,如果為true客戶端無法直接發訊息到這個Exchange種,只能通過Exchange傳送
    noWait bool,  // 為true時,不接收server發出的是否Declare成功的資訊。出錯時Channel會被關閉
    args Table    // 結構體,其他引數
) error {

    
// 繫結兩個Exchange
func (ch *Channel) ExchangeBind(
    destination, 
    key, 
    source string, 
    noWait bool, 
    args Table
) error {

    
// QueueDeclarePassive 將passive引數設定為true,用於判斷queue是否存在
func (ch *Channel) QueueDeclare(
    name string,   // 佇列名稱,為空字串時返回一個由MQ建立的臨時佇列
    durable,       // 是否持久化
    autoDelete,    // 至少有一個消費者連線到這個Queue,之後所有消費者都斷開時會自動刪除
    exclusive,     // 是否排他,僅對首次宣告它的Connection可見。
                   // 與普通佇列不同,即使是持久化的,一旦連線關閉或client退出,該佇列自動刪除
    noWait bool, 
    args Table
) (Queue, error) {

    
// 繫結一個Exchange和一個Queue
func (ch *Channel) QueueBind(
    name,             // queue名稱
    key,              // BindingKey
    exchange string,  // Exchange名稱
    noWait bool,      // 是否等待
    args Table        // 其他引數結構體
) error {

兩個Exchange繫結

傳送訊息

mandatory(強制的,法定的) : 引數告訴伺服器至少將該訊息路由到一個佇列中,否則將訊息返回給生產者。
immediate (立即的,直接的):(棄用)引數告訴伺服器,如果該訊息關聯的佇列上有消費者,則立刻投遞;如果所有匹配的佇列上都沒有消費者,則直接將訊息返還給生產者,不用將訊息存入佇列而等待消費者了。

func (ch *Channel) PublishWithContext(
    ctx context.Context,  // 一般使用timeoutContext
    exchange,             // 指定Exchange
    key string,           // RoutingKey
    mandatory,            // 為true時,找不到合適Queue時會呼叫 Basic.Return(AMQP協議規定的命令) 命令將訊息返回給生產者
                          // 為false時,直接丟棄
    immediate bool,       // (棄用)為true時,如果匹配的佇列上沒有消費者,這條訊息不存入佇列,
                          // 所有匹配的佇列都沒有消費者時使用 Basic.Return 返回至生產者
    msg Publishing
) error {
    // 此函式返回一個DeferredConfirmation,用於接收訊息的確認,用於確認模式
	_, err := ch.PublishWithDeferredConfirmWithContext(ctx, exchange, key, mandatory, immediate, msg)
	return err
}

接收訊息

消費分兩種

使用Consume會將Channel設定為接收模式,直到取消佇列的訂閱為止。不能使用迴圈加Get代替Consume,這回嚴重影響效能。實現高吞吐應使用Consume

// 推模式,go中應當使用一個新協程處理chan;java中使用回撥函式,由一個新的執行緒池呼叫callback
func (ch *Channel) Consume(
    queue, 
    consumer string,   消費者標籤,用於區分消費者
    autoAck, 
    exclusive,         是否排他
    noLocal,           The noLocal flag is not supported by RabbitMQ.
    noWait bool, 
    args Table
) (<-chan Delivery, error) {


// 拉模式,一次只獲取一條
func (ch *Channel) Get(
    queue string, 
    autoAck bool
) (msg Delivery, ok bool, err error) {

訊息確認與拒絕

autoAck為true時,自動把發出的訊息置為確認,不管消費者是否真的消費到;
為false時,將發出的訊息標記為刪除,接收到Ack後才從記憶體或磁碟刪除,如果一直沒有ack且消費者斷開則重發。
建議使用false

func (ch *Channel) Reject(
    tag uint64,  // 即 Channel.DeliveryTag,是訊息的編號
    requeue bool // 為true則mq發給下一個訂閱者
                 // 為false則mq丟棄,啟用“死信佇列”
) error {
    
// 批量拒絕
func (ch *Channel) Nack(
    tag uint64, 
    multiple bool, // 為false時與前一個一樣,為true時拒絕所有tag標號之前的未被當前消費者確認的訊息
    requeue bool
) error {

// 讓mq重發未被確認的訊息
func (ch *Channel) Recover(
    requeue bool   // 為false則發給相同消費者,為true則可能發給任何消費者
) error {

高階設定

備份交換機 Alternate Exchange

未被路由的訊息會被存在MQ中

t := amqp091.NewConnectionProperties() // t就是 map[string]interface{}
t["alternate-exchange"] = "altExg"

err = ch.ExchangeDeclare(
    "myAe",
    "fanout", // 如果為其他,可能因為不匹配而丟失資訊
    true,
    false,
    false,
    false,
    t,
)
// ...繫結佇列...
  • 如果設定的備份交換器不存在,客戶端和 RabbitMQ 服務端都不會有異常出現,此時訊息會丟失。
  • 如果備份交換器沒有繫結任何佇列,客戶端和 RabbitMQ 服務端都不會有異常出現,此時訊息會丟失。
  • 如果備份交換器沒有任何匹配的佇列,客戶端和 RabbitMQ 服務端都不會有異常出現,此時訊息會丟失。
  • 如果備份交換器和 mandatory 引數一起使用,那麼 mandatory 引數無效。

TTL

毫秒為單位

設定訊息的 TTL

  1. 通過佇列屬性設定
    過期的一定在佇列頭部,一旦過期立即從佇列中抹去
    // 如果不設定 TTL,則表示此訊息不會過期;如果將 TTL 設定為 0,
    // 則表示除非此時可以直接將訊息投遞到消費者,否則該訊息會被立即丟棄
    t := amqp091.NewConnectionProperties()
    t["x-message-ttl"] = 6000
    
    q, err := ch.QueueDeclare(
        "tmp_queue",
        true,
        false,
        false,
        false,
        t)
  2. 對訊息本身進行單獨設定
    每條訊息的過期時間不同,即使過期也不會立即抹去,是在即將投遞到消費者之前判定的
    err = ch.PublishWithContext(
        ctx,
        "amq.topic",
        topicKey,
        false,
        false,
        amqp091.Publishing{
            ContentType: "text/plain",
            Body:        []byte(body),
            Expiration: "600",      // 設定超時時間
        })

以兩者之間較小的那個數值為準

一旦超過設定的 TTL 值時,就會變成“死信”(Dead Message)

設定佇列的 TTL

控制佇列被自動刪除前處於未使用狀態的時間。未使用的意思是佇列上沒有任何的消費者,佇列也沒有被重新宣告,且在過期時間段內也未呼叫過 Basic.Get 命令。

RabbitMQ 會確保在過期時間到達後將佇列刪除,但是不保障刪除的動作有多及時。在RabbitMQ 重啟後,持久化的佇列的過期時間會被重新計算。

t := amqp091.NewConnectionProperties()
t["x-expires"] = 1800000

q, err := ch.QueueDeclare(
    "tmp_queue",
    true,
    false,
    false,
    false,
    t)

死信佇列

DLX,全稱為 Dead-Letter-Exchange,可以稱之為死信交換器。訊息變為死信後發到DLX

訊息變成死信一般是由於以下幾種情況:

  1. 訊息被拒絕(Basic.Reject/Basic.Nack),並且設定 requeue 引數為 false;
  2. 訊息過期;
  3. 佇列達到最大長度。

DLX 也是一個正常的交換器,和一般的交換器沒有區別,它能在任何的佇列上被指定,實際上就是設定某個佇列的屬性。當這個佇列中存在死信時,RabbitMQ 就會自動地將這個訊息重新發布到設定的 DLX 上去,進而被路由到另一個佇列,即死信佇列。

通過在 channel.queueDeclare 方法中設定 x-dead-letter-exchange 引數來為這個佇列新增 DLX

t := amqp091.NewConnectionProperties()
t["x-message-ttl"]="10000"
t["x-dead-letter-exchange"]="dlx_exchange" // 指定死信交換機
t["x-dead-letter-routing-key"]="routingkey"
q, err := ch.QueueDeclare(
    "task_queue", // name
    true,         // 持久的
    false,        // delete when unused
    false,        // 獨有的
    false,        // no-wait
    t,          // arguments
)

延遲佇列

DLX 配合 TTL 使用還可以實現延遲佇列的功能

優先順序佇列

通過設定佇列的 x-max-priority 引數來設定佇列最大優先順序,每次傳送訊息時設定訊息優先順序

t := amqp091.NewConnectionProperties()
t["x-max-priority"]=10 // 設定的是佇列的最大優先順序
q, err := ch.QueueDeclare("task_queue",true,false,false,false,t)

err = ch.Publish(
    "logs", // exchange
    "",     // routing key
    false,  // mandatory
    false,  // immediate
    amqp091.Publishing{
        ContentType: "text/plain",
        Body:        []byte(body),
        Priority: 5,               // 每次傳送訊息時都要設定優先順序
    })

持久化

Exchange、Queue 通過建立時的durable引數設定。

佇列的持久化能保證其本身的元資料不會因異常情況而丟失,但是並不能保證內部所儲存的
訊息不會丟失。訊息持久化要在傳送時設定。

寫入磁碟非常慢,所以持久化對吞吐量影響非常大

持久化不能保證資料百分比不丟失:寫入磁碟時先寫入系統快取,然後呼叫 fsync 同步存檔,如果此時宕機,訊息將丟失。

可以使用主從複製的映象佇列機制,master掛掉,自動切到slave。仍有可能丟失資料,但是可靠多了。

只設置訊息的持久化,重啟之後佇列消失,繼而訊息也丟失。單單設定訊息持久化而不設定佇列的持久化顯得毫無意義。

	err = ch.Publish(
		"logs", // exchange
		"",     // routing key
		false,  // mandatory
		false,  // immediate
		amqp091.Publishing{
			ContentType: "text/plain",
			Body:        []byte(body),
			DeliveryMode: 2,             // 實現訊息持久化
		})

生產者確認

預設情況下發送訊息的操作是不會返回任何資訊給生產者的,生產者是不知道訊息有沒有正確地到達伺服器。

兩種解決方案:

  1. 事務機制
  2. 傳送方確認機制

事務機制

java中使用

channel.txSelect()
channel.txCommit()
channel.txRollback()

go中使用

ch.Tx()
ch.TxCommit()
ch.TxRollback()

事務提交

  • 客戶端傳送 Tx.Select,將通道置為事務模式;
  • Broker 回覆 Tx.Select-Ok,確認已將通道置為事務模式;
  • 在傳送完訊息之後,客戶端傳送 Tx.Commit 提交事務;
  • Broker 回覆 Tx.Commit-Ok,確認事務提交。

事務回滾

事務會大大降低效能

傳送方確認機制

生產者將通道設定成 confirm(確認)模式,一旦通道進入 confirm 模式,所有在該通道上面釋出的訊息都會被指派一個唯一的 ID(從 1 開始),一旦訊息被投遞到所有匹配的佇列之後,RabbitMQ 就會發送一個確認(Basic.Ack)給生產者(包含訊息的唯一 ID),這就使得生產者知曉訊息已經正確到達了目的地了。如果訊息和佇列是可持久化的,那麼確認訊息會在訊息寫入磁碟之後發出。RabbitMQ 回傳給生產者的確認訊息中的 deliveryTag 包含了確認訊息的序號,此外 RabbitMQ 也可以設定 channel.basicAck 方法中的 multiple 引數,表示到這個序號之前的所有訊息都已經得到了處理,可以參考圖 4-10。注意辨別這裡的確認和消費時候的確認之間的異同。

// 建立一個接收確認訊息的chan
confirms := ch.NotifyPublish(make(chan amqp.Confirmation, 1)) // 處理確認邏輯
go func (confirms <-chan amqp.Confirmation) {
    if confirmed := <-confirms; confirmed.Ack {
        fmt.Printf("confirmed delivery with delivery tag: %d", confirmed.DeliveryTag)
    } else {
        fmt.Printf("confirmed delivery of delivery tag: %d", confirmed.DeliveryTag)
    }
}(confirms)

其他用於確認的函式:

注意:

  1. 事務和傳送確認是互斥的
  2. 這隻能確保正確傳送到Exchange

消費端

  1. 訊息分發
  2. 訊息順序性
  3. 棄用QueueingConsumer

訊息分發

預設使用輪詢方法

// 限制通道上的消費者所能保持的最大未確認訊息的數量
// 類比 TCP/IP 的滑動視窗
func (ch *Channel) Qos(
    prefetchCount,    // 所能接收的所有未確認訊息的數量,為0表示沒有上限
    prefetchSize int, // 所能接收的所有未確認訊息的總體大小上限,單位位元組,為0沒有上限
    global bool       // 為true通道Channel上所有消費者都遵從限制,否則只有新的消費者遵從
) error {

訊息順序性

可能破壞順序性的情況:

  1. 生產者使用事務,出現異常後回滾,使用另一執行緒補償傳送
  2. 使用延遲佇列時錯序
  3. 存在優先順序
  4. ...

要保證訊息順序性,要業務方做進一步處理,如新增全域性有序標識(Sequence ID)

訊息傳輸保障

傳輸保障的3個層級:

  1. At most once:最多一次。訊息可能會丟失,但絕不會重複傳輸。
    無需考慮太多,但可能丟失資料
  2. At least once:最少一次。訊息絕不會丟失,但可能會重複傳輸。
    考慮一下幾方面
    1. 生產者開啟事務或publisher confirm
    2. 使用備份交換機,保證訊息能到佇列
    3. 持久化
    4. 手動ack
  3. Exactly once:恰好一次。每條訊息肯定會被傳輸一次且僅傳輸一次。(不支援)

RabbitMQ管理

兩個工具 rabbitmqctlrabbitmq-plugins 。使用 rabbitmq -plugins list 檢視所有外掛,使用 rabbitmq -plugins enable rabbitmq _management 並重啟開啟web介面

應用管理

rabbitmq-server -detached    # 啟動Erlang虛擬機器和應用
rabbitmqctl stop_app         # 停止MQ,但不關閉Erlang虛擬機器(在執行其他停止RabbitMQ的操作前使用,如 rabbitmqctl reset)
rabbitmqctl start_app        # 啟動應用,但前提是Erlang虛擬機器已經啟動
rabbitmqctl stop [pid_file]  # 關閉MQ和Erlang,指定 pid_file 後阻塞至關閉結束
rabbitmqctl shutdown         # 阻塞至關閉
rabbitmqctl wait [pid_file]  # 等待mq的啟動
rabbitmqctl reset            # 重置到最初狀態
rabbitmqctl force_reset      # 可能損壞資料或配置
rabbitmqctl rotate_logs {suffix}   # 輪換日誌,用於日誌分割。將已有日誌更名為原檔名+字尾,以後的日誌繼續寫入舊檔名

叢集管理

rabbitmqctl join_cluster {cluster_node} [--ram]  # 加入叢集,執行前要停止MQ並重置節點
rabbitmqctl cluster_status
rabbitmqctl change_cluster_node_type {disc|ram}  # 修改節點型別
rabbitmqctl forget_cluster_node [--offline]      # 從叢集刪除,允許離線執行
rabbitmqctl update_cluster_nodes {clusternode}   # 用於再次加入同一叢集時跟新狀態
rabbitmqctl force_boot                           # 強制啟動。一般,重啟的第一個節點應該是最後關閉的節點,因為它可以看到其他節點所看不到的事情。
rabbitmqctl sync_queue [-p vhost] {queue}        # 開啟主從複製
rabbitmqctl cancel_sync_queue [-p vhost] {queue} # 關閉主從複製
rabbitmqctl set_cluster_name {name}

服務端狀態

rabbitmqctl list_queues [-p vhost] [queueinfoitem ...]
rabbitmqctl list_exchanges [-p vhost] [exchangeinfoitem ...]
rabbitmqctl list_bindings [-p vhost] [bindinginfoitem ...] 
rabbitmqctl list_connections [connectioninfoitem ...] 
rabbitmqctl list_channels [channelinfoitem ...]
rabbitmqctl list_consumers [-p vhost]
rabbitmqctl status 
rabbitmqctl node_health_check
rabbitmqctl environment             顯示每個執行程式環境中每個變數的名稱和值
rabbitmqctl report                  為所有伺服器狀態生成一個伺服器狀態報告
rabbitmqctl eval {expr}             執行任意 Erlang表示式

配置

3種方式,優先順序如下:

  1. 環境變數
  2. 配置檔案
  3. 執行時引數和策略

環境變數

都是以“RABBITMQ_”開頭的,預設使用 rabbit@$HOSTNAME 作為名稱

還可以在rabbitmq-env.conf檔案種配置環境變數,只是沒有了開頭的 RABBITMQ_

配置檔案

https://github.com/rabbitmq/rabbitmq-server/blob/stable/docs/rabbitmq.config.example

配置檔案的位置取決於不同的作業系統和安裝包。最有效的方法就是檢查RabbitMQ 的服務日誌,在啟動 RabbitMQ 服務的時候會列印相關資訊。如 -config /etc/rabbitmq/rabbitmq

# 一個配置檔案示例,注意最後的 . 號
[
    {
        rabbit, [
            {tcp_listeners, [5673]}
        ]
    }
].

配置網路

配置網路
# 緩衝區越大,吞吐量也會越高,但是每個連線上耗費的記憶體也就越多
# 將 TCP 緩衝區大小設定為 192KB
[
    {rabbit, [
         {tcp_listen_options, [
            {backlog, 128},
            {nodelay, true},    # 禁用Nagles演算法
            {linger, {true,0}},
            {exit_on_close, false},
            {sndbuf, 196608},
            {recbuf, 196608}
        ]}
    ]}
].


[
    {kernel, [
        {inet_default_connect_options, [{nodelay, true}]},
        {inet_default_listen_options, [{nodelay, true}]}
    ]},
    {rabbit, [
        {tcp_listen_options, [
            {backlog, 4096},
            {nodelay, true},
            {linger, {true,0}},
            {exit_on_close, false}
        ]}
    ]}
].

RabbitMQ 運維

叢集搭建

單臺 RabbitMQ 伺服器可以滿足每秒 1000 條訊息的吞吐量,當叢集中一個 RabbitMQ 節點崩潰時,該節點上的所有佇列中的訊息也會丟失。RabbitMQ 叢集中的所有節點都會備份所有的元資料資訊,包括以下內容。

  • 佇列元資料:佇列的名稱及屬性;
  • 交換器:交換器的名稱及屬性;
  • 繫結關係元資料:交換器與佇列或者交換器與交換器之間的繫結關係;
  • vhost 元資料:為 vhost 內的佇列、交換器和繫結提供名稱空間及安全屬性。

不會備份訊息,但可以通過映象佇列解決。叢集中只在單個節點上而不是所有節點建立和包含完整的佇列資訊和訊息。其他非所有者節點只知道佇列的元資料和指向該佇列存在的那個節點的指標。因此崩潰時會丟失訊息。

不同於佇列那樣擁有自己的程序,交換器其實只是一個名稱和繫結列表。當訊息釋出到交換器時,實際上是由所連線的通道將訊息上的路由鍵同交換器的繫結列表進行比較,然後再路由訊息。當建立一個新的交換器時,RabbitMQ 所要做的就是將繫結列表新增到叢集中的所有節點上。這樣,每個節點上的每條通道都可以訪問到新的交換器了。

多機多節點配置

應只在區域網使用,廣域網種使用 Federation 或者 Shovel 來代替。

  1. 修改hosts檔案,新增主機名
  2. 編輯 RabbitMQ cookie 檔案,以確保各個節點的 cookie 檔案使用的是同一個值
    可以讀取 node1 節點的 cookie 值,然後將其複製到 node2 和 node3 節點中。
    /var/lib/rabbitmq/.erlang.cookie 或者$HOME/.erlang.cookie
  3. 配 置 集 群
    三臺機器:node1 node2 node3
    1. 分別啟動機器 rabbitmq-server –detached
    2. 以node1為基準,將node2和node3加入叢集|
      rabbitmqctl stop_app
      rabbitmqctl reset
      rabbitmqctl join_cluster rabbit@node1
      rabbitmqctl start_app

如果關閉了叢集中的所有節點,則需要確保在啟動的時候最後關閉的那個節點是第一個啟動的。如果第一個啟動的不是最後關閉的節點,那麼這個節點會等待最後關閉的節點啟動。這個等待時間是 30 秒,如果沒有等到,那麼這個先啟動的節點也會失敗。在最新的版本中會有重試機制,預設重試 10 30 秒以等待最後關閉的節點啟動。

若因異常無法正常啟動用 rabbitmqctl forget_cluster_node將其剔除叢集。
使用 rabbitmqctl force_boot 強制啟動

叢集節點型別

記憶體節點將所有的佇列、交換器、繫結關係、使用者、許可權和 vhost的元資料定義都儲存在記憶體中,而磁碟節點則將這些資訊儲存到磁碟中。
單節點的叢集中必然只有
磁碟型別的節點,否則當重啟 RabbitMQ 之後,所有關於系統的配置資訊都會丟失。
不過在叢集中,可以選擇配置部分節點為記憶體節點,這樣可以獲得更高的效能。

rabbitmqctl join_cluster rabbit@node1 --ram # 當前節點作為記憶體節點加入
# 沒有引數預設磁碟節點
rabbitmqctl change_cluster_node_type {disc,ram} # 切換節點型別,執行命令前後要執行 stop_app start_app

記憶體節點速度快,磁碟節點更持久。叢集要求至少一個磁碟節點,當磁碟節點崩潰後集群仍能執行但是無法修改佇列、許可權、使用者等狀態。應當保證叢集至少有兩個磁碟節點。節點加入或者離開叢集時,它們必須將變更通知到至少一個磁碟節點。

 剔除單個節點

rabbitmqctl forget_cluster_node rabbit@node2           # 在node1或node3上執行將其剔除
rabbitmqctl forget_cluster_node rabbit@node2 –offline # 上條命令要求rabbitmq服務線上,這條不需要
rabbitmqctl reset       # 重置當前服務,建議這種方式

 叢集節點的升級

 如果由單節點組成,只要關閉原服務,再解壓新版本即可。

叢集由多個節點組成,具體步驟:

  1. 關閉所有節點的服務,注意採用 rabbitmqctl stop 命令關閉。
  2. 儲存各個節點的 Mnesia 資料。
  3. 解壓新版本的 RabbitMQ 到指定的目錄。
  4. 指定新版本的 Mnesia 路徑為步驟 2 中儲存的 Mnesia 資料路徑。
  5. 啟動新版本的服務,注意先重啟原版本中最後關閉的那個節點

日誌

位於$RABBITMQ_HOME/var/log/rabbitmqRABBITMQ_NODENAME-sasl.log
RABBITMQ_NODENAME.log

可用  tail -f $RABBITMQ_HOME/var/log/rabbitmq/rabbit@$HOSTNAME.log -n 200 實時檢視

rabbitmqctl rotate_logs {suffix} 輪換日誌,可使用crontab

預設交換機 amq.rabbitmq.trace 就是用於收集日誌的,其為topic型別,可訂閱 queue.debug、queue.info、queue.warning、queue.error四個級別用於訂閱不同主題,使用#可訂閱所有日誌。

單節點故障恢復

移除故障節點:rabbitmqctl forget_cluster_node {nodename}

節點重啟後不要直接加入叢集,否則會引起網路分割槽。要執行 rabbitmqctl forget_cluster_node {nodename} 將其剔除後再作為新節點加入。

叢集遷移

擴容較為簡單,直接加入新節點即可,但是新節點中沒有佇列建立。只有後面新建佇列才會進入這個新節點。

遷移用於解決擴容和叢集故障的問題。

元資料重建

在新叢集中建立與舊叢集相同的交換機、佇列、使用者等元資料。可從web管理介面下載和匯入。

新叢集有資料與 metadata.json 中的資料相沖突,對於交換器、佇列及繫結關係這類非可變物件而言會報錯,而對於其他可變物件如 Parameter、使用者等則會被覆蓋,沒有發生衝突的則不受影響。如果過程中發生錯誤,則匯入過程終止,導致 metadata.json 中只有部分資料載入成功。

有3個問題:

  1. 機器故障,無法獲得json檔案
    可以採取一個通用的備份任務,在元資料有變更或者達到某個儲存週期時將最新的 metadata.json 備份至另一處安全的地方。
  2. 新舊叢集的 RabbitMQ 版本不一致
    一般高版本價值低版本的沒有問題
    直接修改json檔案內容
  3. 所有的佇列都只會落到同一個叢集節點上,而其他節點處於空置狀態
    通過程式(或者指令碼)的方式在新叢集上建立元資料,而非簡單地在頁面上上傳元資料檔案而已

資料遷移與客戶端連線的切換

先將生產者客戶端斷開連線,然後接入新叢集。

消費者客戶端可直接切換也可等舊叢集中訊息消費完後再切換。

資料遷移原理是將原叢集中訊息取出再發布到新叢集,RabbitMQ本身提供的 Federation 和 Shovel 外掛都可以實現。也可以自己編寫。

自動化遷移

在使用相關資源時就做好一些準備工作,方便在自動化遷移過程中進行無縫切換。與生產者和消費者客戶端相關的是交換器、佇列及叢集的資訊,一旦改變就要讓客戶端感知到,然後載入到ZooKeeper或etcd。如圖,叢集分為三部分:客戶端、叢集、ZooKeeper配置管理

建立元資料資源時都要在Zookeeper中建立相應資源。客戶端在互動時在相應的ZooKeeper節點中新增watcher,以便資料變化時相應變更。

元資料管理

元資料(如Exchange、Queue、使用者)的操作應當通過元資料稽核系統申請操作,申請後由專門的人審批,之後在資料庫和RabbitMQ中建立相應的元資料,由專門的人審批。通過後分別儲存。資料庫和 RabbitMQ 叢集之間會有一個元資料一致性校驗程式來檢測元資料不一致的地方,不同之處推上監控管理系統。然後人工修改。

主要有queues、exchange、bindings三張表。

跨越叢集界限

RabbitMQ有 3 種方式實現分散式部署:叢集、Federation 和 Shovel。可以搭配使用

Federation

Federation Plugin — RabbitMQ

主要用於不同城市間的MQ叢集通訊,在不同的 Broker 節點之間進行訊息傳遞而無須建立叢集

  1. Federation外掛能在不同域(使用者、vhost、應用、Erlang)中傳遞訊息
  2. 能容忍不穩定的網路連線情況
  3. 一個Broker節點可同時建立聯邦交換器(或佇列)或者本地交換器(或佇列),只需要對特定的交換器(或佇列)建立 Federation 連線(Federation link
  4. 不需要在N個節點中建立N2個連線,意味著更容易拓展

Federation 外掛可以讓多個交換器或者多個佇列進行聯邦,聯邦交換器和聯邦佇列可接收上游訊息。
聯邦交換器能夠將原本傳送給上游交換器的訊息路由到本地的某個佇列中;
聯邦佇列則允許一個本地消費者接收到來自上游佇列(upstream queue)的訊息

聯邦交換器

broker1 的訊息要傳遞給 broker3 的 exchangeA 消費,broker3 使用 Federation 外掛在broker1 中建立一個新的 exchange(默認同名)、一個聯邦交換機exchangeA->broker3 B、一個佇列。其中broker1中exchangeA和聯邦交換機繫結的key和broker3中exchangeA和Queue繫結的key一致。

向broker1的exchangeA傳送訊息,轉發到聯邦交換機再通過聯邦佇列傳送到broker3,可以從broker3中的佇列接收到訊息。

聯邦交換機可以作為其他聯邦交換機的上游

聯邦佇列

聯邦佇列可以在多個Broker 節點(或者叢集)之間為單個佇列提供均衡負載的功能。一個聯邦佇列可以連線一個或者多個上游佇列

當消費者消費broker2中queue1或queue2中訊息時,會先消費本地訊息,沒有了就到broker1中拉取。

有消費者消費broker1中queue1,又有消費者消費brker2中queue1,則實現了類似負載均衡的效果。

使用

要配置兩個功能:

  1. 需要配置一個或多個 upstream,每個 upstream 均定義了到其他節點的 Federation link。
  2. 定義匹配交換器或者佇列的一種/多種策略(Policy)

方法:

  1. rabbitmq-plugins enable rabbitmq_federation開啟外掛,其基於AMQP拉取協議,也會開啟 amqp_client 外掛
    rabbitmq-plugins enable rabbitmq_federation_management 開啟管理外掛
    當需要在叢集中使用 Federation 功能的時候,叢集中所有的節點都應該開啟 Federation 外掛
  2. 為下游MQ指定upstream
    三種方法
    1. rabbitmqctl set_parameter federation-upstream f1  \
      '{"uri":"amqp://root:root123@<upstream-ip>:5672","ack-mode":"on-confirm"}'
    2. 使用HTTP API
      curl -i -u root:root123 -XPUT -d \
      '{"value":{"uri":"amqp://root:[email protected]:5672","ack-mode":"on-confirm"}}' \
      http://192.168.0.4:15672/api/parameters/federation-upstream/%2f/f1
    3. 安裝聯邦管理工具後再web介面新增
  3. 定義一個 Policy 用於匹配交換器 exchangeA,並使用第二步中所建立的 upstream
    1. rabbitmqctl set_policy --apply-to exchanges p1 "^exchange" '{"federation-upstream":"f1"}'
    2. HTTP API
      curl -i -u root:root123 -XPUT -d \
      '{"pattern":"^exchange","definition":{"federation-upstream":"f1"},"apply-to":"exchanges"}' \
      http://192.168.0.4:15672/api/policies/%2F/p1
    3. 再web介面的 "Admin"->"Policies"->"Add/update a policy" 中建立

Shovel

Shovel Plugin — RabbitMQ

可靠、持續地從一個Broker中地佇列拉取資料並轉發至另一個Broker的交換機。源和目的地可以是一個Broker。

優勢:1. 鬆耦合。Broker可包含不同的使用者和vhost,不同的MQ和Erlang版本 2.支援廣域網 3.高度定製,可配置AMQP命令

Shovel 的原理

看起來像是直接從queue1傳送到queue2,實際上中間經過了預設交換機。queue2中訊息會被添上一些頭資訊。

使用

rabbitmq-plugins enable rabbitmq_shovel & rabbitmq-plugins enable rabbitmq_shovel_management 開啟外掛,web介面中會多出Shovel相關的選項。

兩種配置方式:靜態(在 rabbitmq.config)動態(通過Runtime Parameter)

# 靜態配置格式,source destination queue 必須
{shovel_name, [ {sources, [ ... ]}
                , {destinations, [ ... ]}
                , {queue, queue_name}
                , {prefetch_count, count}
                , {ack_mode, a_mode}
                , {publish_properties, [ ... ]}
                , {publish_fields, [ ... ]}
                , {reconnect_delay, reconn_delay}
                ]}
示例
                
[{rabbitmq_shovel,
    [{shovels,
        [{hidden_shovel,
            [{sources,
                [{broker, "amqp://root:[email protected]:5672"},
                {declarations,
                [
                {'queue.declare',[{queue, <<"queue1">>}, durable]},
                {'exchange.declare',[
                        {exchange, <<"exchange1">>},
                        {type, <<"direct">>},
                        durable
                    ]
                },
                {'queue.bind',[
                    {exchange, <<"exchange1">>},
                        {queue, <<"queue1">>},     
                        {routing_key, <<"rk1">>}
                    ]
                }]}]},
            {destinations,
                [{broker, "amqp://root:[email protected]:5672"},
                {declarations,
                [
                {'queue.declare',[{queue, <<"queue2">>}, durable]},
                {'exchange.declare',[
                        {exchange, <<"exchange2">>},
                        {type, <<"direct">>},
                        durable
                    ]
                },
                {'queue.bind',[
                    {exchange, <<"exchange2">>},
                        {queue, <<"queue2">>},
                        {routing_key, <<"rk2">>}
                    ]
                }]}]},
            {queue, <<"queue1">>},
            {ack_mode, no_ack},
            {prefetch_count, 64},
            {publish_properties, [{delivery_mode, 2}]},
            {add_forward_headers, true},
            {publish_fields, [{exchange, <<"exchange2">>},
            {routing_key,<<"rk2">>}]},
        {reconnect_delay, 5}]
        }]
    }]
}]

 動態配置:

  1. 命令列
    rabbitmqctl set_parameter shovel hidden_shovel \
    '{"src-uri":"amqp://root:[email protected]:5672",
    "src-queue":"queue1",
    "dest-uri":"amqp://root:[email protected]:5672","src-exchange-key":"rk2",
    "prefetch-count":64, "reconnect-delay":5, "publish-properties":[],
    "add-forward-headers":true, "ack-mode":"on-confirm"}'
  2. HTTP API
    curl -i -u root:root123 -XPUT -d
    '{"value":{"src-uri":"amqp://root:[email protected]:5672","src-queue":"que
    ue1",
    "dest-uri":"amqp://root:[email protected]:5672","src-exchange-key":"rk2",
    "prefetch-count":64, "reconnect-delay":5, "publish-properties":[],
    "add-forward-headers":true, "ack-mode":"on-confirm"}}'
    http://192.168.0.2:15672/api/parameters/shovel/%2f/hidden_shovel
  3. web介面

 訊息堆積

 適量的訊息堆積有削峰、快取的作用,但堆積過重會影響服務。可以丟棄資料,也可以使用Shovel將訊息轉移給其他叢集。

當叢集1中堆積嚴重時,將訊息轉發給其他叢集,當堆積緩解後停止轉移,合適條件下再轉移回原叢集

上圖為“一對一”,還可以有“一對多”


RabbitMQ高階 

一個佇列的內部存儲其實是由 5 個子佇列來流轉運作的,佇列中的訊息可以有 4 種不同的狀態

在使用 RabbitMQ 時儘量不要有過多的訊息堆積,不然會影響整體服務的效能

儲存機制

持久化訊息到達時寫入磁碟,記憶體充裕時寫入記憶體,記憶體吃緊時從記憶體清除。非持久化訊息只寫入記憶體,記憶體不足時換入磁碟。

映象佇列

單點故障時交換機和繫結關係能倖免於難,但佇列和訊息不可以。映象佇列將佇列複製到其他節點,一個節點失效,自動切換到其他節點。每組映象佇列都包含一個master和多個slave。

slave與master上狀態相同。master失效時時間最長的slave會成為新的master。發到映象佇列的所有訊息都同時發到master和所有slave。除傳送訊息(Basic.Public)外所有動作都發到master再由master將結果廣播給slave。

消費者與slave的連線,本質是slave從master獲得訊息再發給消費者。並非像mysql一樣的負載均衡。RabbitMQ中的master和slave是針對佇列而言的,其可以均勻的分佈在叢集的各個節點中以達到物理機器負載均衡的目的。

映象佇列支援釋出確認和事務兩種機制,只有所有節點都完成了事務或確認,生產者的當前訊息才算被接收了。

網路分割槽

RabbitMQ的資料一致性原理:將映象組成環形,在master上執行確認命令,之後轉向B然後C、D節點,最後由D執行後返回給A,這樣才確認了一條訊息。這樣可以保證更強的一致性。如果C阻塞,那麼整個A->B->C->D->A 就會阻塞,所以要將異常節點剝離出來,確保MQ服務的可靠。

網路分割槽大多是由單個節點的網路故障引起的,且通常會形成一個大分割槽和一個單節點的分割槽,如果之前又配置了映象,那麼可以在不影響服務可用性,不丟失訊息的情況下從網路分割槽的情形下得以恢復。

由 net_ticktime 決定節點間超時時間。使用 rabbitmqctl cluster_status 檢視,或WEB介面,或使用HTTP API curl -i -u root:root123 -H "content-type:application/json" -X GET http://ip:15672/api/nodes 

手動處理分割槽

  1. 挑選信任分割槽
    分割槽中disc節點、節點數、佇列數、客戶端數越多越好,越靠前越重要。
  2. 重啟非信任節點
    推薦使用 rabbitmqctl start_app 命令
  3. 如果還有警告,重啟信任分割槽中節點

注意重啟節點時映象佇列的“漂移”現象:master集中到某一個節點上。可以重啟之前先刪除映象佇列的配置,這樣能夠在一定程度上阻止佇列的“過分漂移”。

1:掛起生產者和消費者程序。這樣可以減少訊息不必要的丟失,如果程序數過多,情形又比較緊急,也可跳過此步驟。
2:刪除映象佇列的配置。
3:挑選信任分割槽。
4:關閉非信任分割槽中的節點。採用 rabbitmqctl stop_app 命令關閉。
5:啟動非信任分割槽中的節點。採用與步驟 4 對應的 rabbitmqctl start_app命令啟動。
6:檢查網路分割槽是否恢復,如果已經恢復則轉步驟 8;如果還有網路分割槽的報警則轉步驟 7。
7:重啟信任分割槽中的節點。
8:新增映象佇列的配置。
9:恢復生產者和消費者的程序。

自動處理分割槽

預設不開啟,有3種方式:pause-minority 模式、pause-if-all-down 模式 autoheal 模式

# 可在 rabbitmq.config 中配置
[
    {
        rabbit, [
            {cluster_partition_handling, ignore}
        ]
    }
].

pause-minority 模式

叢集觀察到有節點“down”時自動檢測自己是否是“少數派”,會使用 rabbitmqctl stop_app 自動關閉這些節點。大多數節點得以繼續執行。關閉節點每秒檢測一次是否可連線到剩餘叢集中,可以則啟動自身 rabbitmqctl start_app

[
    {
        rabbit, [
            {cluster_partition_handling, pause_minority}
        ]
    }
].

當出現2v2、3v3 這種對等分割槽時可能會關閉所有機器。

pause-if-all-down 模式

叢集中的節點在和所配置的列表中的任何節點不能互動時才會關閉 {pause_if_all_down, [nodes], ignore|autoheal}。如下方配置,任何節點無法與 rabbot@node1 通訊時會關閉自己的應用,如果是它發生故障,所有節點都關閉。直到網路恢復再重啟。

注意這種模式有 ignore 和 autoheal 兩種配置。ignore不會關閉節點,要配置為autoheal。

[
    {
        rabbit, [
            {cluster_partition_handling,
                {pause_if_all_down, ['rabbit@node1'], autoheal}}
        ]
    }
].

autoheal 模式

分割槽時自動決定一個獲勝分割槽,重啟不在分割槽中的節點。判斷獲勝的依據依次是:客戶端連線數,節點數,節點名稱字典序。

[
    {
        rabbit, [
            {cluster_partition_handling, autoheal}
        ]
    }
].

各種分割槽處理自動模式比較

配置與不可靠網路要使用 Federation 或 Shovel。即使網路恢復了,也要防止二次分割槽

  1. ignore:發生分割槽時不做任何動作,要人工介入
  2. pause-minority:對等分割槽處理不夠優雅,可能關閉所有節點。可用於非跨機架、奇數節點的叢集
  3. pause-if-all-down:對受信節點的選擇極為考究
  4. autoheal:可處理各情形下網路分割槽。有節點處於非執行狀態時此模式失效

RabbitMQ擴充套件

訊息追蹤

Firehose

將生產者投遞給 RabbitMQ 的訊息,或者 RabbitMQ 投遞給消費者的訊息按照指定的格式傳送到預設的交換器上。這個預設的交換器的名稱為 amq.rabbitmq.trace,它是一個 topic 型別的交換器。傳送到這個交換器上的訊息的路由鍵為 publish.exchangename} deliver.{queuename}rabbitmqctl trace_on [-p vhost] 開啟外掛。

發訊息給exchange,佇列 1、3、4 接收到被封裝的訊息
發訊息給exchange.another,佇列 1、3 接收到被封裝的訊息
消費queue,佇列 2、3、5 接收到被封裝的訊息
消費queue.another,,佇列 2、3 接收到被封裝的訊息

rabbitmq_tracing

命令 rabbitmq-plugins enable rabbitmq_tracing 開啟Firehose的GUI版本。

負載均衡

客戶端內部實現

客戶端在程式碼中實現,使用一個專門的獲取連線函式

  1. 輪詢
  2. 加權輪詢
  3. 隨機
  4. 加權隨機
  5. 源地址雜湊
  6. 最小連線數法

HAProxy

HAProxy 實現了一種事件驅動、單一程序模型,此模型支援非常大的併發連線數。

環境:

HAProxy 主機:192.168.0.9 5671
RabbitMQ 1:192.168.02 5672
RabbitMQ 2:192.168.03 5672
RabbitMQ 3:192.168.04 5672
#全域性配置
global
    #日誌輸出配置,所有日誌都記錄在本機,通過 local0 輸出
    log 127.0.0.1 local0 info
    #最大連線數
    maxconn 4096
    #改變當前的工作目錄
    chroot /opt/haproxy-1.7.8
    #以指定的 UID 執行 haproxy 程序
    uid 99
    #以指定的 GID 執行 haproxy 程序
    gid 99
    #以守護程序方式執行 haproxy #debug #quiet
    daemon
    #debug
    #當前程序 pid 檔案
    pidfile /opt/haproxy-1.7.8/haproxy.pid
#預設配置
defaults
    #應用全域性的日誌配置
    log global
    #預設的模式 mode{tcp|http|health}
    #TCP 是 4 層,HTTP 是 7 層,health 只返回 OK
    mode tcp
    #日誌類別 tcplog
    option tcplog
    #不記錄健康檢查日誌資訊
    option dontlognull
    #3 次失敗則認為服務不可用
    retries 3
    #每個程序可用的最大連線數
    maxconn 2000
    #連線超時
    timeout connect 5s
    #客戶端超時
    timeout client 120s
    #服務端超時
    timeout server 120s
#繫結配置
listen rabbitmq_cluster :5671
    #配置 TCP 模式
    mode tcp
    #簡單的輪詢
    balance roundrobin
    #RabbitMQ 叢集節點配置
    #      起名      指定地址        5000毫秒檢測可用  故障後檢查2次才被確認可用    3次檢查失敗後停止使用   服務權重
    server rmq_node1 192.168.0.2:5672 check inter 5000 rise 2 fall 3 weight 1
    server rmq_node2 192.168.0.3:5672 check inter 5000 rise 2 fall 3 weight 1
    server rmq_node3 192.168.0.4:5672 check inter 5000 rise 2 fall 3 weight 1
#haproxy 監控頁面地址
listen monitor :8100
    mode http
    option httplog
    stats enable
    stats uri /stats
    stats refresh 5s

Keepalived

HAProxy 故障後,所有MQ都無法連線。 Keepalived 能夠通過自身健康檢查、資源接管功能做高可用(雙機熱備),實現故障轉移。

Keepalived 採用 VRRP(Virtual Router Redundancy Protocol,虛擬路由冗餘協議),以軟體的形式實現服務的熱備功能。通常情況下是將兩臺 Linux 伺服器組成一個熱備組(Master Backup,同一時間內熱備組只有一臺主伺服器 Master 提供服務,同時 Master 會虛擬出一個公用的虛擬 IP 地址,簡稱 VIP。這個 VIP 只存在於 Master 上並對外提供服務。如果 Keepalived檢測到 Master 宕機或者服務故障,備份伺服器 Backup 會自動接管 VIP 併成為 MasterKeepalived將原 Master 從熱備組中移除。當原 Master 恢復後,會自動加入到熱備組,預設再搶佔成為 Master起到故障轉移的功能。

Keepalived 工作在 OSI 模型中的第 3 層、第 4 層和第 7 層。
第3層,定期用ICMP包判斷熱備組中機器是否異常,異常則移除
第4層,通過TCP埠判斷應用是否正常,移除則移除
第7層,通過使用者自定義指令碼判斷服務是否正常,異常則移除

兩臺Keepalived通過VRRP互動,對外虛擬出一個VIP。Keepalived HAProxy 部署在同一臺機器上通過 Keeaplived 實現 HAProxy 的雙機熱備。

client通過VIP建立通訊鏈路,通過Keepalived的Master節點路由到HAProxy上,通過負載均衡演算法分發到各個MQ節點。正常流量通過左側,當Master掛掉時Backup提升為Master。

# Keepalived 的 Mater 配置
global_defs {
    router_id NodeA #路由 ID、主/備的 ID 不能相同
}
#自定義監控指令碼
vrrp_script chk_haproxy {
    script "/etc/keepalived/check_haproxy.sh"
    interval 5
    weight 2
}
vrrp_instance VI_1 {
    state MASTER #Keepalived 的角色。Master 表示主伺服器,從伺服器設定為 BACKUP
    interface eth0 #指定監測網絡卡
    virtual_router_id 1
    priority 100 #優先順序,BACKUP 機器上的優先順序要小於這個值
    advert_int 1 #設定主備之間的檢查時間,單位為 s
    authentication { #定義驗證型別和密碼
        auth_type PASS
        auth_pass root123
    }
    track_script {
        chk_haproxy
    }
    virtual_ipaddress { #VIP 地址,可以設定多個:
        192.168.0.10
    }
}

# Backup 配置,大多數一致
global_defs {
    router_id NodeB
}
vrrp_script chk_haproxy {
    ...
}
vrrp_instance VI_1 {
    state BACKUP
    ...
    priority 50   # 小於 100
    ...
}

檢查HAProxy服務狀態的指令碼

#!/bin/bash
if [ $(ps -C haproxy --no-header | wc -l) -eq 0 ];then
    haproxy -f /opt/haproxy-1.7.8/haproxy.cfg
fi
sleep 2
if [ $(ps -C haproxy --no-header | wc -l) -eq 0 ];then
    service keepalived stop
fi

Keepalived + LVS

LVS Linux Virtual Server 的簡稱,也就是 Linux 虛擬伺服器。LVS 支援 TCP/UDP 的負載均衡,由3部分組成:

  1. 負載排程器:整個叢集對外的前端機,將來自客戶的請求轉發到一組伺服器
  2. 伺服器池:執行請求的一組伺服器,如MQ叢集
  3. 共享儲存:為伺服器池提供一個共享的儲存區,這樣很容易使伺服器池擁有相同的內容,提供相同的服務

有3中負載均衡方式:

  1. VS/NAT:所有的 RealServer 只需要將自己的閘道器指向 Director 即可。但負載有限
  2. VS/TUN:將一個 IP報文封裝在另一個 IP 報文的技術,這可以使目標為一個 IP 地址的資料報文能夠被封裝和轉發到另一個 IP 地址。IP 隧道技術也可以稱之為 IP 封裝技術
  3. VS/DR:通過改寫報文中的MAC地址部分來實現的。Director 和 RealServer 必須在物理上有一個網絡卡通過不間斷的區域網相連。RealServer 上繫結的 VIP 配置在各自 Non-ARP 的網路裝置上(如 lo 或tunl),Director 的 VIP 地址對外可見,而 RealServer 的 VIP 對外是不可見的。RealServer的地址既可以是內部地址,也可以是真實地址

LVS 可以完全替代 HAProxy 而其他內容可以保持不變。LVS 不需要額外的配置檔案,直接
整合在 Keepalived 的配置檔案之中。

#Keepalived 配置檔案(Master)
global_defs {
    router_id NodeA #路由 ID、主/備的 ID 不能相同
}
vrrp_instance VI_1 {
    state MASTER #Keepalived 的角色。Master 表示主伺服器,從伺服器設定為 BACKUP
    interface eth0 #指定監測網絡卡
    virtual_router_id 1
    priority 100 #優先順序,BACKUP 機器上的優先順序要小於這個值
    advert_int 1 #設定主備之間的檢查時間,單位為 s
    authentication { #定義驗證型別和密碼
        auth_type PASS
        auth_pass root123
    }
    track_script {
        chk_haproxy
    }
    virtual_ipaddress { #VIP 地址,可以設定多個:
        192.168.0.10
    }
}

virtual_server 192.168.0.10 5672 { #設定虛擬伺服器
    delay_loop 6 #設定執行情況檢查時間,單位是秒
    #設定負載排程演算法,共有 rr、wrr、lc、wlc、lblc、lblcr、dh、sh 這 8 種
    lb_algo wrr #這裡是加權輪詢
    lb_kind DR #設定 LVS 實現的負載均衡機制方式 VS/DR
    #指定在一定的時間內來自同一 IP 的連線將會被轉發到同一 RealServer 中
    persistence_timeout 50
    protocal TCP #指定轉發協議型別,有 TCP 和 UDP 兩種
    #這個 real_server 即 LVS 的三大部分之一的 RealServer,這裡特指 RabbitMQ 的服務
    real_server 192.168.0.2 5672 { #配置服務節點
        weight 1 #配置權重
        TCP_CHECK {
            connect_timeout 3
            nb_get_retry 3
            delay_before_retry 3
            connect_port 5672
        }
    }
    real_server 192.168.0.3 5672 {
        weight 1
        TCP_CHECK {
            connect_timeout 3
            nb_get_retry 3
            delay_before_retry 3
            connect_port 5672
        }
    }
    real_server 192.168.0.4 5672 {
        weight 1
        TCP_CHECK {
            connect_timeout 3
            nb_get_retry 3
            delay_before_retry 3
            connect_port 5672
        }
    }
}
#為 RabbitMQ 的 RabbitMQ Management 外掛設定負載均衡
virtual_server 192.168.0.10 15672 {
    delay_loop 6
    lb_algo wrr
    lb_kind DR
    persistence_timeout 50
    protocal TCP
    real_server 192.168.0.2 15672 {
        weight 1
        TCP_CHECK {
            connect_timeout 3
            nb_get_retry 3
            delay_before_retry 3
            connect_port 15672
        }
    }
    real_server 192.168.0.3 15672 {
        weight 1
        TCP_CHECK {
            connect_timeout 3
            nb_get_retry 3
            delay_before_retry 3
            connect_port 15672
        }
    }
    real_server 192.168.0.4 15672 {
        weight 1
        TCP_CHECK {
            connect_timeout 3
            nb_get_retry 3
            delay_before_retry 3
            connect_port 15672
        }
    }
}

LVS主要的工作是提供排程演算法,把客戶端請求按照需求排程在 RealServer 中,Keepalived 主要的工作是提供 LVS 控制器的一個冗餘,並且對 RealServer 進行健康檢查,發現不健康的 RealServer就把它從 LVS 叢集中剔除,RealServer 只負責提供服務。

在VS/SR模式下,LVS將client的包轉發給RealServer時因為包的目的地址是VIP,會發現地址不匹配然後丟棄資料包。要將這個VIP綁到網絡卡下,傳送應答包是RealServer會將包的源和目的地址調換回復給客戶端。

為所有RealServer的 lo:0 網絡卡繫結VIP:

#!/bin/bash
VIP=192.168.0.10
/etc/rc.d/init.d/functions

case "$1" in
start)
    /sbin/ifconfig lo:0 $VIP netmask 255.255.255.255 broadcast $VIP
    /sbin/route add -host $VIP dev lo:0
    echo "1" >/proc/sys/net/ipv4/conf/lo/arp_ignore
    echo "2" >/proc/sys/net/ipv4/conf/lo/arp_announce
    echo "1" >/proc/sys/net/ipv4/conf/all/arp_ignore
    echo "2" >/proc/sys/net/ipv4/conf/all/arp_announce
    sysctl -p >/dev/null 2>&1
    echo "RealServer Start Ok"
;;
stop)
    /sbin/ifconfig lo:0 down
    /sbin/route del -host $VIP dev lo:0
    echo "0" >/proc/sys/net/ipv4/conf/lo/arp_ignore
    echo "0" >/proc/sys/net/ipv4/conf/lo/arp_announce
    echo "0" >/proc/sys/net/ipv4/conf/all/arp_ignore
    echo "0" >/proc/sys/net/ipv4/conf/all/arp_announce
;;
status)
    islothere=`/sbin/ifconfig lo:0 | grep $VIP | wc -l`
    isrothere=`netstat -rn | grep "lo:0"|grep $VIP | wc -l`
    if [ $islothere -eq 0 ]
    then
        if [ $isrothere -eq 0 ]
        then
            echo "LVS of RealServer Stopped."
        else
            echo "LVS of RealServer Running."
        fi
    else
        echo "LVS of RealServer Running."
    fi
;;
*)
    echo "Usage:$0{start|stop}"
    exit 1
;;
esac