go操作rabbitmq
阿新 • • 發佈:2020-12-30
conf.go
交換機為 /test
注意: 連結地址為 amqp://admin:[email protected]:5672//test 兩個斜線,一開始寫的一個,老是報錯沒許可權
交換機模式為topic模式
package config const ( RMQADDR = "amqp://admin:[email protected]:5672//test" EXCHANGENAME = "syslog_topic" )
consumer.go
package main import ( config "dg/rabbitMq/conf" "fmt" "log" "os" "github.com/streadway/amqp" ) /* ./consumer "#" info.payment.* *.log debug.payment.# */ func main() { conn, err := amqp.Dial(config.RMQADDR) failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() forever := make(chan bool) ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() err = ch.ExchangeDeclare( config.EXCHANGENAME, //exchange name "topic", //exchange kind true, //durable false, //autodelete false, false, nil, ) failOnError(err, "Failed to declare exchange") if len(os.Args) < 2 { log.Println(len(os.Args)) log.Println(`"Arguments error(Example: ./consumer "#" info.payment.* *.log debug.payment.#"`) return } topics := os.Args[1:] topicsCnt := len(topics) for routing := 0; routing < topicsCnt; routing++ { go func(routingNum int) { q, err := ch.QueueDeclare( "", false, //durable false, //delete when unused true, //exclusive false, //no-wait nil, //arguments ) failOnError(err, "Failed to declare a queue") err = ch.QueueBind( q.Name, topics[routingNum], config.EXCHANGENAME, false, nil, ) failOnError(err, "Failed to bind exchange") msgs, err := ch.Consume( q.Name, "", true, //Auto Ack false, false, false, nil, ) failOnError(err, "Failed to register a consumer") for msg := range msgs { log.Printf("In %s consume a message: %s\n", topics[routingNum], msg.Body) } }(routing) } <-forever } func failOnError(err error, msg string) { if err != nil { fmt.Printf("%s: %s\n", msg, err) } }
producer.go
package main import ( config "dg/rabbitMq/conf" "fmt" "log" "os" "github.com/streadway/amqp" ) func main() { conn, err := amqp.Dial(config.RMQADDR) failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() err = ch.ExchangeDeclare( config.EXCHANGENAME, //exchange name "topic", //exchange kind true, //durable false, //autodelete false, false, nil, ) failOnError(err, "Failed to declare exchange") if len(os.Args) < 3 { fmt.Println("Arguments error(ex:producer topic msg1 msg2 msg3") return } routingKey := os.Args[1] msgs := os.Args[2:] msgNum := len(msgs) for cnt := 0; cnt < msgNum; cnt++ { msgBody := msgs[cnt] err = ch.Publish( config.EXCHANGENAME, //exchange routingKey, //routing key false, false, amqp.Publishing{ ContentType: "text/plain", Body: []byte(msgBody), }) log.Printf(" [x] Sent %s", msgBody) } failOnError(err, "Failed to publish a message") } func failOnError(err error, msg string) { if err != nil { fmt.Printf("%s: %s\n", msg, err) } }