RabbitMq筆記(3)
阿新 • • 發佈:2020-08-21
/// <summary>
/// Base class for the RabbitMQ helpers: builds a <c>ConnectionFactory</c> from
/// application settings and exposes the configured queue name.
/// </summary>
public class RabbitBasic
{
    /// <summary>Connection factory populated from AppSettings in the constructor.</summary>
    public ConnectionFactory Factory;

    /// <summary>Queue name, read from the "MQName" application setting on every access.</summary>
    public string PickRelease => ReadSetting("MQName");

    /// <summary>
    /// Reads the host, credentials and virtual host from AppSettings and
    /// instantiates the connection factory with them.
    /// </summary>
    public RabbitBasic()
    {
        // Instantiate the connection factory.
        var factory = new ConnectionFactory();
        factory.HostName = ReadSetting("RabbitMQHost");
        factory.Password = ReadSetting("RabbitMQPwd");
        factory.UserName = ReadSetting("RabbitMQUserName");
        factory.VirtualHost = ReadSetting("RabbitMQVirtualHost");
        Factory = factory;
    }

    // Looks up a single key in the application's AppSettings section.
    private static string ReadSetting(string key)
    {
        return System.Configuration.ConfigurationManager.AppSettings.Get(key);
    }
}
/// <summary>
/// Publishes raw message bodies to a RabbitMQ queue.
/// </summary>
public class RabbitSender : RabbitBasic
{
    /// <summary>
    /// Publishes <paramref name="ary"/> to the queue named by <c>PickRelease</c>.
    /// </summary>
    /// <param name="ary">Message body bytes.</param>
    public void Sender(byte[] ary)
    {
        Send(PickRelease, ary);
    }

    /// <summary>
    /// Opens a new connection and channel, declares the queue, publishes one
    /// message to it, and then disposes the channel and connection.
    /// </summary>
    /// <param name="MQName">Queue name; also used as the routing key.</param>
    /// <param name="ary">Message body bytes.</param>
    /// <remarks>
    /// Exceptions from the client library propagate unchanged. The previous
    /// <c>catch (Exception ex) { throw ex; }</c> wrapper did no handling and
    /// reset the stack trace, so it has been removed.
    /// </remarks>
    public void Send(string MQName, byte[] ary)
    {
        using (var connection = Factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            // Declare the queue as durable, non-exclusive and not auto-deleted.
            channel.QueueDeclare(queue: MQName,
                                 durable: true,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);

            // Publish with an empty exchange name, using the queue name as the routing key.
            channel.BasicPublish(exchange: "",
                                 routingKey: MQName,
                                 basicProperties: null,
                                 body: ary);
        }
    }
}
/// <summary>
/// Consumes messages from the queue named by <c>PickRelease</c> using manual acknowledgements.
/// </summary>
public class RabbitReceiver : RabbitBasic
{
    /// <summary>
    /// Connects to RabbitMQ, declares the queue and registers an event-based
    /// consumer on it. Each delivery is decoded as UTF-8 text, processed, and
    /// then acknowledged; on failure it is either rejected without requeue or
    /// acknowledged, depending on the exception type.
    /// </summary>
    public void Receive()
    {
        // The connection and channel are not disposed in this method: the
        // consumer callback registered below keeps using the channel.
        var connection = Factory.CreateConnection(); // connected to RabbitMQ from here on
        var channel = connection.CreateModel();

        channel.QueueDeclare(queue: PickRelease,
                             durable: true,
                             exclusive: false,
                             autoDelete: false,
                             arguments: null);

        // prefetchSize 0, prefetchCount 1, global false.
        channel.BasicQos(0, 1, false);

        // 5. Build the consumer instance.
        var consumer = new EventingBasicConsumer(channel);

        // 6. Attach the handler that runs when a message is received.
        consumer.Received += (model, ea) =>
        {
            // ea.Body carries the data received from RabbitMQ.
            var message = Encoding.UTF8.GetString(ea.Body);
            var obj = new Hub.SO();
            try
            {
                // `message` is the data received from MQ; business logic goes here.
                channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
            }
            catch (System.Exception ex)
            {
                // Type.ToString() returns the namespace-qualified name
                // ("System.Exception"), so the old comparison against
                // "Exception" never matched and this branch was unreachable.
                // Compare the runtime type directly instead.
                if (ex.GetType() == typeof(System.Exception))
                {
                    // Exactly System.Exception: reject and do not requeue.
                    channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: false);
                }
                else
                {
                    // Any derived exception type: acknowledge the delivery.
                    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                }
            }
        };

        // 7. Start the consumer with automatic acknowledgement turned off.
        channel.BasicConsume(queue: PickRelease, autoAck: false, consumer: consumer);
    }
}
/// <summary>
/// Entry point: publishes one message per element of <c>logs</c>, then starts the consumer.
/// </summary>
static void Main(string[] args)
{
    var publisher = new MQ.RabbitSender();
    foreach (var entry in logs)
    {
        // NOTE(review): the loop variable is never used; `m` is serialized on
        // every iteration. Confirm whether the current element was meant to be sent.
        string json = JsonConvert.SerializeObject(m);
        byte[] payload = Encoding.UTF8.GetBytes(json);
        publisher.Sender(payload); // send the message
    }

    // The producer and the consumer live in the same startup method, so a
    // single Main run starts both. If they were in two separate projects,
    // the other project would have to be started as well.
    var receiver = new MQ.RabbitReceiver(); // create a RabbitMQ consumer
    receiver.Receive();                     // start receiving messages
}