1. 程式人生 > 實用技巧 >golang操作Rabbit

golang操作Rabbit

GOLANG操作rabbitmq

簡單模式

一個生產者對應一個消費者!!!

生產者

package main

import (
	"fmt"
	"github.com/streadway/amqp"
	"log"
)

func main() {
	url := "amqp://guest:guest@localhost:5672/"

	// 建立連結
	coon, err := amqp.Dial(url)
	if err != nil {
		log.Fatal(err)
	}
	defer coon.Close()

	// 獲取channel
	channel, err := coon.Channel()
	if err != nil {
		log.Fatal(err)
	}
	defer channel.Close()

	// 宣告佇列,沒有則建立
	_, err = channel.QueueDeclare("hello", true, true, false, false, nil)
	/*
		name:queue的名稱
		durable:是否持久化
		autoDelete: 是否自動刪除(當沒有customer時會自動刪除)
		exclusive:
			1.是否獨佔,只有一個消費者監聽這個佇列
			2.當coon關閉的時候,是否刪除佇列
		noWait: 是否等待
		args: 額外的引數
	*/
	// 釋出訊息
	message := amqp.Publishing{
		Body: []byte("hello world"),
	}
	if err := channel.Publish("", "hello", false, false, message); err != nil {
		fmt.Println(err)
	}
	/*
		exchange:交換機的名稱,簡單模式下使用預設的""
		routerKey: 路由名稱, 簡單模式下使用和佇列名稱一樣
	*/
}

消費者

func main() {
	url := "amqp://guest:guest@localhost:5672/"

	// 建立連結
	coon, err := amqp.Dial(url)
	if err != nil {
		log.Fatal(err)
	}
	defer coon.Close()

	// 獲取channel
	channel, err := coon.Channel()
	if err != nil {
		log.Fatal(err)
	}
	defer channel.Close()

	// 宣告佇列,沒有則建立
	_, err = channel.QueueDeclare("hello", true, true, false, false, nil)

	delivery, err := channel.Consume("hello", "", false, false, false, false, nil)
	if err != nil {
		log.Fatal(err)
	}
	for message := range delivery {
		fmt.Println(string(message.Body))
	}
}

工作模式

一個生產者對應多個消費者,但是隻能有一個消費者獲得訊息!!!

package main

import (
	"fmt"
	"github.com/streadway/amqp"
	"log"
	"strconv"
)

const (
	url   = "amqp://guest:guest@localhost:5672/"
	queue = "workMode"
)

func Consume(name string) {

	// 建立連結
	coon, err := amqp.Dial(url)
	if err != nil {
		log.Fatal(err)
	}
	defer coon.Close()

	// 獲取channel
	channel, err := coon.Channel()
	if err != nil {
		log.Fatal(err)
	}
	defer channel.Close()

	// 宣告佇列,沒有則建立
	_, err = channel.QueueDeclare(queue, true, true, false, false, nil)

	delivery, err := channel.Consume(queue, "", false, false, false, false, nil)
	if err != nil {
		log.Fatal(err)
	}
	for message := range delivery {
		fmt.Printf("name:%s   body:%s\n", name, string(message.Body))
	}
}

func Publish() {
	coon, err := amqp.Dial(url)
	if err != nil {
		log.Fatal(err)
	}
	defer coon.Close()

	channel, err := coon.Channel()
	if err != nil {
		log.Fatal(err)
	}
	defer channel.Close()

	// 宣告佇列,沒有則建立
	_, err = channel.QueueDeclare(queue, true, true, false, false, nil)

	for i := 0; i < 10; i++ {
		message := amqp.Publishing{
			Body: []byte("hello world" + strconv.Itoa(i)),
		}
		if err := channel.Publish("", queue, false, false, message); err != nil {
			fmt.Println(err)
		}
	}
}

func main() {
	// 開啟兩個消費者
	for i := 0; i < 2; i++ {
		go Consume(fmt.Sprintf("name%d", i))
	}
	// 開啟一個生產者
	go Publish()
	// 阻塞主goroutine
	<-make(chan int)
}

測試結果

name:name1   body:hello world0
name:name1   body:hello world2
name:name1   body:hello world4
name:name1   body:hello world6
name:name1   body:hello world8
name:name0   body:hello world1
name:name0   body:hello world3
name:name0   body:hello world5
name:name0   body:hello world7
name:name0   body:hello world9

