RabbitMQ——安裝與概念
目錄
RabbitMQ安裝:
erlang下載地址:http://www.erlang.org/downloads
RabbitMQ下載地址:http://www.rabbitmq.com/
在windows中安裝RabbitMQ還是十分簡便的,在以前地址找到合適的安裝包下載。
先安裝erlang,因為RabbitMQ服務端使用的語言是erlang。注意安裝時,以管理員身份安裝。
再安裝RabbitMQ,一步步安裝就可以啦。
配置環境變數,方便使用外掛。在rabbitmq的安裝目錄找到RabbitMQ Server\rabbitmq_server-3.6.6\sbin,新增到環境變數path中。如下圖:
執行一個bat檔案
@echo on
attrib -R "C:\Users\daisy\.erlang.cookie"
copy /y "C:\Windows\.erlang.cookie" "C:\Users\daisy\.erlang.cookie"
cd /d "C:\Program Files\RabbitMQ Server\rabbitmq_server-3.6.6\sbin"
call rabbitmqctl.bat add_user daisy daisy
call rabbitmqctl.bat set_user_tags daisy administrator
call rabbitmqctl.bat set_permissions -p / daisy ".*" ".*" ".*"
call rabbitmq-plugins.bat enable rabbitmq_management
拷貝windows下的cookie到使用者目錄下
建立使用者
然後安裝RabbitMQ的web監控外掛
訪問127.0.0.1:15672即可看到該視覺化頁面。
rabbitMQ概念
遵循AMQP協議,使用erlang語言實現
AMQP 高階訊息佇列協議
AMQP:
1.connection => open、use、close 【open-ok、close、tune-ok】
2.channel => open、flow、close 【構建在connection之上,在amqp中常作為長連結】
3.exchange
4.queue是
5.basic => 釋出和獲取message中的一些設定
6.tx => 事務處理
7.confirm => 釋出確認機制
erlang自帶資料庫 measia
交換機的四種類型: 不同型別的交換機,定義了不同的規則
- direct: 直連 提前預知性的binding info,bug.error->exchange
- headers :and、or 性的binding x-match any all
- topic: 歸類性的binding,天生帶有正則
- fanout: 多播現象,群發性的binding
RabbitMQ demo
生產者:
var factory = new ConnectionFactory
{
Uri = new Uri("amqp://daisy:[email protected]:5672/")
};
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
//一次只處理一個,序列處理佇列中訊息,不會給應用程式造成太大的負擔
channel.BasicQos(0,1,false);
//定義溢位訊息傳送的交換機
channel.ExchangeDeclare("mydead_exchange", ExchangeType.Direct);
//宣告一個帶名稱的佇列
channel.QueueDeclare("hello", true, false, false, new Dictionary<string, object>()
{
{ "x-max-length",10}, //佇列存放最多的條數
{ "x-dead-letter-exchange","mydead_exchange"}, //超過最大限度的,從前面開始,發到這個交換機,然後通過路由key,路由到指定的佇列中
{ "x-dead-letter-routing-key","medead_key"},
});
//將訊息標記為永續性,同時需要指定佇列的durable為true
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
//構建byte訊息資料包
string message = "Hello World!";
var i = 0;
do
{
var body = Encoding.UTF8.GetBytes(message + i);
properties.Priority = (byte)i;
channel.BasicPublish("", "hello", properties, body);
Console.WriteLine("[x] send {0}:{1}", message, i);
i++;
Console.WriteLine("是否繼續傳送訊息。是:y,否:其他任意鍵");
} while (Console.ReadKey().KeyChar.Equals('y'));
}
Console.ReadKey();
}
消費者:
var factory = new ConnectionFactory
{
Uri = new Uri("amqp://daisy:[email protected]:5672")
};
using (var conn = factory.CreateConnection())
{
using (var channel = conn.CreateModel())
{
#region test
//定義交換機
channel.ExchangeDeclare("myexchange", ExchangeType.Direct);
//定義佇列log_else,路由debug、info、warning都指向該佇列
channel.QueueDeclare("log_else", true, false, false, null);
var array = new string[3] { "debug", "info", "warning" };
foreach (var s in array)
{
channel.QueueBind("log_else", "myexchange", s, null);
}
//定義佇列log_error,路由為error
channel.QueueDeclare("log_error", true, false, false, null);
channel.QueueBind("log_error", "myexchange", "error", null);
#endregion
//定義溢位資訊存放佇列
channel.QueueDeclare("mydead_queue", true, false, false, null);
channel.QueueBind("mydead_queue", "mydead_exchange", "medead_key", null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine("[x] received {0}", message);
// ReSharper disable once AccessToDisposedClosure
//傳送訊息,手動確認
channel.BasicAck(ea.DeliveryTag, false);
};
channel.BasicConsume("hello", false, consumer); //false 關閉自動確認
Console.WriteLine("Press [enter] to Exit");
Console.ReadKey();
}