1. 程式人生 > 其它 >C#使用RabbitMq佇列 worke模式

C#使用RabbitMq佇列 worke模式

//簡單生產端 ui呼叫者

using System;
namespace RabbitMqPublishDemo
{
using MyRabbitMqService;
using System.Runtime.CompilerServices;

class Program
{
static void Main(string[] args)
{
//就是簡單的佇列,生產者
Console.WriteLine("====RabbitMqPublishDemo====");
for (int i = 0; i < 500; i++)
{
ZrfRabbitMqHelper.PublishSampleMsg("smapleMsg", $"nihaifengge:{i}");
}
Console.WriteLine("生成完畢!");
Console.ReadLine();
}
}
}

/// <summary>
/// 簡單生產者 邏輯
/// </summary>
/// <param name="queueName"></param>
/// <param name="msg"></param>
public static void PublishSampleMsg(string queueName, string msg)
{

using (IConnection conn = connectionFactory.CreateConnection())
{
using (IModel channel = conn.CreateModel())
{
channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
var msgBody = Encoding.UTF8.GetBytes(msg);
channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: msgBody);
}
}
}


//簡單消費端
using System;

namespace RabbitMqConsumerDemo
{
using MyRabbitMqService;
using System.Runtime.InteropServices;

class Program
{
static void Main(string[] args)
{
Console.WriteLine("====RabbitMqConsumerDemo====");
ZrfRabbitMqHelper.ConsumeSampleMsg("smapleMsg", isBasicNack: true, handleMsgStr: handleMsgStr =>
{
Console.WriteLine($"訂閱到訊息:{DateTime.Now}:{handleMsgStr}");
});
Console.ReadLine();
}
}
}

#region 簡單生產者後端邏輯
/// <summary>
/// 簡單消費者
/// </summary>
/// <param name="queueName">佇列名稱</param>
/// <param name="isBasicNack">失敗後是否自動放到佇列</param>
/// <param name="handleMsgStr">有就自己對字串的處理,如果要儲存到資料庫請自行擴充套件</param>
public static void ConsumeSampleMsg(string queueName, bool isBasicNack = false, Action<string> handleMsgStr = null)// bool ifBasicReject = false,
{
Console.WriteLine("ConsumeSampleMsg Waiting for messages....");
IConnection conn = connectionFactory.CreateConnection();
IModel channel = conn.CreateModel();
channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (sender, ea) =>
{
byte[] bymsg = ea.Body.ToArray();
string msg = Encoding.UTF8.GetString(bymsg);
if (handleMsgStr != null)
{
handleMsgStr.Invoke(msg);
}
else
{
Console.WriteLine($"{DateTime.Now}->收到訊息:{msg}");
}
};
channel.BasicConsume(queueName, autoAck: true, consumer);
}
#endregion