RabbitMQ: Using Exchange.Direct with one queue bound under different routing keys for different consumers
阿新 • Published: 2019-01-05
The binding is done on the consumer side: each consumer binds the same queue to the exchange with a different routing key.
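Consumer 1:
using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

class Program
{
    static void Main(string[] args)
    {
        ConnectionFactory factory = new ConnectionFactory() { HostName = "192.168.59.101", UserName = "admin", Password = "admin" };
        string myexchange = "myexchange";
        string myqueue = "myqueue";
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            // Durable direct exchange and durable queue (not exclusive, not auto-delete).
            channel.ExchangeDeclare(myexchange, ExchangeType.Direct, true, false, null);
            channel.QueueDeclare(myqueue, true, false, false, null);
            // Bind the shared queue to the exchange with the routing key "log_info".
            channel.QueueBind(myqueue, myexchange, "log_info", null);
            EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
            consumer.Received += (sender, e) =>
            {
                // On RabbitMQ.Client 6.x, e.Body is ReadOnlyMemory<byte>; use e.Body.ToArray() there.
                var msg = Encoding.UTF8.GetString(e.Body);
                Console.WriteLine(msg);
                // autoAck is false below, so acknowledge each message explicitly.
                channel.BasicAck(e.DeliveryTag, false);
            };
            channel.BasicConsume(myqueue, false, consumer);
            Console.ReadKey();
        }
    }
}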
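Consumer 2:
using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

class Program
{
    static void Main(string[] args)
    {
        ConnectionFactory factory = new ConnectionFactory() { HostName = "192.168.59.101", UserName = "admin", Password = "admin" };
        string myexchange = "myexchange";
        string myqueue = "myqueue";
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            // Same exchange and queue as consumer 1; redeclaring with identical arguments is harmless.
            channel.ExchangeDeclare(myexchange, ExchangeType.Direct, true, false, null);
            channel.QueueDeclare(myqueue, true, false, false, null);
            // Add a second binding on the same queue, this time with the routing key "log_error".
            channel.QueueBind(myqueue, myexchange, "log_error", null);
            EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
            consumer.Received += (sender, e) =>
            {
                // On RabbitMQ.Client 6.x, e.Body is ReadOnlyMemory<byte>; use e.Body.ToArray() there.
                var msg = Encoding.UTF8.GetString(e.Body);
                Console.WriteLine(msg);
                // autoAck is false below, so acknowledge each message explicitly.
                channel.BasicAck(e.DeliveryTag, false);
            };
            channel.BasicConsume(myqueue, false, consumer);
            Console.ReadKey();
        }
    }
}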
Producer:
using System;
using System.Text;
using RabbitMQ.Client;

class Program
{
    static void Main(string[] args)
    {
        ConnectionFactory factory = new ConnectionFactory() { HostName = "192.168.59.101", UserName = "admin", Password = "admin" };
        string myexchange = "myexchange";
        string myroutekey = "myrotekey";
        string myqueue = "myqueue";
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            // The exchange, queue and bindings are created by the consumers,
            // so the declarations are left commented out here.
            //channel.ExchangeDeclare(myexchange, ExchangeType.Direct, true, false, null);
            //channel.QueueDeclare(myqueue,true,false, false, null);
            //channel.QueueBind(myqueue, myexchange, myroutekey, null);
            for (int i = 0; i < 10; i++)
            {
                var msg = Encoding.UTF8.GetBytes($"{i},你好");
                // Even-numbered messages use "log_info", odd-numbered ones use "log_error".
                var routeKey = i % 2 == 0 ? "log_info" : "log_error";
                channel.BasicPublish(myexchange, routingKey: routeKey, basicProperties: null, body: msg);
                Console.WriteLine(i);
            }
            Console.ReadKey();
        }
    }
}
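Start the consumers first so that they create the bindings and begin listening, then run the producer to send the messages. Both consumers listen on the same queue, and that queue now carries two bindings, log_info and log_error, so it receives every message the producer sends. Note that a binding belongs to the queue, not to the consumer that created it: RabbitMQ hands the queue's messages to the two consumers in turn (round-robin by default), and because the producer alternates the two routing keys, each consumer ends up printing the messages of one key. For each consumer to receive only its own routing key regardless of the publishing order, each would need its own queue bound with that key.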
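A broker-free way to see what happens when two consumers share one queue is to model the routing in memory. The following is a minimal sketch, not RabbitMQ client code: the DirectExchangeModel class, its QueueBind, Publish and Dispatch methods, and the Demo class are hypothetical names introduced only for illustration, and Dispatch assumes plain round-robin delivery between the consumers of a queue.

```csharp
using System;
using System.Collections.Generic;

// Toy in-memory model of a direct exchange. This is NOT the RabbitMQ client API;
// every type and method name here is an illustrative assumption.
class DirectExchangeModel
{
    // routing key -> names of the queues bound with that key
    private readonly Dictionary<string, HashSet<string>> bindings =
        new Dictionary<string, HashSet<string>>();

    // queue name -> messages waiting in that queue
    public readonly Dictionary<string, Queue<string>> Queues =
        new Dictionary<string, Queue<string>>();

    public void QueueBind(string queue, string routingKey)
    {
        if (!Queues.ContainsKey(queue)) Queues[queue] = new Queue<string>();
        if (!bindings.ContainsKey(routingKey)) bindings[routingKey] = new HashSet<string>();
        bindings[routingKey].Add(queue);
    }

    // Copy the message to every queue whose binding key equals the routing key
    // exactly; a message that matches no binding is dropped.
    public void Publish(string routingKey, string message)
    {
        HashSet<string> targets;
        if (!bindings.TryGetValue(routingKey, out targets)) return;
        foreach (var q in targets) Queues[q].Enqueue(message);
    }

    // Hand the queued messages to the consumers in turn (assumed round-robin).
    public List<string>[] Dispatch(string queue, int consumerCount)
    {
        var received = new List<string>[consumerCount];
        for (int c = 0; c < consumerCount; c++) received[c] = new List<string>();
        int next = 0;
        while (Queues[queue].Count > 0)
        {
            received[next].Add(Queues[queue].Dequeue());
            next = (next + 1) % consumerCount;
        }
        return received;
    }
}

static class Demo
{
    public static List<string>[] Run()
    {
        var exchange = new DirectExchangeModel();
        exchange.QueueBind("myqueue", "log_info");   // binding added by consumer 1
        exchange.QueueBind("myqueue", "log_error");  // binding added by consumer 2
        for (int i = 0; i < 10; i++)
            exchange.Publish(i % 2 == 0 ? "log_info" : "log_error", i + ",你好");
        return exchange.Dispatch("myqueue", 2);
    }

    static void Main()
    {
        var received = Run();
        Console.WriteLine("consumer 1: " + string.Join(" | ", received[0]));
        Console.WriteLine("consumer 2: " + string.Join(" | ", received[1]));
    }
}
```

In this model all ten messages land in myqueue, and the first consumer receives messages 0, 2, 4, 6, 8 while the second receives 1, 3, 5, 7, 9. The split lines up with the routing keys only because the producer alternates them; if two consecutive messages used the same key, the model would hand one of them to each consumer.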