1. 程式人生 > >使用EventBus + Redis釋出訂閱模式提升業務執行效能(下)

使用EventBus + Redis釋出訂閱模式提升業務執行效能(下)

前言

上一篇部落格上已經實現了使用EventBus對具體事件行為的分發處理,某種程度上也算是基於事件驅動思想程式設計了。但是如上篇部落格結尾處一樣,我們原始碼的執行效率依然達不到心裡預期。在下單流程裡我們明顯可以將部分行為進行非同步處理,提升下單操作的執行效率。

Redis基礎命令

Redis有兩種方式可支援我們實現MQ功能,1、使用列表(List)相關命令特性;2、使用publish、subscribe命令特性;
這裡我是採取列表相關命令實現。

使用列表(List)相關命令的特性實現

  • 壓入資料(釋出訊息)
    使用列表(List)的 LPUSH RPUSH 命令可以從列表左邊和右邊壓入資料;

LPUSH
將一個或多個值插入到列表頭部(此處可以將列表想象成一個從左到右的連結串列資料結構,LPUSH就是將指定的值插入最左側!)

 如下命令,將多個元素壓入list1的頭部(最左側)

LPUSH list1 測試1 測試2

執行結果如下:

 

 

 

上面是寫入多個元素,我們也可以寫入單個元素

LPUSH list1 測試3

  

 

 

 

需要留意,每次執行完LPUSH後,Redis會返回當前列表的長度。

RPUSH
在指定列表的尾部(相當於一個連結串列的最右側)新增單個或多個元素

如下命令,還是在list1上新增多個元素,並檢視執行後的list1元素資訊

RPUSH list 測試4 測試5

  

 

 

 

同理,RPUSH也可直接寫入單個元素,和LPUSH一樣。

 

  • 拉取資料(消費資料)
    這裡的拉取資料不單單是讀取List內的元素,而是將元素從列表中取出來

BLPOP
移出並獲取列表的第一個元素(從左至右), 如果列表沒有元素會阻塞當前執行緒,直到等待超時或發現可彈出元素為止。

如下命令,從list1這個列表獲取從左至右第一個元素,在100秒內如果獲取則結束阻塞,否則阻塞到100秒之後。

BLPOP list1 100

  

執行結果如下:

 

 

需要留意的是BLPOP命令如果拉取到資料則會返回兩行資料,1行為列表的key名稱,1行為獲取到的元素值。如果直到阻塞結束都沒有獲取到元素值則直接返回命令執行超時。如下圖:

 

 

BRPOP
移出並獲取列表的最後一個元素(從左至右), 如果列表沒有元素會阻塞當前執行緒直到等待超時或發現可彈出元素為止。該命令與BLPOP除了獲取的元素位置不同,其他特性全部一致。

LPOP
移出並獲取列表的第一個元素(從左至右),如獲取到元素則返回元素資訊,沒有元素則立即返回null。

如下命令:

LPOP list1

  

 

 

RPOP
移出並獲取列表的最後一個元素(從左至右),如獲取到元素則返回元素資訊,沒有元素則立即返回null。該命令與LPOP除了獲取的元素位置不同其他特性全部一致;

RPOPLPUSH
移除列表的最後一個元素(最右側的元素),並將該元素新增到另一個列表並返回。該命令如獲取到元素則返回元素資訊,否則返回錯誤資訊。

可以通過RPOPLPUSH這個命令的特性對MQ內一致性要求較高的業務進行處理,在從列表獲取元素成功後將該元素新增到一個備份列表,在業務處理完畢後再從備份列表將該元素刪除。
執行下面命令測試下:

RPOPLPUSH list1 listback

  

 

 

BRPOPLPUSH
從列表中彈出一個值,將彈出的元素插入到另外一個列表中並返回它; 如果列表沒有元素會阻塞列表直到等待超時或發現可彈出元素為止。

該命令其實就是在BRPOP的基礎上將LPUSH的功能加上了,依舊也保留了指定超時時間內未獲取到元素則阻塞執行緒。
執行下面命令測試下:

BRPOPLPUSH list1 listback 10

  

執行結果如下:

 

 

完善程式碼