釋出訂閱模式

package main

import (
	"fmt"
	"github.com/streadway/amqp"
	"log"
)

const (
	url          = "amqp://guest:guest@localhost:5672/"
	exchangeName = "TestFanout"
	queue1       = "TestFanoutQueue1"
	queue2       = "TestFanoutQueue2"
)

func Consume(queue string, action string) {

	// 建立連結
	coon, err := amqp.Dial(url)
	if err != nil {
		log.Fatal(err)
	}
	defer coon.Close()

	// 獲取channel
	channel, err := coon.Channel()
	if err != nil {
		log.Fatal(err)
	}
	defer channel.Close()

	// 注意:這裡一定要寫上,否則有異常,ioException
	_, err = channel.QueueDeclare(queue, true, true, false, false, nil)

	delivery, err := channel.Consume(queue, "", false, false, false, false, nil)

	if err != nil {
		log.Fatal(err)
	}
	for message := range delivery {
		fmt.Printf("name:%s  action:%s body:%s\n", queue, action, string(message.Body))
	}
}

func Publish() {
	coon, err := amqp.Dial(url)
	if err != nil {
		log.Fatal(err)
	}
	defer coon.Close()

	channel, err := coon.Channel()
	if err != nil {
		log.Fatal(err)
	}
	defer channel.Close()

	// 建立交換機
	if err := channel.ExchangeDeclare(
		exchangeName,        // name
		amqp.ExchangeFanout, // kind
		false,               // durable
		true,                // autoDelete
		false,               // internal 是否rabbitmq內部使用
		true,                // noWait
		nil,                 // args
	); err != nil {
		log.Fatal(err)
	}

	// 宣告佇列,沒有則建立
	_, err = channel.QueueDeclare(queue1, true, true, false, false, nil)
	_, err = channel.QueueDeclare(queue2, true, true, false, false, nil)

	if err := channel.QueueBind(queue1, "", exchangeName, true, nil); err != nil {
		log.Fatal(err)
	}
	if err := channel.QueueBind(queue2, "", exchangeName, true, nil); err != nil {
		log.Fatal(err)
	}
	message := amqp.Publishing{
		Body: []byte("ExchangeFanout"),
	}
    // 使用FANOUT時,routeingKey設定為空字串
	if err := channel.Publish(exchangeName, "", false, false, message); err != nil {
		log.Fatal(err)
	}
}

func main() {
	// 開啟兩個消費者
	go Consume(queue1, "記錄日誌")
	go Consume(queue2, "儲存資訊")
	// 開啟一個生產者
	go Publish()
	// 阻塞主goroutine
	<-make(chan int)
}

路由模式

package main

import (
	"fmt"
	"github.com/streadway/amqp"
	"log"
)

const (
	url             = "amqp://guest:guest@localhost:5672/"
	exchangeName    = "direct"
	errorRouteKey   = "error"
	infoRouteKey    = "info"
	warningRouteKey = "warning"
	queue1          = "directQueue1"
	queue2          = "directQueue2"
)

func Consume(queue string, action string) {

	// 建立連結
	coon, err := amqp.Dial(url)
	if err != nil {
		log.Fatal(err)
	}
	defer coon.Close()

	// 獲取channel
	channel, err := coon.Channel()
	if err != nil {
		log.Fatal(err)
	}
	defer channel.Close()

	// 注意:這裡一定要寫上,否則有異常,ioException
	_, err = channel.QueueDeclare(queue, true, true, false, false, nil)

	delivery, err := channel.Consume(queue, "", false, false, false, false, nil)

	if err != nil {
		log.Fatal(err)
	}
	for message := range delivery {
		fmt.Printf("name:%s  action:%s body:%s\n", queue, action, string(message.Body))
	}
}

