.net平臺的rabbitmq使用封裝demo詳解
目錄
- 前言
- 什麼是rabbitMQ
- Rabbitmq的關鍵術語
- Rabbitmq的運作
- Publish(釋出)的封裝
- Subscribe(訂閱)的封裝
- Pull(拉)的封裝
- Rpc(遠端呼叫)的封裝
- 結尾
前言
RabbitMq大家再熟悉不過,這篇文章主要針對rabbitmq學習後封裝RabbitMQ.Client的一個分享。文章最後,我會把封裝元件和demo奉上。
什麼是rabbitMQ
RabbitMQ是一個由erlang開發的AMQP(Advanced Message Queue 高階訊息佇列協議 )的開源實現,能夠實現非同步訊息處理
RabbitMQ是一個訊息代理:它接受和轉發訊息。
你可以把它想象成一個郵局:當你把你想要釋出的郵件放在郵箱中時,你可以確定郵差先生最終將郵件傳送給你的收件人。在這個比喻中,RabbitMQ是郵政信箱,郵局和郵遞員。
RabbitMQ和郵局的主要區別在於它不處理紙張,而是接受,儲存和轉發二進位制資料塊
優點:非同步訊息處理
業務解耦(下訂單操作:扣減庫存、生成訂單、發紅包、發簡訊),將下單操作主流程:扣減庫存、生成訂單,然後通過MQ訊息佇列完成通知,發紅包、發簡訊,錯峰流控 (通知量 訊息量 訂單量大的情況實現MQ訊息佇列機制,淡季情況下訪問量會少)
靈活的路由(Flexible Routing)
在訊息進入佇列之前,通過 Exchange 來路由訊息的。對於典型的路由功能,RabbitMQ 已經提供了一些內建的 Exchanhttp://www.cppcns.comge 來實現。針對更復雜的路由功能,可以將多個 Exchange 繫結在一起,也通過外掛機制實現自己的 Exchange 。
RabbitMQ埠號:15672
程式裡面實現的埠為:5672
Rabbitmq的關鍵術語
1、繫結器(Binding):根據路由規則繫結Queue和Exchange。
2、路由鍵(Routing Key):Exchange根據關鍵字進行訊息投遞。
3、交換機(Exchange):指定訊息按照路由規則進入指定佇列
4、訊息佇列(Queue):訊息的儲存載體
5、生產者(Producer):訊息釋出者。
6、消費者(Consumer):訊息接收者。
Rabbitmq的運作
從下圖可以看出,釋出者(Publisher)是把訊息先發送到交換器(Exchange),再從交換器傳送到指定佇列(Queue),而先前已經宣告交換器與佇列繫結關係,最後消費者(Customer)通過訂閱或者主動取指定佇列訊息進行消費。
那麼剛剛提到的訂閱和主動取可以理解成,推(被動),拉(主動)。
推,只要佇列增加一條訊息,就會通知空閒的消費者進行消費。(我不找你,就等你找我,觀察者模式)
拉,不會通知消費者,而是由消費者主動輪循或者定時去取佇列訊息。(我需要才去找你)
使用場景我舉個例子,假如有兩套系統 訂單系統和發貨系統,從訂單系統發起發貨訊息指令,為了及時發貨,發貨系統需要訂閱佇列,只要有指令就處理。
可是程式偶爾會出異常,例如網路或者DB超時了,把訊息丟到失敗佇列,這個時候需要重發機制。但是我又不想while(IsPostSuccess == True),因為只要出異常了,會在某個時間段內都會有異常,這樣的重試是沒意義的。
這個時候不需要及時的去處理訊息,有個JOB定時或者每隔幾分鐘(失敗次數*間隔分鐘)去取失敗佇列訊息,進行重發。
Publish(釋出)的封裝
步驟:初始化連結->宣告交換器->宣告佇列->換機器與佇列繫結->釋出訊息。注意的是,我將Model存到了ConcurrentDictionary裡面,因為宣告與繫結是非常耗時的,其次,往重複的佇列傳送訊息是不需要重新初始化的。
/// <summary>
/// 交換器宣告
/// </summary>
/// <param name="iModel"></param>
/// <param name="exchange">交換器</param>
/// <param name="type">交換器型別:
/// 1、Direct Exchange – 處理路由鍵。需要將一個佇列繫結到交換機上,要求該訊息與一個特定的路由鍵完全
/// 匹配。這是一個完整的匹配。如果一個佇列繫結到該交換機上要求路由鍵 “dog”,則只有被標記為“dog”的
/// 訊息才被轉發,不會轉發dog.puppy,也不會轉發dog.guard,只會轉發dog
/// 2、Fanout Exchange – 不處理路由鍵。你只需要簡單的將佇列繫結到交換機上。一個傳送到交換機的訊息都
/// 會被轉發到與該交換機繫結的所有佇列上。很像子網廣播,每臺子網內的主機都獲得了一份複製的訊息。Fanout
/// 交換機轉發訊息是最快的。
/// 3、Topic Exchange – 將路由鍵和某模式進行匹配。此時佇列需要繫結要一個模式上。符號“#”匹配一個或多
/// 個詞,符號“*”匹配不多不少一個詞。因此“audiwww.cppcns.comt.#”能夠匹配到“audit.irs.corporate”,但是“audit.*”
/// 只會匹配到“audit.irs”。</param>
/// <param name="durable">持久化</param>
/// <param name="autoDelete">自動刪除</param>
/// <param name="arguments">引數</param>
private static void ExchangeDeclare(IModel iModel,string exchange,string type = ExchangeType.Direct,bool durable = true,bool autoDelete = false,IDictionary<string,object> arguments = null)
{
exchange = exchange.IsNullOrWhiteSpace() ? "" : exchange.Trim();
iModel.ExchangeDeclare(exchange,type,durable,autoDelete,arguments);
}
/// <summary>
/// 佇列宣告
/// </summary>
/// <param name="channel"></param>
/// <param name="queue">佇列</param>
/// <param name="durable">持久化</param>
/// <param name="exclusive">排他佇列,如果一個佇列被宣告為排他佇列,該佇列僅對首次宣告它的連線可見,
/// 並在連線斷開時自動刪除。這裡需要注意三點:其一,排他佇列是基於連線可見的,同一連線的不同通道是可
/// 以同時訪問同一個連線建立的排他佇列的。其二,“首次”,如果一個連線已經聲明瞭一個排他佇列,其他連
/// 接是不允許建立同名的排他佇列的,這個與普通佇列不同。其三,即使該佇列是持久化的,一旦連線關閉或者
/// 客戶端退出,該排他佇列都會被自動刪除的。這種佇列適用於只限於一個客戶端傳送讀取訊息的應用場景。</param>
/// <param name="autoDelete">自動刪除</param>
/// <param name="arguments">引數</param>
private static void QueueDeclare(IModel channel,string queue,bool exclusive = false,object> arguments = null)
{
queue = queue.IsNullOrWhiteSpace() ? "UndefinedQueueName" : queue.Trim();
channel.QueueDeclare(queue,exclusive,arguments);
}
/// <summary>
/// 獲取Model
/// </summary>
/// <param name="exchange">交換機名稱</param>
/// <param name="queue">佇列名稱</param>
/// <param name="routingKey"></param>
/// <param name="isProperties">是否持久化</param>
/// <returns></returns>
private static IModel GetModel(string exchange,string routingKey,bool isProperties = false)
{
return ModelDic.GetOrAdd(queue,key =>
{
var model = _conn.CreateModel();
ExchangeDeclare(model,exchange,ExchangeType.Fanout,isProperties);
QueueDeclare(model,queue,isProperties);
model.QueueBind(queue,routingKey);
ModelDic[queue] = model;
return model;
});
}
/// <summary>
/// 釋出訊息
/// </summary>
/// <param name="routingKey">路由鍵</param>
/// <param name="body">佇列資訊</param>
/// <param name="exchange">交換機名稱</param>
/// <param name="queue">佇列名</param>
/// <param name="isProperties">是否持久化</param>
/// <returns></returns>
public void Publish(string exchange,string body,bool isProperties = false)
{
var channel = GetModel(exchange,routingKey,isProperties);
try
{
channel.BasicPublish(exchange,null,body.SerializeUtf8());
}
catch (Exception ex)
{
throw ex.GetInnestException();
}
}
下次是本機測試的釋出速度截圖:
4.2W/S屬於穩定速度,把反序列化(Toon)會稍微快一些。
Subscribe(訂閱)的封裝
釋出的時候是申明瞭交換器和佇列並繫結,然而訂閱的時候只需要宣告佇列就可。從下面程式碼能看到,捕獲到異常的時候,會把訊息送到自定義的“死信佇列”裡,由另外的JOB進行定時重發,因此,finally是應答成功的。
/// <summary> /// 獲取Model /// </summary> /// <param name="queue">佇列名稱</param> /// <param name="isProperties"></param> /// <returns></returns> private static IModel GetModel(string queue,bool isProperties = false) { return ModelDic.GetOrAdd(queue,value => { var model = _conn.CreateModel(); QueueDeclare(model,isProperties); //每次消費的訊息數 model.BasicQos(0,1,false); ModelDic[queue] = model; return model; }); } /// <summary> /// 接收訊息 /// </summary> /// <typeparam name="T"></typeparam> /// <param name="queue">佇列名稱</param> /// <param name="isProperties"></param> /// <param name="handler">消費處理</param> /// <param name="isDeadLetter"></param> public void Subscribe<T>(string queue,bool isProperties,Action<T> handler,bool isDeadLetter) where T : class { //佇列宣告 var channel = GetModel(queue,isProperties); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model,ea) => { var body = ea.Body; var msgStr = body.DeserializeUtf8(); var msg = msgStr.FromJson<T>(); try { handler(msg); } catch (Exception ex) { ex.GetInnestException().WriteToFile("佇列接收訊息","RabbitMq"); if (!isDeadLetter) PublishToDead<DeadLetterQueue>(queue,msgStr,ex); } finally { channel.BasicAck(ea.DeliveryTag,false); } }; channel.BasicConsume(queue,false,consumer); }
下次是本機測試的釋出速度截圖:
快的時候有1.9K/S,慢的時候也有1.7K/S
Pull(拉)的封裝
直接上程式碼:
/// <summary>
/// 獲取訊息
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="exchange"></param>
/// <param name="queue"></param>
/// <param name="routingKey"></param>
/// <param name="handler">消費處理</param>
private void Poll<T>(string exchange,Action<T> handler) where T : class
{
var channel = GetModel(exchange,routingKey);
var result = channel.BasicGet(queue,false);
if (result.IsNull())
return;
var msg = result.Body.DeserializeUtf8().FromJson<T>();
try
{
handler(msg);
}
catch (Exception ex)
{
ex.GetInnestException().WriteToFile("佇列接收訊息","RabbitMq");
YrSBXlOL }
finally
{
channel.BasicAck(result.DeliveryTag,false);
}
}
快的時候有1.8K/s,穩定是1.5K/S
Rpc(遠端呼叫)的封裝
首先說明下,RabbitMq只是提供了這個RPC的功能,但是並不是真正的RPC,為什麼這麼說:
1、傳統Rpc隱藏了呼叫細節,像呼叫本地方法一樣傳參、丟擲異常
2、RabbitMq的Rpc是基於訊息的,消費者消費後,通過新佇列返回響應結果。
/// <summary> /// RPC客戶端 /// </summary> /// <param name="exchange"></param> /// <param name="queue"></param> /// <param name="routingKey"></param> /// <param name="body"></param> /// <param name="isProperties"></param> /// <returns></returns> public string RpcClient(string exchange,bool isProperties = false) { var channel = GetModel(exchange,isProperties); var consumer = new QueueingBasicConsumer(channel); channel.BasicConsume(queue,true,consumer); try { var correlationId = Guid.NewGuid().ToString(); var basicProperties = channel.CreateBasicProperties(); basicProperties.ReplyTo = queue; basicProperties.CorrelationId = correlationId; channel.BasicPublish(exchange,basicProperties,body.SerializeUtf8()); var sw = Stopwatch.StartNew(); while (true) { var ea = consumer.Queue.Dequeue(); if (ea.BasicProperties.CorrelationId == correlationId) { return ea.Body.DeserializeUtf8(); } if (sw.ElapsedMilliseconds > 30000) throw new Exception("等待響應超時"); } } catch (Exception ex) { throw ex.GetInnestException(); } } /// <summary> /// RPC服務端 /// </summary> /// <typeparam name="T"></typeparam> /// <param name="exchange"></param> /// <param name="queue"></param> /// <param name="isProperties"></param> /// <param nYrSBXlOLame="handler"></param> /// <param name="isDeadLetter"></param> public void RpcService<T>(string exchange,Func<T,T> handler,bool isDeadLetter) { //佇列宣告 var channel = GetModel(queue,ea) => { var body = ea.Body; var msgStr = body.DeserializeUtf8(); var msg = msgStr.FromJson<T>(); var props = ea.BasicProperties; var replyProps = channel.CreateBasicProperties(); replyProps.CorrelationId = props.CorrelationId; try { msg = handler(msg); } catch (Exception ex) { ex.GetInnestException().WriteToFile("佇列接收訊息","RabbitMq"); } finally { channel.BasicPublish(exchange,props.ReplyTo,replyProps,msg.ToJson().SerializeUtf8()); channel.BasicAck(ea.DeliveryTag,consumer); }
可以用,但不建議去用。可以考慮其他的RPC框架。grpc、thrift等。
結尾
本篇文章,沒有過多的寫RabbitMq的知識點,因為園子的學習筆記實在太多了。下面把我的程式碼奉上 https://.com/SkyChenSky/Sikiro.Mq.Rabbit。如果有發現寫得不對的地方麻煩在評論指出,我會及時修改以免誤導別人。
到此這篇關於.net平臺的rabbitmq使用封裝的文章就介紹到這了,更多相關.net使用rabbitmq內容請搜尋我們以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援我們!