基於上面Redis的相關命令,我們再完善下上篇部落格的程式碼。這裡我們需要新增一個控制檯啟動項,將它作為消費服務,原來的控制檯即訂單儲存的控制檯作為訊息釋出的服務。
下單程式碼更改為下面的樣子:

        /// <summary>
        /// 非同步方式觸發訂單相關事件
        /// </summary>
        public static void AsynEventHandle()
        {
            Guid userId = Guid.NewGuid();
            Stopwatch stopwatch = new Stopwatch();
            stopwatch.Start();

            var order = new OrderModel()
            {
                CreateTime = DateTime.Now,
                Id = Guid.NewGuid(),
                Money = (decimal)300.00,
                Number = 1,
                ProductName = "鮮花一束",
                UserId = userId
            };
            Console.WriteLine($"模擬儲存訂單【採取Redis做訊息佇列的非同步方式】");
            Thread.Sleep(1000);

            FullRedis fullRedis = new FullRedis("127.0.0.1:6379", "", 1);
            //這裡嘗試過使用redis 的訂閱釋出模式,在執行釋出命令時候發現值但凡出現空格或者"符號則會異常...         
            fullRedis.LPUSH("orders", new OrderModel[] { order });

            stopwatch.Stop();
            Console.WriteLine($"下單總耗時:{stopwatch.ElapsedMilliseconds}毫秒");
            Console.ReadLine();
        }

  

可以看到,我們已經將事件匯流排相關程式碼給移除了,上面程式碼除了向Redis的佇列(List)裡寫入元素外就只是對訂單進行了持久化動作,所以看程式碼就知道執行效率的提升了。
接下來,看消費服務的程式碼。

        static void Main(string[] args)
        {
            XTrace.UseConsole();
            Console.WriteLine("進入Redis訊息訂閱者模式訂單訊息推送訂閱者客戶端!");

            EventBus eventBus = new EventBus();
            eventBus.EventRegister(typeof(OrderCreateEventNotifyHandle), typeof(OrderCreateEventData));
            eventBus.EventRegister(typeof(OrderCreateEventStockLockHandle), typeof(OrderCreateEventData));

            FullRedis fullRedis = new FullRedis("127.0.0.1:6379", "", 1);
            fullRedis.Log = XTrace.Log;
            fullRedis.Timeout = 30000;
            OrderModel order = null;
            while (order == null)
            { 
                order = fullRedis.BLPOP<OrderModel>("orders", 20);
                if (order != null)
                {
                    Console.WriteLine($"得到訂單資訊:{JsonConvert.SerializeObject(order)}");
                    //執行相關事件
                    eventBus.Trigger(new OrderCreateEventData()
                    {
                        Order = order,
                    });
                    //再次設定為null方便迴圈讀取
                    order = null;
                }
                  
            }
            Console.ReadLine();
        }

  

消費服務首先從Redis裡通過BLPOP從orders列表中獲取元素,再觸發事件匯流排,執行訂單儲存相關業務處理。

最終看下執行效率如何?
訊息釋出的執行效率(訂單儲存)

 

訊息消費

 

可以看到目前訊息釋出的執行效率下單總耗時間為1170毫秒,我們再改為同步的測試下結果:

 

可以看到,同步執行的結果是3035毫秒。

小結

兩種方式相差了將近2000毫秒~ 而且後續如果再繼續擴充套件訂單儲存相關處理的話同步執行的響應時間會更加拉長,而採取Redis MQ的方式配合事件匯流排我們可以將整個業務拆分為獨立的應用,採取分散式的方式提高響應效率,同時事件匯流排的加入方便我們後續業務的擴充套件。

訊息釋出端將訂單資訊寫入到列表後如果訊息消費者在拉取到資料後業務執行過程中程式碼出現異常導致無法滿足業務的完整性如何處理

答:可以使用上述Redis命令中的RPOPLPUSH或BRPOPLPUSH在拉取元素後寫入到一個備份的列表中,在我們的邏輯程式碼執行完畢後在將備份列表中的該元素值移除。

上述程式碼已釋出到Github,有需要的自行下載。

地址為:https://github.com/QQ897878763/OrderRedisSample