1. 程式人生 > >.net core RabbitMQ 訊息佇列

.net core RabbitMQ 訊息佇列

上篇我們說到erlang的安裝,現在有了基礎前提,就可以繼續安裝RabbitMQ了!
這裡我選用的RabbitMQ版本是:

PS:這個RabbitMQ版本是要對應前面erlang版本,所以前面我們安裝的版本是21.2,因為需要提供21.X版本的erlang才能安裝 rabbitmq-server-3.7.10

1.安裝RabbitMQ
下載地址:http://www.rabbitmq.com/install-windows.html
download完,執行exe檔案,安裝到自己選用的目錄,並配置環境變數

rabbitmq的基本操作:

  • 啟動:rabbitmq-server -detached
  • 關閉:rabbitmqctl stop
  • 啟動:rabbitmqctl status

2.配置rabbitmq網頁管理外掛
以管理員執行命令提示啟用外掛:

rabbitmq-plugins enable rabbitmq_management

開啟瀏覽器頁面:http://localhost:15672 可以看到

預設登陸為:guest/guest

3.開啟rabbitMQ遠端訪問

  • 新增使用者,使用者名稱:XRom 密碼:XRom123
rabbitmqctl add_user XRom XRom123
  • 新增許可權
rabbitmqctl set_permissions -p "/" XRom ".*" ".*" ".*"
  • 修改使用者角色
rabbitmqctl set_user_tags XRom administrator

然後就可以遠端訪問了,可以用新增的使用者登入RabbitMQ

4.Producer與Exchange

  • Producer
    訊息的生產者,也就是建立訊息的物件
  • Exchange
    訊息的接受者,也就是用來接收訊息的物件,Exchange接收到訊息後將訊息按照規則傳送到與他繫結的Queue中。下面我們來定義一個Producer與Exchange。

新建.net core 控制檯專案,並引入NuGet包

接下來可以用程式碼看實現效果:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading;

namespace RabbitMQConsole
{
    class Program
    {
        /// <summary>
        ///  建立只讀連線物件
        /// </summary>
        private static readonly ConnectionFactory rabbitMqFactory = new ConnectionFactory()
        {
            HostName = "",//這裡寫自己電腦hostname,可以通過命令提示符,直接輸入hostname查詢
            Port = 5672,
            UserName = "XRom",
            Password = "XRom123",
            VirtualHost = "/"
        };

        static void Main(string[] args)
        {
            var exchange = "change2";
            var route = "route2";
            var queue = "queue2";

            using (IConnection conn = rabbitMqFactory.CreateConnection())
            {
                using (IModel channel = conn.CreateModel())
                {
                    channel.ExchangeDeclare(exchange, type: "direct", durable: true, autoDelete: false);//建立change2
                    channel.QueueDeclare(queue, durable: true, exclusive: false, autoDelete: false);//建立queue2
                    channel.QueueBind(queue, exchange, route);//將queue2繫結到change2

                    #region 傳送訊息
                    var props = channel.CreateBasicProperties();
                    props.Persistent = true; //持久化
                    channel.BasicPublish(exchange, route, true, props, Encoding.UTF8.GetBytes("hello rabbitmq!!"));
                    #endregion

                    #region 消費訊息
                    //while (true)
                    //{
                    //    var message = channel.BasicGet(queue, true);  //第二個引數說明自動釋放訊息,如為false需手動釋放訊息
                    //    if (message != null)
                    //    {
                    //        var msgBody = Encoding.UTF8.GetString(message.Body);
                    //        Console.WriteLine(string.Format("***接收時間:{0},訊息內容:{1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msgBody));
                    //    }
                    //    System.Threading.Thread.Sleep(TimeSpan.FromSeconds(1));
                    //}
                    #endregion

                    #region 讓失敗的訊息回到佇列中
                    //while (true)
                    //{
                    //    var message = channel.BasicGet(queue, false);
                    //    if (message != null)
                    //    {
                    //        var msgBody = Encoding.UTF8.GetString(message.Body);
                    //        Console.WriteLine(string.Format("***接收時間:{0},訊息內容:{1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msgBody));
                    //        Console.WriteLine(message.DeliveryTag);   //當前訊息被處理的次序數
                    //    if (1 == 1)
                    //            channel.BasicReject(message.DeliveryTag, true);
                    //    }

                    //    System.Threading.Thread.Sleep(TimeSpan.FromSeconds(1));
                    //}
                    #endregion

                    #region 監聽訊息
                    //channel.BasicQos(prefetchSize: 0, prefetchCount: 20, global: false);  //一次接受10條訊息,否則rabbit會把所有的訊息一次性推到client,會增大client的負荷
                    //EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
                    //consumer.Received += (model, ea) =>
                    //{
                    //    Byte[] body = ea.Body;
                    //    String message = Encoding.UTF8.GetString(body);
                    //    Console.WriteLine(message + Thread.CurrentThread.ManagedThreadId);
                    //    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                    //};

                    //channel.BasicConsume(queue: queue, autoAck: false, consumer: consumer);
                    //Console.ReadLine();
                    #endregion
                }
            }





        }
    }
}