1. 程式人生 > 實用技巧 >RabbitMQ(dotnet基本使用

RabbitMQ(dotnet基本使用

前言

RabbitMQ環境環境搭建及基本配置,在此不討論。網上一大堆。

NET環境下,Rabbit庫可以在官網或NUGET上查詢得到。

生產者

static void Main(string[] args)
{
var factory = new ConnectionFactory();//連線工廠
factory.HostName = "127.0.0.1";//地址
factory.UserName = "Test";//登入名
factory.Password = "t123456!";//密碼 using (var connection = factory.CreateConnection())//建立連線
{
using (var channel = connection.CreateModel())//建立通道
{
//建立佇列,第二個引數bool:是否佇列持久化
channel.QueueDeclare(
queue: "test", //訊息佇列名稱
durable: false,//訊息佇列是否持久化
exclusive: false,//訊息佇列是否被本次連線connection獨享。(本次連線
//connection建立的通道可以共用).排外的queue在當前連線被斷開的時候會
//自動消失(清除)無論是否設定了持久化.
autoDelete: false,//訊息佇列是否自動刪除。也就是說queue會清理自己,但
是是在最後一個connection斷開的時候。
arguments: null);//引數對 var properties = channel.CreateBasicProperties();//可以為null
properties.DeliveryMode = ;//多個消費工作佇列時,設定此屬性
properties.SetPersistent(true);//訊息持久化 string message = "Hello World";
var body = Encoding.UTF8.GetBytes(message);
//釋出訊息
//第一個引數:交換器,空預設為direct
//第二個引數:direct時,為佇列名
//第三個引數:通道屬性,可以是BasicProperties,也可以是屬性介面
//第四個引數:訊息正文
channel.BasicPublish("", "hello", properties, body);
Console.WriteLine(" set {0}", message);
}
}
}

消費者

static void Main(string[] args)
{
var factory = new ConnectionFactory();//連線工廠
factory.HostName = "127.0.0.1";//地址
factory.UserName = "Test";//登入名
factory.Password = "t123456!";//密碼 using (var connection = factory.CreateConnection())//建立連線
{
using (var channel = connection.CreateModel())//建立通道
{
//建立佇列,第二個引數bool:是否佇列持久化
channel.QueueDeclare(
queue: "test", //訊息佇列名稱
durable: false,//訊息佇列是否持久化
exclusive: false,//訊息佇列是否被本次連線connection獨享。(本次連線
//connection建立的通道可以共用).排外的queue在當前連線被斷開的時候會
//自動消失(清除)無論是否設定了持久化.
autoDelete: false,//訊息佇列是否自動刪除。也就是說queue會清理自己,但
是是在最後一個connection斷開的時候。
arguments: null);//引數對 //第一種接收方式:迴圈接收方式
var consumer = new QueueingBasicConsumer(channel);//消費者例項
channel.BasicConsume(
queue: "Test", //佇列名稱
autoAck: false, //是否開啟收到訊息自動回覆
consumer: consumer//消費者
);
//多個消費者確寶公平,設定同一時間一個消費只能接收一個訊息;此方法慎用
channel.BasicQos(, , false);
while (true)
{
//佇列訊息物件
var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
//設定睡眠時間,可模擬工作佇列多個訊息者模式,自行決定
Thread.Sleep(* );
//假如,前面未設定自動回覆,則可以手動;
//響應給RabbitMQ服務:收到並處理了訊息。
channel.BasicAck(ea.DeliveryTag, false);
//遇到無法處理的訊息,拒絕且此訊息是否放回佇列中,傳送給其他消費者
channel.BasicReject(ea.DeliveryTag, false);
Console.WriteLine("Received {0}", message);
Console.WriteLine("Done");
}
//第二種接收方式:事件方式
//例項化一個事件型消費者
var consumer = new EventingBasicConsumer(channel);
//訂閱消費者接收訊息的事件
consumer.Received += (model, ea) =>
{
//獲取並解析資料
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
//響應給RabbitMQ服務:收到並處理了訊息。
channel.BasicAck(ea.DeliveryTag, false);
//遇到無法處理的訊息,拒絕且此訊息是否放回佇列中,傳送給其他消費者
channel.BasicReject(ea.DeliveryTag, false);
Console.WriteLine($"收到: {message}");
};
}
}
}