1. 程式人生 > 其它 >RabbitMQ (一)

RabbitMQ (一)

一、RabbitMQ 簡介

​ RabbitMQ 是實現了高階訊息佇列協議(AMQP)的開源訊息代理軟體(也可以稱為:面向訊息的中介軟體)。

​ RabbitMQ 伺服器是用Erlang語言編寫的,而叢集和故障轉移是構建在開放電信平臺框架上的。

特性:

​ 可伸縮性:叢集服務

​ 訊息持久化:從記憶體持久化訊息到硬碟,再從硬碟載入到記憶體。

二、RabbitMQ安裝

1.1安裝條件:

​ 1、 OTP 24.1 Windows 64-bit Binary File 需要先裝erlang
​ 下載地址:

https://www.erlang.org/downloads

​ 2、下載 rabbitmq-server-3.8.3.exe 安裝包

1.2、裝完了erlang,回去找到剛才第一步下載的rabbitMQ的安裝包雙擊安裝,一樣的一直下一步就行了(中間遇到需要給它網路點確定就可以了)

​ 1、開啟cmd介面進入rabbitMQ的安裝目錄下的sbin目錄

​ 2、根據官網步驟執行命令

rabbitmq-plugins enable rabbitmq_management
1.3、安裝成功找到安裝目錄找到rabbitmq-server.bat雙擊執行(如果有錯就右鍵以管理員身份執行)

​ 1、訪問地址:http://localhost:15672/

​ 預設登入名:guest

​ 登入密碼:guest

三、RabbitMQ 的基本使用

​ 新建生產者控制檯專案:RabbitMQ_Producer

​ 通過nuget安裝:

RabbitMQ.Client

​ 程式碼中的使用者名稱和密碼不能使用預設的要不然會報錯,在Admin中新增一個使用者,如圖:

​ 新增之後預設沒有連線許可權需要去設定,點選新建使用者,如圖:

生產者程式碼如下:

  static void Main(string[] args)
        {
            string host = "192.168.1.4";  //IP 地址
            int port = 5672;  //埠號
            string userName = "admin";  //RabbitMQ 使用者名稱
            string userPw = "admin";  //RabbitMQ  密碼
            string virtualHost = "/";

            //建立工廠 
            var conFactory = new ConnectionFactory();
            //賦值
            conFactory.HostName = host;
            conFactory.Port = port;
            conFactory.UserName = userName;
            conFactory.Password = userPw;
            conFactory.VirtualHost = virtualHost;

            //建立連線
            var connection = conFactory.CreateConnection();
            //佇列名稱
            string queuesName = "test";
            //建立通道
            var channel = connection.CreateModel(); 
        //給通道繫結一個佇列,佇列如果不存在,則會建立新佇列,如果佇列已存在,那麼引數一定要正確,特別是param引數,否則會報錯
            var param = new Dictionary<string, object>() { { "x-queue-type", "classic" } }; 
            channel.QueueDeclare(queue: queuesName,durable:true,exclusive:false,arguments:param);
            //傳送佇列
            for (int i = 0; i < 5; i++)
            {
                var buffer = Encoding.UTF8.GetBytes(i.ToString());
                channel.BasicPublish("", queuesName,null,buffer);
                Console.WriteLine("傳送訊息:"+i);
            }
            connection.Close();
            Console.ReadKey();
        }

消費者程式碼:

   static void Main(string[] args)
        {
            string host = "192.168.1.4";
            int port = 5672;
            string userName = "admin";
            string userPw = "admin";
            string virtualHost = "/";

            //建立工廠 
            var conFactory = new ConnectionFactory();
            //賦值
            conFactory.HostName = host;
            conFactory.Port = port;
            conFactory.UserName = userName;
            conFactory.Password = userPw;
            conFactory.VirtualHost = virtualHost;

            //建立連線
            var connection = conFactory.CreateConnection();
            //佇列名稱
            string queuesName = "test";
            //建立通道
            var channel = connection.CreateModel();
              //給通道繫結一個佇列,佇列如果不存在,則會建立新佇列,如果佇列已存在,那麼引數一定要正確,param引數,否則會報錯
            var param = new Dictionary<string, object>() { { "x-queue-type", "classic" } }; 
            channel.QueueDeclare(queue: queuesName, durable: true, exclusive: false, arguments: param);
            //建立消費者
            EventingBasicConsumer consumer = new EventingBasicConsumer(channel);

            consumer.Received += (sender, e) =>
            {
                var body = e.Body.Span;
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine($"接收到訊息:{message}");

                Thread.Sleep(500);//暫停一下

                //通知訊息已被處理,如果沒有,那麼訊息將會被重複消費
                channel.BasicAck(e.DeliveryTag, false);
            };
            //ack設定成false,表示不自動提交,那麼就需要在訊息被消費後,手動呼叫BasicAck去提交訊息
            channel.BasicConsume(queuesName, false, consumer);

            Console.ReadKey();
        }