1. 程式人生 > >訊息佇列rabbitmq的五種工作模式(go語言版本)

訊息佇列rabbitmq的五種工作模式(go語言版本)

前言:如果你對rabbitmq基本概念都不懂,可以移步此篇博文查閱訊息佇列RabbitMQ

一、單發單收

二、工作佇列Work Queue

三、釋出/訂閱 Publish/Subscribe

四、路由Routing

五、Topic型別的exchange

六、rabbitmq部分封裝程式碼及裝備工作

 

一、單發單收

在下圖中,“ P”是我們的生產者,“ C”是我們的消費者。中間的框是一個佇列-RabbitMQ代表使用者保留的訊息緩衝區。

單發單收模式下:一發一收

傳送端只需要建立佇列,然後向佇列傳送訊息。

接收端也需要建立佇列,因為如果接收端先啟動,沒有此佇列就會報錯,雖然傳送端和接收端都建立此佇列,但rabbitmq還是很智慧的,它只會建立一次。

需要注意的地方:

1.傳送端和接收端都需要建立同名佇列

2.接收端指定從這個同名佇列中接收訊息

 

 

 

傳送端

package main

import (
	"RabbitMQ"
	"time"
)

func main(){
	//第一個引數指定rabbitmq伺服器的連結,第二個引數指定建立佇列的名字
	send_mq := rabbitMQ.New("amqp://user:password@ip:port/","hello")
	for{
		time.Sleep(1)
		send_mq.Send("Hello World!")
	}

} 

接收端

package main

import (
	rabbitMQ "RabbitMQ"
	"log"
)

func  main(){
	//第一個引數指定rabbitmq伺服器的連結,第二個引數指定建立佇列的名字

	receive_mq := rabbitMQ.New("amqp://user:password@ip:port/","hello")
	for{
		//接收訊息時,指定
		msgs := receive_mq .Consume()
		go func() {
			for d := range msgs {
				log.Printf("Received a message: %s", d.Body)
			}
		}()
	}


}

  

二、工作佇列Work Queue

工作佇列和單發單收模式比起來,接收端可以有多個,接收端多了以後就會出現資料分配問題,發過來的資料到底該被哪個接收端接收,所以有兩種模式:

公平分發:每個接收端接收訊息的概率是相等的,傳送端會迴圈依次給每個接收端傳送訊息,圖一是公平分發。

公平派遣:保證接收端在處理完某個任務,併發送確認資訊後,RabbitMQ才會向它推送新的訊息,在此之間若是有新的訊息話,將會被推送到其它接收端,若所有的接收端都在處理任務,那麼就會等待,圖二為公平派遣。

圖一:

 

 

 圖二:

公平分發模式下的傳送端和接收端

傳送端

package main

import (
	"RabbitMQ"
	"strconv"
	"strings"
	"time"
)

func main(){
	//第一個引數指定rabbitmq伺服器的連結,第二個引數指定建立佇列的名字
	send_mq := rabbitMQ.New("amqp://user:password@ip:port/","hello")
	i := 0
	for{
		time.Sleep(1)
		greetings :=  []string{"Helloworld!",strconv.Itoa(i)}
		send_mq.Send(strings.Join( greetings, " "))
		i = i+1
	}

}

 接收端1

package main

import (
	rabbitMQ "RabbitMQ"
	"log"
)

func  main(){
	//第一個引數指定rabbitmq伺服器的連結,第二個引數指定建立佇列的名字

	receive_mq := rabbitMQ.New("amqp://user:password@ip:port/","hello")
	for{
		//接收訊息時,指定
		msgs := receive_mq .Consume()
		go func() {
			for d := range msgs {
				log.Printf("recevie 1 Received a message: %s", d.Body)
			}
		}()
	}


}  

接收端2

package main

import (
	rabbitMQ "RabbitMQ"
	"log"
)

func  main(){
	//第一個引數指定rabbitmq伺服器的連結,第二個引數指定建立佇列的名字

	receive_mq := rabbitMQ.New("amqp://user:password@ip:port/","hello")
	for{
		//接收訊息時,指定
		msgs := receive_mq .Consume()
		go func() {
			for d := range msgs {
				log.Printf("recevie 1 Received a message: %s", d.Body)
			}
		}()
	}


}

公平派遣模式下的傳送端和接收端 

公平派遣模式下發送端與公平分發相同,接收端只需要加一端配置程式碼

