1. 程式人生 > >Redis學習筆記~分散式的Pub/Sub模式

Redis學習筆記~分散式的Pub/Sub模式

回到目錄

redis的客戶端有很多,這次用它的pub/sub釋出與訂閱我選擇了StackExchange.Redis,釋出與訂閱大家應該很清楚了,首先一個訂閱者,訂閱一個服務,服務執行一些處理程式(可能是寫個日誌,插入個數據,發個email)然後當另一個專案的某個業務釋出這個服務後,被訂閱的程式將會被執行,這個聽起來很有意思,redis有好的實現了這個pub/sub功能。

看一下結構圖

Sub訂閱(訊息消費者)

對於訂閱方,這裡類似於一個服務,它會長期執行著,被啟動後動態訂閱一些服務進來,以便以後再被其它服務呼叫

        //sub a function in A-project
PubSubManager.Instance.Subscribe("UserLog", (msg) => { //訂閱者處理自己的業務邏輯,這相關於佇列服務要乾的事 Console.WriteLine(msg); });

Pub釋出(訊息生產者)

而對於其它專案,如A網站,它可能在被在POST動作後釋出這個UserLog的服務,這時上面的訂閱方將會消費它(消費者模式),或者服務程式碼被執行!

     [HttpPost]
        
public ActionResult Index(string user) { PubSubManager.Instance.Publish("UserLog", user + "這個使用者提交表單了"); return View(); }

對於一直執行的服務,將會收到來自不同專案的訊息,而它只負責消費它!

Lind.DDD.PublishSubscribe

封裝了一些標準的pub/sub方法,它可以有多種實現方法,本例使用redis這個中介軟體

   /// <summary>
/// 釋出訂閱的介面規則 /// </summary> public interface IPubSub { /// <summary> /// 釋出,有順序,物件源是字串 /// </summary> /// <param name="channel"></param> /// <param name="value"></param> void Publish(string channel, string value); /// <summary> /// 訂閱,物件源是字串 /// </summary> /// <param name="channel"></param> /// <param name="action"></param> void Subscribe(string channel, Action<string> action); /// <summary> /// 非同步釋出,無順序,物件源是字串 /// </summary> /// <param name="channel"></param> /// <param name="value"></param> void PublishAsync(string channel, string value); /// <summary> /// 非同步訂閱,無順序,物件源是字串 /// </summary> /// <param name="channel"></param> /// <param name="action"></param> void SubscribeAsync(string channel, Action<string> action); /// <summary> /// 釋出,有順序,物件源是Byte[] /// </summary> /// <param name="channel"></param> /// <param name="value"></param> void PublishByte(string channel, byte[] value); /// <summary> /// 訂閱,物件源是Byte[] /// </summary> /// <param name="channel"></param> /// <param name="action"></param> void SubscribeByte(string channel, Action<byte[]> action); /// <summary> /// 非同步釋出,有順序,物件源是Byte[] /// </summary> /// <param name="channel"></param> /// <param name="value"></param> void PublishByteAsync(string channel, byte[] value); /// <summary> /// 非同步訂閱,物件源是Byte[] /// </summary> /// <param name="channel"></param> /// <param name="action"></param> void SubscribeByteAsync(string channel, Action<byte[]> action); /// <summary> /// 釋出,有順序,物件源是泛型物件 /// </summary> /// <typeparam name="T"></typeparam> /// <param name="channel"></param> /// <param name="value"></param> void Publish<T>(string channel, T value); /// <summary> /// 訂閱,物件源是泛型物件 /// </summary> /// <typeparam name="T"></typeparam> /// <param name="channel"></param> /// <param name="action"></param> void Subscribe<T>(string channel, Action<T> action); /// <summary> /// 非同步釋出,有順序,物件源是泛型物件 /// </summary> /// <typeparam name="T"></typeparam> /// <param name="channel"></param> /// <param name="value"></param> void PublishAsync<T>(string channel, T value); /// <summary> /// 非同步訂閱,物件源是泛型物件 /// </summary> /// <typeparam name="T"></typeparam> /// <param name="channel"></param> /// <param name="action"></param> void SubscribeAsync<T>(string channel, Action<T> action); /// <summary> /// 取消指定訂閱 /// </summary> /// <param name="channel"></param> void UnSubscribe(string channel); /// <summary> /// 取消所有訂閱 /// </summary> /// <param name="channel"></param> void UnSubscribeAll(); }

回到目錄