RabbitMQ (八) : 消息確認機制之事務機制
阿新 • • 發佈:2019-02-07
private 消息 lar except publish help bsp 消費者 consumer
實在沒啥好說的.
生產者
public class Producer { private const string QueueName = "test_work_queue"; public static void Send() { //獲取一個連接 IConnection connection = ConnectionHelper.GetConnection(); //從連接中獲取一個通道 IModel channel = connection.CreateModel();//聲明隊列 channel.QueueDeclare(QueueName, false, false, false, null); //創建一個消息 string msg = "hello world "; try { //開啟事務機制 //事務機制性能不好,不建議使用.因為需要和服務器發生額外的通信,降低了 RabbitMQ 的吞吐量 channel.TxSelect();//發送消息 channel.BasicPublish("", QueueName, null, Encoding.Default.GetBytes(msg)); //提交 channel.TxCommit(); Console.WriteLine($"send {msg}"); } catch (Exception e) { //回滾 channel.TxRollback();Console.WriteLine(e); } channel.Close(); connection.Close(); } }
消費者
public class Consumer { private const string QueueName = "test_work_queue"; public static void Receive() { //獲取連接 RabbitMQ.Client.IConnection connection = ConnectionHelper.GetConnection(); //創建通道 RabbitMQ.Client.IModel channel = connection.CreateModel(); //聲明隊列 channel.QueueDeclare(QueueName, false, false, false, null); //添加消費者 EventingBasicConsumer consumer = new EventingBasicConsumer(channel); //註冊事件 consumer.Received += Consumer_Received; //監聽隊列 channel.BasicConsume(QueueName, true, "", false, false, null, consumer); } private static void Consumer_Received(object sender, BasicDeliverEventArgs e) { byte[] bytes = e.Body; string str = Encoding.Default.GetString(bytes); Console.WriteLine("consumer : " + str); } }
RabbitMQ (八) : 消息確認機制之事務機制