我們可以將預取計數設定為1。這告訴RabbitMQ一次不要給工人一個以上的訊息。換句話說,在處理並確認上一條訊息之前,不要將新訊息傳送給工作人員。而是將其分派給不忙的下一個工作程式。

//配置佇列引數
func (q *RabbitMQ)Qos(){
	e := q.channel.Qos(1,0,false)
	failOnError(e,"無法設定QoS")
}

接收端

package main

import (
	rabbitMQ "RabbitMQ"
	"log"
)

func  main(){
	//第一個引數指定rabbitmq伺服器的連結,第二個引數指定建立佇列的名字
	receive_mq := rabbitMQ.New("amqp://user:password@ip:port/","hello")
	//配置公平派遣
	receive_mq.Qos()
	for{
		//接收訊息時,指定
		msgs := receive_mq .Consume()
		go func() {
			for d := range msgs {
				log.Printf("recevie 2 Received a message: %s", d.Body)
			}
		}()
	}


}

官方在這裡介紹了出現以下兩種問題的解決辦法:

1.當接收者掛掉的時候,我們將丟失傳送給接收端還沒有處理的訊息。

2.當rabbitmq伺服器掛了,我們怎麼保證我們的訊息不丟失。

具體參考:https://www.rabbitmq.com/tutorials/tutorial-two-go.html

 

三、釋出/訂閱 Publish/Subscribe

釋出訂閱模式下多了一個概念:exchange,如何理解這個exchange,exchange的作用就是類似路由器,傳送端傳送訊息需要帶有routing key 就是路由鍵,伺服器會根據路由鍵將訊息從交換器路由到佇列上去,所以傳送端和接收端之間有了中介。

exchange有多個種類:direct,fanout,topic,header(非路由鍵匹配,功能和direct類似,很少用)。

首先介紹exchange下的fanout exchange,它會將發到這個exchange的訊息廣播到關注此exchange的所有接收端上。

廣播模式下(1:N):

傳送端連線到rabbitmq後,建立exchange,需要指定交換機的名字和型別,fanout為廣播,然後向此exchange傳送訊息,其它就不用管了。

接收端的執行流程在程式備註中。

注意:廣播模式下的exchange是傳送端是不需要帶路由鍵的哦。

package main

import (
	"RabbitMQ"
	"strconv"
	"strings"
	"time"
)

func main(){
	ch := rabbitMQ.Connect("amqp://user:password@ip:port/")
	rabbitMQ.NewExchange("amqp://user:password@ip:port/","exchange1","fanout")
	i := 0
	for{
		time.Sleep(1)
		greetings :=  []string{"Helloworld!",strconv.Itoa(i)}
		ch.Publish("exchange1",strings.Join( greetings, " "),"")
		i = i+1
	}

}

 

接收端1

package main

import (
	rabbitMQ "RabbitMQ"
	"log"
)

func main(){
	// 1.接收者,首先建立自己佇列
	// 2.建立交換機
	// 3.將自己繫結到交換機上
	// 4.接收交換機上發過來的訊息

	//第一個引數指定rabbitmq伺服器的連結,第二個引數指定建立佇列的名字
	//1
	receive_mq := rabbitMQ.New("amqp://user:password@ip:port/","hello1")
	//2
	//第一個引數:rabbitmq伺服器的連結,第二個引數:交換機名字,第三個引數:交換機型別
	rabbitMQ.NewExchange("amqp://user:password@ip:port/","exchange1","fanout")
	//3
	// 佇列繫結到exchange
	receive_mq.Bind("exchange1","")
	//4
	for{
		//接收訊息時,指定
		msgs := receive_mq .Consume()
		go func() {
			for d := range msgs {
				log.Printf("recevie1  Received a message: %s", d.Body)
			}
		}()
	}
}

接收端2

package main

import (
	rabbitMQ "RabbitMQ"
	"log"
)

func main(){
	// 1.接收者,首先建立自己佇列
	// 2.建立交換機
	// 3.將自己繫結到交換機上
	// 4.接收交換機上發過來的訊息

	//第一個引數指定rabbitmq伺服器的連結,第二個引數指定建立佇列的名字
	//1
	receive_mq := rabbitMQ.New("amqp://user:password@ip:port/","hello2")
	//2
	//第一個引數:rabbitmq伺服器的連結,第二個引數:交換機名字,第三個引數:交換機型別
	rabbitMQ.NewExchange("amqp://user:password@ip:port/","exchange1","fanout")
	//3
	// 佇列繫結到exchange
	receive_mq.Bind("exchange1","")
	//4
	for{
		//接收訊息時,指定
		msgs := receive_mq .Consume()
		go func() {
			for d := range msgs {
				log.Printf("recevie2  Received a message: %s", d.Body)
			}
		}()
	}
}

  

