事件訊息生產消費中介軟體-OSS.DataFlow
系統重構解耦的過程涉及不同領域服務分拆,或同一服務下實時響應部分和非響應部分分拆,分解後的各部分通過非同步訊息的流轉傳遞,完成整體的業務邏輯,但是頻繁的在業務層面直接呼叫不同訊息佇列的SDK,個人感覺不夠簡潔,最近開源一箇中間件OSS.Dataflow,希望能幫到看到的同學。
OSS.Dataflow主要實現非同步訊息傳遞的過程抽象,在業務層面提供訊息釋出訂閱的統一抽象介面,在業務邏輯分支之間,以簡單的呼叫完成訊息的傳遞,和具體的訊息儲存觸發實現無關。同時,在底層的儲存和觸發層面提取介面,能夠在系統的全域性適配具體的訊息基礎設施。(在這些介面之上,還實現了事件處理器,通過訊息的重複投放,實現事件執行的容錯補充機制,這個後邊文章再介紹,原始碼單元測試有示例。)
一. 訊息業務側使用
OSS.Dataflow 的程式碼可以通過Gitee和GitHub獲取,使用時可以通過Nuget直接安裝,也可以通過命令列:Install-Package OSS.DataFlow
元件的使用非常簡單,只需要關注:
- 訊息釋出者介面,由元件註冊時返回,供業務方法呼叫傳入訊息體。
- 訊息訂閱(消費)者介面實現或委託方法,在元件註冊時傳入。
具體示例:
- 訊息的釋出訂閱獨立呼叫示例
// 全域性初始化,注入訂閱者實現 const string msgPSKey = "Publisher-Subscriber-MsgKey"; DataFlowFactory.RegisterSubscriber<MsgData>(msgPSKey, async (data) => { // 當前通過注入消費的委託方法,也可通過介面實現 // DoSomething(data); return true; }); // 獲取釋出者介面 private static readonly IDataPublisher publisher = DataFlowFactory.CreatePublisher(); // 業務方法中釋出訊息await publisher.Publish(msgPSKey,new MsgData() {name = "test"});
2.訊息的流式呼叫示例
// 直接註冊消費實現並獲取訊息釋出介面 private static readonly IDataPublisher _delegateFlowpusher = DataFlowFactory.RegisterFlow<MsgData>("delegate_flow",async (data) => { // 當前通過注入消費的委託方法,也可通過介面實現 // DoSomething(data); return true; }); // 業務方法中釋出訊息 await _delegateFlowpusher.Publish("normal_flow",new MsgData() {name = "test"});
如上,只需要獲取釋出者,並注入消費實現,即可完成整個訊息的非同步消費處理,同一個訊息key可以註冊多個消費實現,當有訊息進入消費時,會併發處理。
二.訊息底層儲存適配擴充套件
前邊介紹了業務介面的使用,和具體訊息佇列或資料庫等隔離,這是對接業務層面的使用。因為業務場景不同,不同的專案對訊息的響應速度和處理機制又各有需求,所以 OSS.DataFlow 同樣提供了對接訊息產品的擴充套件介面,方便使用者適配已有訊息基礎設施。
1. 訊息儲存適配介面
對於事件訊息處理,需要關注兩件事情:接收儲存 和 消費觸發。在類庫中提供了 DataFlowManager 訊息流管理類,使用者可以通過實現IDataPublisherProvider介面,完成具體的儲存實現。
同時在不同的訊息產品觸發消費時(比如資料庫定時任務或者RabbitMQ消費), 呼叫通知方法(NotifySubscriber ),來觸發通過類庫註冊的具體的業務訂閱處理。
// 訊息流核心部件管理者 public static class DataFlowManager { /// <summary> /// 自定義 資料流釋出(儲存)實現的 提供者 /// </summary> public static IDataPublisherProvider PublisherProvider { get; set; } /// <summary> /// 通過自定義訊息觸發機制通知訂閱者 /// 呼叫時請做異常攔截,防止髒資料導致 msgData 型別錯誤 /// </summary> /// <param name="msgDataKey"></param> /// <param name="msgData">訊息內容,自定義觸發時,請注意和註冊訂閱者的消費資料型別轉換安全</param> /// <returns></returns> public static Task<bool> NotifySubscriber(string msgDataKey, object msgData) { .... } }
關於 IDataPublisherProvider
public interface IDataPublisherProvider { /// <summary> /// 資料釋出者 /// </summary> /// <param name="option"></param> /// <returns> 返回訊息釋出介面實現 </returns> IDataPublisher CreatePublisher(DataPublisherOption option); } /// <summary> /// 資料的釋出者 /// </summary> public interface IDataPublisher { /// <summary> /// 推進資料(儲存具體訊息佇列或者資料庫實現) /// </summary> /// <param name="dataKey"></param> /// <param name="data"></param> /// <returns>是否推入成功</returns> Task<bool> Publish<TData>(string dataKey,TData data); }
可以看到 IDataPublisher 介面負責具體的儲存實現,可以根據 DataPublisherOption 的 source_name 業務屬性實現對不同業務需求返回不同的具體實現。
2. 預設實現介紹
藉助.Net 自身的記憶體訊息佇列,在類庫中提供了預設的內部訊息儲存轉發實現(記憶體級別),使用者可以自行實現擴充套件相關介面並進行全域性配置。
內建的.Net Core訊息佇列, 設定了預設1個佇列,最大併發為32執行緒。 如果需要可以通過設定DataPublisherOption的source_name,類庫將會為每個source_name 建立獨立的記憶體佇列。
如果你已經看到這裡,並且感覺還行的話可以在下方點個贊,或者也可以關注我的公總號(見二維碼)