1. 程式人生 > 實用技巧 >go操作rabbitmq

go操作rabbitmq

conf.go

交換機為 /test

注意: 連結地址為 amqp://admin:[email protected]:5672//test 兩個斜線,一開始寫的一個,老是報錯沒許可權

交換機模式為topic模式

package config

const (
	RMQADDR      =  "amqp://admin:[email protected]:5672//test"
	EXCHANGENAME = "syslog_topic"
)

  

consumer.go

package main

import (
	config "dg/rabbitMq/conf"
	"fmt"
	"log"
	"os"

	"github.com/streadway/amqp"
)

/*


./consumer "#" info.payment.* *.log debug.payment.#

*/

func main() {

	conn, err := amqp.Dial(config.RMQADDR)
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	forever := make(chan bool)

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	err = ch.ExchangeDeclare(
		config.EXCHANGENAME, //exchange name
		"topic",             //exchange kind
		true,                //durable
		false,               //autodelete
		false,
		false,
		nil,
	)

	failOnError(err, "Failed to declare exchange")

	if len(os.Args) < 2 {
		log.Println(len(os.Args))
		log.Println(`"Arguments error(Example: ./consumer "#" info.payment.* *.log debug.payment.#"`)
		return
	}

	topics := os.Args[1:]
	topicsCnt := len(topics)

	for routing := 0; routing < topicsCnt; routing++ {
		go func(routingNum int) {

			q, err := ch.QueueDeclare(
				"",
				false, //durable
				false, //delete when unused
				true,  //exclusive
				false, //no-wait
				nil,   //arguments
			)

			failOnError(err, "Failed to declare a queue")

			err = ch.QueueBind(
				q.Name,
				topics[routingNum],
				config.EXCHANGENAME,
				false,
				nil,
			)
			failOnError(err, "Failed to bind exchange")

			msgs, err := ch.Consume(
				q.Name,
				"",
				true, //Auto Ack
				false,
				false,
				false,
				nil,
			)

			failOnError(err, "Failed to register a consumer")

			for msg := range msgs {
				log.Printf("In %s consume a message: %s\n", topics[routingNum], msg.Body)
			}

		}(routing)
	}

	<-forever
}

func failOnError(err error, msg string) {
	if err != nil {
		fmt.Printf("%s: %s\n", msg, err)
	}
}

  

producer.go

package main

import (
	config "dg/rabbitMq/conf"
	"fmt"
	"log"
	"os"

	"github.com/streadway/amqp"
)

func main() {

	conn, err := amqp.Dial(config.RMQADDR)
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	err = ch.ExchangeDeclare(
		config.EXCHANGENAME, //exchange name
		"topic",             //exchange kind
		true,                //durable
		false,               //autodelete
		false,
		false,
		nil,
	)

	failOnError(err, "Failed to declare exchange")

	if len(os.Args) < 3 {
		fmt.Println("Arguments error(ex:producer topic msg1 msg2 msg3")
		return
	}

	routingKey := os.Args[1]

	msgs := os.Args[2:]

	msgNum := len(msgs)

	for cnt := 0; cnt < msgNum; cnt++ {
		msgBody := msgs[cnt]
		err = ch.Publish(
			config.EXCHANGENAME, //exchange
			routingKey,          //routing key
			false,
			false,
			amqp.Publishing{
				ContentType: "text/plain",
				Body:        []byte(msgBody),
			})

		log.Printf(" [x] Sent %s", msgBody)
	}
	failOnError(err, "Failed to publish a message")

}

func failOnError(err error, msg string) {
	if err != nil {
		fmt.Printf("%s: %s\n", msg, err)
	}
}