四、路由Routing 

路由模式其實就是全值匹配模式(direct),傳送端傳送訊息需要帶有路由鍵,就是下面傳送端程式的routing key1,是一個字串,傳送端發給exchange,路由模式下的exchange會匹配這個路由鍵,如下面這個圖,傳送者傳送時帶有orange此路由鍵時,這條訊息只會被轉發給Q1佇列,如果路由鍵沒有匹配上的怎麼辦?,全值匹配,沒有匹配到,那麼所有接收者都接收不到訊息,訊息只會傳送給匹配的佇列,接收端的路由鍵是繫結exchange的時候用的。

注意:接收佇列可以繫結多個路由鍵到exchange上,比如下面,當傳送路由鍵為black,green,會被Q2接收。

 

 

 傳送端

package main

import (
	"RabbitMQ"
	"strconv"
	"strings"
	"time"
)

func main(){
	ch := rabbitMQ.Connect("amqp://user:password@ip:port/")
	rabbitMQ.NewExchange("amqp://user:password@ip:port/","exchange","direct")
	i := 0
	for{
		time.Sleep(1)
		greetings :=  []string{"Helloworld!",strconv.Itoa(i)}
		if i%2 ==1 {
			//如果是奇數
			ch.Publish("exchange",strings.Join( greetings, " "),"routing key1")
		} else{
			ch.Publish("exchange",strings.Join( greetings, " "),"routing key2")
		}
		i = i+1
	}

}

接收端1

package main

import (
	rabbitMQ "RabbitMQ"
	"log"
)

func main(){
	// 1.接收者,首先自己佇列
	// 2.建立交換機
	// 3.將自己繫結到交換機上
	// 4.接收交換機上發過來的訊息
	//第一個引數指定rabbitmq伺服器的連結,第二個引數指定建立佇列的名字

	//1
	receive_mq := rabbitMQ.New("amqp://user:password@ip:port/","hello2")

	//2
	//第一個引數:rabbitmq伺服器的連結,第二個引數:交換機名字,第三個引數:交換機型別
	rabbitMQ.NewExchange("amqp://user:password@ip:port/","exchange","direct")

	//3
	receive_mq.Bind("exchange","routing key1")

	//4
	for{
		//接收訊息時,指定
		msgs := receive_mq .Consume()
		go func() {
			for d := range msgs {
				log.Printf("recevie1  Received a message: %s", d.Body)
			}
		}()
	}
}

接收端2

package main

import (
	rabbitMQ "RabbitMQ"
	"log"
)

func main(){
	// 1.接收者,首先自己佇列
	// 2.建立交換機
	// 3.將自己繫結到交換機上
	// 4.接收交換機上發過來的訊息
	//第一個引數指定rabbitmq伺服器的連結,第二個引數指定建立佇列的名字

	//1
	receive_mq := rabbitMQ.New("amqp://user:password@ip:port/","hello2")

	//2
	//第一個引數:rabbitmq伺服器的連結,第二個引數:交換機名字,第三個引數:交換機型別
	rabbitMQ.NewExchange("amqp://user:password@ip:port/","exchange","direct")

	//3
	receive_mq.Bind("exchange","routing key2")

	//4
	for{
		//接收訊息時,指定
		msgs := receive_mq .Consume()
		go func() {
			for d := range msgs {
				log.Printf("recevie2  Received a message: %s", d.Body)
			}
		}()
	}
}

  

 

 

五、Topic型別的exchange 

前面的direct是全值匹配,那麼topic就可以部分匹配,又可以全值匹配,比direct更加靈活。

訊息傳送到topic型別的exchange上時不能隨意指定routing_key(一定是指由一系列由點號連線單詞的字串,單詞可以是任意的,但一般都會與訊息或多或少的有些關聯)。Routing key的長度不能超過255個位元組。

Binding key也一定要是同樣的方式。Topic型別的exchange就像一個直接的交換:一個由生產者指定了確定routing key的訊息將會被推送給所有Binding key能與之匹配的消費者。然而這種繫結有兩種特殊的情況:

  • *(星號):可以(只能)匹配一個單詞
  • #(井號):可以匹配多個單詞(或者零個)

下邊來舉個例子:

