1. 程式人生 > >RabbitMQ——安裝與概念

RabbitMQ——安裝與概念

目錄

生產者:

 消費者:

RabbitMQ安裝:

erlang下載地址:http://www.erlang.org/downloads

RabbitMQ下載地址:http://www.rabbitmq.com/

在windows中安裝RabbitMQ還是十分簡便的,在以前地址找到合適的安裝包下載。

先安裝erlang,因為RabbitMQ服務端使用的語言是erlang。注意安裝時,以管理員身份安裝。

再安裝RabbitMQ,一步步安裝就可以啦。

配置環境變數,方便使用外掛。在rabbitmq的安裝目錄找到RabbitMQ Server\rabbitmq_server-3.6.6\sbin,新增到環境變數path中。如下圖:

執行一個bat檔案

@echo on
attrib -R "C:\Users\daisy\.erlang.cookie"
copy /y "C:\Windows\.erlang.cookie" "C:\Users\daisy\.erlang.cookie"
cd /d "C:\Program Files\RabbitMQ Server\rabbitmq_server-3.6.6\sbin" 
call rabbitmqctl.bat add_user daisy daisy
call rabbitmqctl.bat set_user_tags daisy administrator
call rabbitmqctl.bat set_permissions -p / daisy ".*" ".*" ".*"
call rabbitmq-plugins.bat enable rabbitmq_management

拷貝windows下的cookie到使用者目錄下

建立使用者

然後安裝RabbitMQ的web監控外掛

訪問127.0.0.1:15672即可看到該視覺化頁面。

rabbitMQ概念

遵循AMQP協議,使用erlang語言實現

AMQP 高階訊息佇列協議

AMQP:

1.connection => open、use、close 【open-ok、close、tune-ok】

2.channel => open、flow、close 【構建在connection之上,在amqp中常作為長連結】

3.exchange

4.queue是

5.basic => 釋出和獲取message中的一些設定

6.tx => 事務處理

7.confirm => 釋出確認機制

erlang自帶資料庫 measia

交換機的四種類型: 不同型別的交換機,定義了不同的規則

  1. direct: 直連 提前預知性的binding info,bug.error->exchange
  2. headers :and、or 性的binding x-match any all
  3. topic: 歸類性的binding,天生帶有正則
  4. fanout: 多播現象,群發性的binding

RabbitMQ demo

生產者:

var factory = new ConnectionFactory
            {
                Uri = new Uri("amqp://daisy:[email protected]:5672/")
            };

            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    //一次只處理一個,序列處理佇列中訊息,不會給應用程式造成太大的負擔
                    channel.BasicQos(0,1,false);
                    //定義溢位訊息傳送的交換機
                    channel.ExchangeDeclare("mydead_exchange", ExchangeType.Direct);
                    //宣告一個帶名稱的佇列
                    channel.QueueDeclare("hello", true, false, false, new Dictionary<string, object>()
                    {
                        { "x-max-length",10}, //佇列存放最多的條數
                        { "x-dead-letter-exchange","mydead_exchange"}, //超過最大限度的,從前面開始,發到這個交換機,然後通過路由key,路由到指定的佇列中
                        { "x-dead-letter-routing-key","medead_key"},
                    });

                    //將訊息標記為永續性,同時需要指定佇列的durable為true
                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true;
                    //構建byte訊息資料包
                    string message = "Hello World!";

                    var i = 0;
                    do
                    {
                        var body = Encoding.UTF8.GetBytes(message + i);
                        properties.Priority = (byte)i;
                        channel.BasicPublish("", "hello", properties, body);
                        Console.WriteLine("[x] send {0}:{1}", message, i);
                        i++;
                        Console.WriteLine("是否繼續傳送訊息。是:y,否:其他任意鍵");
                    } while (Console.ReadKey().KeyChar.Equals('y'));
                }
                Console.ReadKey();
            }

 消費者:

var factory = new ConnectionFactory
            {
                Uri = new Uri("amqp://daisy:[email protected]:5672")
            };
            using (var conn = factory.CreateConnection())
            {
                using (var channel = conn.CreateModel())
                {
                    #region test
                    //定義交換機
                    channel.ExchangeDeclare("myexchange", ExchangeType.Direct);
                    //定義佇列log_else,路由debug、info、warning都指向該佇列
                    channel.QueueDeclare("log_else", true, false, false, null);
                    var array = new string[3] { "debug", "info", "warning" };
                    foreach (var s in array)
                    {
                        channel.QueueBind("log_else", "myexchange", s, null);
                    }
                    //定義佇列log_error,路由為error
                    channel.QueueDeclare("log_error", true, false, false, null);
                    channel.QueueBind("log_error", "myexchange", "error", null);
                    #endregion

                    //定義溢位資訊存放佇列
                    channel.QueueDeclare("mydead_queue", true, false, false, null);
                    channel.QueueBind("mydead_queue", "mydead_exchange", "medead_key", null);

                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);
                        Console.WriteLine("[x] received {0}", message);
                        // ReSharper disable once AccessToDisposedClosure
                        //傳送訊息,手動確認
                        channel.BasicAck(ea.DeliveryTag, false);
                    };

                    channel.BasicConsume("hello", false, consumer); //false 關閉自動確認
                    Console.WriteLine("Press [enter] to Exit");
                    Console.ReadKey();
                }