1. 程式人生 > 其它 >RabbitMQ-Work Queues

RabbitMQ-Work Queues

工作佇列(Work Queues)

場景:假設生產者向佇列中新增一條資料的時間為1秒,消費者從佇列中消費一條資料執行完業務邏輯需要5秒,在這種情況下佇列就會不斷堆積最終導致服務癱瘓。

解決方案:執行多個消費者,同時消費佇列中的任務

生產者

定義一個task_Queue佇列;1秒向佇列中傳送一條訊息

var factory = new ConnectionFactory
{
Uri = new Uri("amqp://admin:[email protected]:5672"),
AutomaticRecoveryEnabled = true
};
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "task_queue",
durable: true,//可持久化
exclusive: false,
autoDelete: false,
arguments: null);
int count = 0;
while (true)
{
count++;
var message = $"Task {count}";
var body = Encoding.UTF8.GetBytes(message);

var properties = channel.CreateBasicProperties();
properties.Persistent = true;

channel.BasicPublish(exchange: "",
routingKey: "task_queue",
basicProperties: properties,
body: body);
Console.WriteLine("Send {0}", message);
//暫停一秒
Task.Delay(1000).Wait();
}
}

消費者

定義一個task_Queue佇列;5秒消費一條訊息

var factory = new ConnectionFactory
{
Uri = new Uri("amqp://admin:[email protected]:5672"),
AutomaticRecoveryEnabled = true
};
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "task_queue",
durable: true,//可持久化
exclusive: false,
autoDelete: false,
arguments: null);

channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

Console.WriteLine("Waiting for messages.");

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (sender, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine("Received {0}", message);
Task.Delay(5000).Wait(); //等待5秒
Console.WriteLine("Task Done");

channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);//手動確認
};
channel.BasicConsume(queue: "task_queue",
autoAck: false,//關閉自動確認
consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}

測試

啟動5個消費者

視覺化介面中檢視連線數為5個

啟動1個生產者

視覺化介面連線多了1個生產者

測試結果

5個消費者一起消費佇列中的任務。

執行一段時間後 rabbitmq 佇列中 也沒有過多的堆積任務

公眾號同步更新