在這個例子中,我們將會發送一些描述動物的訊息。Routing key的第一個單詞是描述速度的,第二個單詞是描述顏色的,第三個是描述物種的:“<speed>.<colour>.<species>”。

這裡我們建立三個Binding:Binding key為”*.orange.*”的Q1,和binding key為”*.*.rabbit”和”lazy.#”的Q2。

這些binding可以總結為:

  • Q1對所有橘色的(orange)的動物感興趣;
  • Q2希望能拿到所有兔子的(rabbit)資訊,還有比較懶惰的(lazy.#)動物資訊。

一條以” quick.orange.rabbit”為routing key的訊息將會推送到Q1和Q2兩個queue上,routing key為“lazy.orange.elephant”的訊息同樣會被推送到Q1和Q2上。但如果routing key為”quick.orange.fox”的話,訊息只會被推送到Q1上;routing key為”lazy.brown.fox”的訊息會被推送到Q2上,routing key為"lazy.pink.rabbit”的訊息也會被推送到Q2上,但同一條訊息只會被推送到Q2上一次。

如果在傳送訊息時所指定的exchange和routing key在消費者端沒有對應的exchange和binding key與之繫結的話,那麼這條訊息將會被丟棄掉。例如:"orange"和"quick.orange.male.rabbit"。但是routing為”lazy.orange.male.rabbit”的訊息,將會被推到Q2上。

Topic型別的exchange:

Topic型別的exchange是很強大的,也可以實現其它型別的exchange。

  • 當一個佇列被繫結為binding key為”#”時,它將會接收所有的訊息,此時和fanout型別的exchange很像。
  • 當binding key不包含”*”和”#”時,這時候就很像direct型別的exchange。

傳送端

package main

import (
	"RabbitMQ"
	"time"
)

func main(){
	ch := rabbitMQ.Connect("amqp://user:password@ip/")
	rabbitMQ.NewExchange("amqp://user:password@ip/","exchange","topic")
	for{
		time.Sleep(1)
		ch.Publish("exchange","hello world","lazy.brown.fox")
	}

}

 

接收端

package main

import (
	rabbitMQ "RabbitMQ"
	"log"
)

func main(){
	// 1.接收者,首先自己佇列
	// 2.建立交換機
	// 3.將自己繫結到交換機上
	// 4.接收交換機上發過來的訊息
	//第一個引數指定rabbitmq伺服器的連結,第二個引數指定建立佇列的名字

	//1
	receive_mq := rabbitMQ.New("amqp://user:password@ip:port/","hello1")

	//2
	//第一個引數:rabbitmq伺服器的連結,第二個引數:交換機名字,第三個引數:交換機型別
	rabbitMQ.NewExchange("amqp://user:password@ip:port/","exchange","topic")

	//3
	receive_mq.Bind("exchange","*.orange.*")

	//4
	for{
		//接收訊息時,指定
		msgs := receive_mq .Consume()
		go func() {
			for d := range msgs {
				log.Printf("recevie1  Received a message: %s", d.Body)
			}
		}()
	}
}

接收端2

package main

import (
	rabbitMQ "RabbitMQ"
	"log"
)

func main(){
	// 1.接收者,首先自己佇列
	// 2.建立交換機
	// 3.將自己繫結到交換機上
	// 4.接收交換機上發過來的訊息
	//第一個引數指定rabbitmq伺服器的連結,第二個引數指定建立佇列的名字

	//1
	receive_mq := rabbitMQ.New("amqp://user:password@ip:port/","hello2")

	//2
	//第一個引數:rabbitmq伺服器的連結,第二個引數:交換機名字,第三個引數:交換機型別
	rabbitMQ.NewExchange("amqp://user:password@ip:port/","exchange","topic")
	//3
	receive_mq.Bind("exchange","*.*.rabbit")
	receive_mq.Bind("exchange","lazy.#")
	//4
	for{
		//接收訊息時,指定
		msgs := receive_mq .Consume()
		go func() {
			for d := range msgs {
				log.Printf("recevie2  Received a message: %s", d.Body)
			}
		}()
	}
}

  

 

六、rabbitmq部分封裝程式碼及準備工作

目錄參考:

準備工作:

1.我們再建立go專案時,首先指定gopath目錄,然後在目錄下建立bin、src、pkg目錄。

2.下載github.com/streadway/amqp包,會自動新增到專案的pkg目錄下。

go get github.com/streadway/amqp

 3.在rabbitmq伺服器上建立使用者,指定管理員,並賦予訪問許可權。

4.rabbitmq封裝

package rabbitMQ

import (
	"encoding/json"
	"github.com/streadway/amqp"
	"log"
)

//宣告佇列型別
type RabbitMQ struct {
	channel *amqp.Channel
	Name string
	exchange string
}

//連線伺服器
func Connect(s string)  * RabbitMQ{
	//連線rabbitmq
	conn,e := amqp.Dial(s)
	failOnError(e,"連線Rabbitmq伺服器失敗!")
	ch ,e :=conn.Channel()
	failOnError(e,"無法開啟頻道!")
	mq := new(RabbitMQ)
	mq.channel =ch
	return  mq
}

//初始化單個訊息佇列
//第一個引數:rabbitmq伺服器的連結,第二個引數:佇列名字
func New(s string,name string) * RabbitMQ{
	//連線rabbitmq
	conn,e := amqp.Dial(s)
	failOnError(e,"連線Rabbitmq伺服器失敗!")
	ch ,e :=conn.Channel()
	failOnError(e,"無法開啟頻道!")
	q,e := ch.QueueDeclare(
		 name,//佇列名
		false,//是否開啟持久化
		true,//不使用時刪除
		false, //排他
		false, //不等待
		nil, //引數
	)
	failOnError(e,"初始化佇列失敗!")

	mq := new(RabbitMQ)
	mq.channel =ch
	mq.Name =q.Name
	return  mq
}

//批量初始化訊息佇列
//第一個引數:rabbitmq伺服器的連結,第二個引數:佇列名字列表


//配置佇列引數
func (q *RabbitMQ)Qos(){
	e := q.channel.Qos(1,0,false)
	failOnError(e,"無法設定QoS")
}



//配置交換機引數



//初始化交換機
//第一個引數:rabbitmq伺服器的連結,第二個引數:交換機名字,第三個引數:交換機型別
func NewExchange(s string,name string,typename string){
	//連線rabbitmq
	conn,e := amqp.Dial(s)
	failOnError(e,"連線Rabbitmq伺服器失敗!")
	ch ,e :=conn.Channel()
	failOnError(e,"無法開啟頻道!")
	e = ch.ExchangeDeclare(
		name,   // name
		typename, // type
		true,     // durable
		false,    // auto-deleted
		false,    // internal
		false,    // no-wait
		nil,      // arguments
	)
	failOnError(e,"初始化交換機失敗!")

}

//刪除交換機
func (q *RabbitMQ)ExchangeDelete(exchange string){
	e := q.channel.ExchangeDelete(exchange,false,true)
	failOnError(e,"繫結佇列失敗!")
}


//繫結訊息佇列到哪個exchange
func (q *RabbitMQ)Bind(exchange string,key string){
	e := q.channel.QueueBind(
		q.Name,
		 key,
		 exchange,
		false,
		nil,
		)
	failOnError(e,"繫結佇列失敗!")
	q.exchange = exchange
}


//向訊息佇列傳送訊息
//Send方法可以往某個訊息佇列傳送訊息
func (q *RabbitMQ) Send(body interface{}){
	str,e := json.Marshal(body)
	failOnError(e,"訊息序列化失敗!")
	e = q.channel.Publish(
		"",//交換
		q.Name,//路由鍵:當前佇列的名字
		false, //必填
		false, //立即
		amqp.Publishing{
			ReplyTo:q.Name,
			Body:[]byte(str),

		})
	msg := "向佇列:"+q.Name+"傳送訊息失敗!"
	failOnError(e,msg)
}

//向exchange傳送訊息
//Publish方法可以往某個exchange傳送訊息
func (q *RabbitMQ) Publish(exchange string,body interface{},key string) {
	str,e := json.Marshal(body)
	failOnError(e,"訊息序列化失敗!")
	e = q.channel.Publish(
		 exchange,
		 key,
		false,
		false,
		amqp.Publishing{ReplyTo:q.Name,
			Body:[]byte(str)},
		)
	failOnError(e,"向路由傳送訊息失敗!")
}

//接收某個訊息佇列的訊息
func (q * RabbitMQ) Consume() <-chan amqp.Delivery{
	c,e :=q.channel.Consume(
		q.Name,//指定從哪個佇列中接收訊息
		"",
		true,
		false,
		false,
		false,
		nil,
		)
	failOnError(e,"接收訊息失敗!")
	return c
}
//關閉佇列連線
func (q *RabbitMQ) Close() {
	q.channel.Close()
}

//錯誤處理函式
func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}