RabbitMQ - hello world
阿新 • • 發佈:2022-03-06
依賴
# nuget 安裝
RabbitMQ.Client
# 引入名稱空間
using RabbitMQ.Client
生產者
1、建立控制檯專案
2、構建連線引數
//建立連線的工廠(指定連線引數)
var factory = new ConnectionFactory()
{
HostName = "192.168.65.133",
UserName = "admin",
Password = "admin",
Port = 5672,
AutomaticRecoveryEnabled = true
};
或者
var factory = new ConnectionFactory
{
Uri = new Uri("amqp://admin:[email protected]:5672"),
AutomaticRecoveryEnabled = true
};
3、建立伺服器連線和傳輸通道
//通過連線工廠建立連線
using (var connection = factory.CreateConnection())
//建立通道
using (var channel = connection.CreateModel())
{
//...........
}
4、在通道里定義一個佇列;然後就可以向佇列中傳送訊息
//建立連線的工廠(指定連線引數)
var factory = new ConnectionFactory()
{
HostName = "192.168.65.133",
UserName = "admin",
Password = "admin",
Port = 5672
};
//通過連線工廠建立連線
using (var connection = factory.CreateConnection())
//建立通道
using (var channel = connection.CreateModel())
{
//定義佇列
channel.QueueDeclare(queue: "hello",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
Console.WriteLine("生產者準備就緒");
Console.WriteLine("請輸入要傳送的內容!");
string content;
while ((content = Console.ReadLine()) != "q")
{
//將訊息轉換為二進位制資料
string message = content;
var body = Encoding.UTF8.GetBytes(message);
//釋出
channel.BasicPublish(exchange: "",
routingKey: "hello",
basicProperties: null,
body: body);
Console.WriteLine($" {DateTime.Now}:");
Console.WriteLine($" Send: {message}");
Console.WriteLine("請輸入要傳送的內容!");
}
}
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
啟動
視覺化介面中 Queues 中已經有一條資料了
消費者
1、大概和生產者雷同,開啟連線和通道宣告消費的佇列。
注意:與生產者釋出訊息的佇列保持一致。
//建立連線的工廠(指定連線引數)
var factory = new ConnectionFactory()
{
HostName = "192.168.65.133",
UserName = "admin",
Password = "admin",
Port = 5672
};
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
//定義消費佇列
channel.QueueDeclare(queue: "hello",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
//........................
}
2、伺服器從佇列中傳遞訊息給消費者。由於它會異步向我們推送訊息,因此我們提供了一個回撥
使用 EventingBasicConsumer.Received 事件處理
//建立連線的工廠(指定連線引數)
var factory = new ConnectionFactory()
{
HostName = "192.168.65.133",
UserName = "admin",
Password = "admin",
Port = 5672
};
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "hello",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
//回撥事件
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
};
//消費
channel.BasicConsume(queue: "hello",
autoAck: true,
consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
啟動
視覺化介面的資料已被消費