1. 程式人生 > 實用技巧 >WORK QUEUES和ROUTING

WORK QUEUES和ROUTING

1、交換機 exchange [direct,fanout,headers,topic ]
direct:work queues和routing都是直連場景
BasicGet:主動的去拉取。
1)work queues:
預設輪詢
使用 subscribe和publish 釋出、訂閱的方式

EventingBasicConsumer:多個consumer可以分攤CPU的計算壓力

2)routing:
需要建立交換機,並綁定當前的佇列
//第三步:建立exchange
channel.ExchangeDeclare("myexchange", ExchangeType.Direct, true, false, null);

//第四步:建立一個佇列(queue)並繫結
channel.QueueDeclare("mytest", true, false, false, null);
channel.QueueBind("mytest", "myexchange", "mytest", null);

2、相關程式碼

work queues和routing的區別僅僅是是否使用自己設定的交換機,直接展示routing程式碼,相關資料可以查詢:

https://www.rabbitmq.com/getstarted.html

1)生產者

            //基礎配置
            ConnectionFactory factory = new ConnectionFactory()
            {
                HostName = "10.123.44.12",
                UserName = "datamip",
                Password = "datamip"
            };

            //第一步:建立connection
            using (var connection = factory.CreateConnection())
            {
                //第二步:建立channel
                using (var channel = connection.CreateModel())
                {
                    //第三步:建立exchange
                    channel.ExchangeDeclare("myexchange", ExchangeType.Direct, true, false, null);

                    //第四步:建立一個佇列(queue)並繫結
                    channel.QueueDeclare("mytest", true, false, false, null);
                    channel.QueueBind("mytest", "myexchange", "mytest", null);

                    //第五步:釋出訊息
                    for (int i = 0; i < 100; i++)
                    {
                        var msg = Encoding.UTF8.GetBytes(string.Format("{0}:{1}", i, "你好"));

                        channel.BasicPublish("myexchange", "mytest", basicProperties: null, body: msg);
                    }

                }
            }
            Console.WriteLine("生產成功!");
            Console.ReadKey();

2)消費者

            //建立連線工廠
            ConnectionFactory factory = new ConnectionFactory
			{
				UserName = "datamip",//使用者名稱
				Password = "datamip",//密碼
				HostName = "10.123.44.12"//rabbitmq ip
			};           
            //第一步:建立connection
            using (var connection = factory.CreateConnection())
            {
                //第二步:建立channel
                using (var channel = connection.CreateModel())
                {

                    //第三步:宣告佇列
                    channel.QueueDeclare("mytest",true,false,false,null);

                    //處理訊息
                    EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (sender,e)=> {
                        var msg = Encoding.UTF8.GetString(e.Body.ToArray());
                        Console.WriteLine(msg);
                    };
                    //消費
                    channel.BasicConsume("mytest", true, consumer);
                    Console.ReadKey();
                }
            }