.net core RabbitMQ 訊息佇列
阿新 • • 發佈:2019-01-09
上篇我們說到erlang的安裝,現在有了基礎前提,就可以繼續安裝RabbitMQ了!
這裡我選用的RabbitMQ版本是:
PS:這個RabbitMQ版本是要對應前面erlang版本,所以前面我們安裝的版本是21.2,因為需要提供21.X版本的erlang才能安裝 rabbitmq-server-3.7.10
1.安裝RabbitMQ
下載地址:http://www.rabbitmq.com/install-windows.html
download完,執行exe檔案,安裝到自己選用的目錄,並配置環境變數
rabbitmq的基本操作:
- 啟動:rabbitmq-server -detached
- 關閉:rabbitmqctl stop
- 啟動:rabbitmqctl status
2.配置rabbitmq網頁管理外掛
以管理員執行命令提示啟用外掛:
rabbitmq-plugins enable rabbitmq_management
開啟瀏覽器頁面:http://localhost:15672 可以看到
預設登陸為:guest/guest
3.開啟rabbitMQ遠端訪問
- 新增使用者,使用者名稱:XRom 密碼:XRom123
rabbitmqctl add_user XRom XRom123
- 新增許可權
rabbitmqctl set_permissions -p "/" XRom ".*" ".*" ".*"
- 修改使用者角色
rabbitmqctl set_user_tags XRom administrator
然後就可以遠端訪問了,可以用新增的使用者登入RabbitMQ
4.Producer與Exchange
- Producer
訊息的生產者,也就是建立訊息的物件 - Exchange
訊息的接受者,也就是用來接收訊息的物件,Exchange接收到訊息後將訊息按照規則傳送到與他繫結的Queue中。下面我們來定義一個Producer與Exchange。
新建.net core 控制檯專案,並引入NuGet包
接下來可以用程式碼看實現效果:
using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Text; using System.Threading; namespace RabbitMQConsole { class Program { /// <summary> /// 建立只讀連線物件 /// </summary> private static readonly ConnectionFactory rabbitMqFactory = new ConnectionFactory() { HostName = "",//這裡寫自己電腦hostname,可以通過命令提示符,直接輸入hostname查詢 Port = 5672, UserName = "XRom", Password = "XRom123", VirtualHost = "/" }; static void Main(string[] args) { var exchange = "change2"; var route = "route2"; var queue = "queue2"; using (IConnection conn = rabbitMqFactory.CreateConnection()) { using (IModel channel = conn.CreateModel()) { channel.ExchangeDeclare(exchange, type: "direct", durable: true, autoDelete: false);//建立change2 channel.QueueDeclare(queue, durable: true, exclusive: false, autoDelete: false);//建立queue2 channel.QueueBind(queue, exchange, route);//將queue2繫結到change2 #region 傳送訊息 var props = channel.CreateBasicProperties(); props.Persistent = true; //持久化 channel.BasicPublish(exchange, route, true, props, Encoding.UTF8.GetBytes("hello rabbitmq!!")); #endregion #region 消費訊息 //while (true) //{ // var message = channel.BasicGet(queue, true); //第二個引數說明自動釋放訊息,如為false需手動釋放訊息 // if (message != null) // { // var msgBody = Encoding.UTF8.GetString(message.Body); // Console.WriteLine(string.Format("***接收時間:{0},訊息內容:{1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msgBody)); // } // System.Threading.Thread.Sleep(TimeSpan.FromSeconds(1)); //} #endregion #region 讓失敗的訊息回到佇列中 //while (true) //{ // var message = channel.BasicGet(queue, false); // if (message != null) // { // var msgBody = Encoding.UTF8.GetString(message.Body); // Console.WriteLine(string.Format("***接收時間:{0},訊息內容:{1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msgBody)); // Console.WriteLine(message.DeliveryTag); //當前訊息被處理的次序數 // if (1 == 1) // channel.BasicReject(message.DeliveryTag, true); // } // System.Threading.Thread.Sleep(TimeSpan.FromSeconds(1)); //} #endregion #region 監聽訊息 //channel.BasicQos(prefetchSize: 0, prefetchCount: 20, global: false); //一次接受10條訊息,否則rabbit會把所有的訊息一次性推到client,會增大client的負荷 //EventingBasicConsumer consumer = new EventingBasicConsumer(channel); //consumer.Received += (model, ea) => //{ // Byte[] body = ea.Body; // String message = Encoding.UTF8.GetString(body); // Console.WriteLine(message + Thread.CurrentThread.ManagedThreadId); // channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); //}; //channel.BasicConsume(queue: queue, autoAck: false, consumer: consumer); //Console.ReadLine(); #endregion } } } } }