RabbitMQ (一)
阿新 • • 發佈:2021-09-28
一、RabbitMQ 簡介
RabbitMQ 是實現了高階訊息佇列協議(AMQP)的開源訊息代理軟體(也可以稱為:面向訊息的中介軟體)。
RabbitMQ 伺服器是用Erlang語言編寫的,而叢集和故障轉移是構建在開放電信平臺框架上的。
特性:
可伸縮性:叢集服務
訊息持久化:從記憶體持久化訊息到硬碟,再從硬碟載入到記憶體。
二、RabbitMQ安裝
1.1安裝條件:
1、 OTP 24.1 Windows 64-bit Binary File 需要先裝erlang
下載地址: https://www.erlang.org/downloads
2、下載 rabbitmq-server-3.8.3.exe 安裝包
1.2、裝完了erlang,回去找到剛才第一步下載的rabbitMQ的安裝包雙擊安裝,一樣的一直下一步就行了(中間遇到需要給它網路點確定就可以了)
1、開啟cmd介面進入rabbitMQ的安裝目錄下的sbin目錄
2、根據官網步驟執行命令
rabbitmq-plugins enable rabbitmq_management
1.3、安裝成功找到安裝目錄找到rabbitmq-server.bat雙擊執行(如果有錯就右鍵以管理員身份執行)
1、訪問地址:http://localhost:15672/
預設登入名:guest
登入密碼:guest
三、RabbitMQ 的基本使用
新建生產者控制檯專案:RabbitMQ_Producer
通過nuget安裝:
RabbitMQ.Client
程式碼中的使用者名稱和密碼不能使用預設的要不然會報錯,在Admin中新增一個使用者,如圖:
新增之後預設沒有連線許可權需要去設定,點選新建使用者,如圖:
生產者程式碼如下:
static void Main(string[] args)
{
string host = "192.168.1.4"; //IP 地址
int port = 5672; //埠號
string userName = "admin"; //RabbitMQ 使用者名稱
string userPw = "admin"; //RabbitMQ 密碼
string virtualHost = "/";
//建立工廠
var conFactory = new ConnectionFactory();
//賦值
conFactory.HostName = host;
conFactory.Port = port;
conFactory.UserName = userName;
conFactory.Password = userPw;
conFactory.VirtualHost = virtualHost;
//建立連線
var connection = conFactory.CreateConnection();
//佇列名稱
string queuesName = "test";
//建立通道
var channel = connection.CreateModel();
//給通道繫結一個佇列,佇列如果不存在,則會建立新佇列,如果佇列已存在,那麼引數一定要正確,特別是param引數,否則會報錯
var param = new Dictionary<string, object>() { { "x-queue-type", "classic" } };
channel.QueueDeclare(queue: queuesName,durable:true,exclusive:false,arguments:param);
//傳送佇列
for (int i = 0; i < 5; i++)
{
var buffer = Encoding.UTF8.GetBytes(i.ToString());
channel.BasicPublish("", queuesName,null,buffer);
Console.WriteLine("傳送訊息:"+i);
}
connection.Close();
Console.ReadKey();
}
消費者程式碼:
static void Main(string[] args)
{
string host = "192.168.1.4";
int port = 5672;
string userName = "admin";
string userPw = "admin";
string virtualHost = "/";
//建立工廠
var conFactory = new ConnectionFactory();
//賦值
conFactory.HostName = host;
conFactory.Port = port;
conFactory.UserName = userName;
conFactory.Password = userPw;
conFactory.VirtualHost = virtualHost;
//建立連線
var connection = conFactory.CreateConnection();
//佇列名稱
string queuesName = "test";
//建立通道
var channel = connection.CreateModel();
//給通道繫結一個佇列,佇列如果不存在,則會建立新佇列,如果佇列已存在,那麼引數一定要正確,param引數,否則會報錯
var param = new Dictionary<string, object>() { { "x-queue-type", "classic" } };
channel.QueueDeclare(queue: queuesName, durable: true, exclusive: false, arguments: param);
//建立消費者
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
consumer.Received += (sender, e) =>
{
var body = e.Body.Span;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine($"接收到訊息:{message}");
Thread.Sleep(500);//暫停一下
//通知訊息已被處理,如果沒有,那麼訊息將會被重複消費
channel.BasicAck(e.DeliveryTag, false);
};
//ack設定成false,表示不自動提交,那麼就需要在訊息被消費後,手動呼叫BasicAck去提交訊息
channel.BasicConsume(queuesName, false, consumer);
Console.ReadKey();
}