func Publish() {
	coon, err := amqp.Dial(url)
	if err != nil {
		log.Fatal(err)
	}
	defer coon.Close()

	channel, err := coon.Channel()
	if err != nil {
		log.Fatal(err)
	}
	defer channel.Close()

	// 建立交換機
	if err := channel.ExchangeDeclare(
		exchangeName,        // name
		amqp.ExchangeDirect, // kind
		false,               // durable
		true,                // autoDelete
		false,               // internal 是否rabbitmq內部使用
		true,                // noWait
		nil,                 // args
	); err != nil {
		log.Fatal(err)
	}

	// 宣告佇列,沒有則建立
	_, err = channel.QueueDeclare(queue1, true, true, false, false, nil)
	_, err = channel.QueueDeclare(queue2, true, true, false, false, nil)

	// 佇列1繫結error
	if err := channel.QueueBind(queue1, errorRouteKey, exchangeName, true, nil); err != nil {
		log.Fatal(err)
	}
	// 佇列2繫結error,info,warning
	if err := channel.QueueBind(queue2, errorRouteKey, exchangeName, true, nil); err != nil {
		log.Fatal(err)
	}
	if err := channel.QueueBind(queue2, infoRouteKey, exchangeName, true, nil); err != nil {
		log.Fatal(err)
	}
	if err := channel.QueueBind(queue2, warningRouteKey, exchangeName, true, nil); err != nil {
		log.Fatal(err)
	}

	message := amqp.Publishing{
		Body: []byte("ExchangeDirect"),
	}
	// 傳送訊息指定交換機和路由key
	if err := channel.Publish(exchangeName, infoRouteKey, false, false, message); err != nil {
		log.Fatal(err)
	}
}

func main() {
	// 開啟兩個消費者
	go Consume(queue1, "記錄日誌")
	go Consume(queue2, "儲存資訊")
	// 開啟一個生產者
	go Publish()
	// 阻塞主goroutine
	<-make(chan int)
}

此時生產者傳送的路由key為info,只有queue2才能收到,如果把路由key改為error,則兩個佇列都能收到。

萬用字元模式

package main

import (
	"fmt"
	"github.com/streadway/amqp"
	"log"
)

const (
	url          = "amqp://guest:guest@localhost:5672/"
	exchangeName = "topic"
	queue1       = "directQueue1"
	queue2       = "directQueue2"
)

func Consume(queue string, action string) {

	// 建立連結
	coon, err := amqp.Dial(url)
	if err != nil {
		log.Fatal(err)
	}
	defer coon.Close()

	// 獲取channel
	channel, err := coon.Channel()
	if err != nil {
		log.Fatal(err)
	}
	defer channel.Close()

	// 注意:這裡一定要寫上,否則有異常,ioException
	_, err = channel.QueueDeclare(queue, true, true, false, false, nil)

	delivery, err := channel.Consume(queue, "", false, false, false, false, nil)

	if err != nil {
		log.Fatal(err)
	}
	for message := range delivery {
		fmt.Printf("name:%s  action:%s body:%s\n", queue, action, string(message.Body))
	}
}

func Publish() {
	coon, err := amqp.Dial(url)
	if err != nil {
		log.Fatal(err)
	}
	defer coon.Close()

	channel, err := coon.Channel()
	if err != nil {
		log.Fatal(err)
	}
	defer channel.Close()

	// 建立交換機
	if err := channel.ExchangeDeclare(
		exchangeName,       // name
		amqp.ExchangeTopic, // kind
		false,              // durable
		true,               // autoDelete
		false,              // internal 是否rabbitmq內部使用
		true,               // noWait
		nil,                // args
	); err != nil {
		log.Fatal(err)
	}

	// 宣告佇列,沒有則建立
	_, err = channel.QueueDeclare(queue1, true, true, false, false, nil)
	_, err = channel.QueueDeclare(queue2, true, true, false, false, nil)

	// 佇列1繫結error
	if err := channel.QueueBind(queue1, "*.*", exchangeName, true, nil); err != nil {
		log.Fatal(err)
	}
	// 佇列2繫結error,info,warning
	if err := channel.QueueBind(queue2, "*.error", exchangeName, true, nil); err != nil {
		log.Fatal(err)
	}
	if err := channel.QueueBind(queue2, "#.info", exchangeName, true, nil); err != nil {
		log.Fatal(err)
	}

	message := amqp.Publishing{
		Body: []byte("ExchangeDirect"),
	}
	// 傳送訊息指定交換機和路由key
	if err := channel.Publish(exchangeName, "test.info", false, false, message); err != nil {
		log.Fatal(err)
	}
}

func main() {
	// 開啟兩個消費者
	go Consume(queue1, "記錄日誌")
	go Consume(queue2, "儲存資訊")
	// 開啟一個生產者
	go Publish()
	// 阻塞主goroutine
	<-make(chan int)
}

萬用字元模式是路由模式的進階版,它能夠讓一個佇列監聽動態的路由規則。