1. 程式人生 > >事件驅動框架EventNext之執行緒容器

事件驅動框架EventNext之執行緒容器

EventNext.net core下的一個事件驅動的應用框架,通過它代理建立的介面行為都是通過事件驅動的模式進行呼叫.由於EventNext的所有呼叫都是基於事件佇列來進行,所以在資源控制上非常方便;它可以進行多樣性的執行緒分配,其中Actor應用就是它的一種基礎實現;在新的版中EventNext增加了一個新的特性就是執行緒容器,通過執行緒容器可以讓N個類的行為在指定執行緒資源下執行。接來詳細分析這個功能的應用。

執行緒容器的便利性

平時在多執行緒應用的時候一般都用執行緒池或自己定義執行緒處理,但這些都需要手動去處理和呼叫。實際有些情況是希望不同例項的所有方法執行在指定執行緒資源下,但又不想入侵式修改程式碼,這樣處理起來就比較複雜了。但EventNext

新版本的執行緒容器就能解決這種事情,它可以建立一個執行緒容器並指定那些物件的所有方法都在這個容器裡執行,而容器的執行緒則可以根據實際情況來指定,這種完全透明的執行緒控制使用起來就非常方便(不過這版本的Actor暫不支援線上程容器中建立)。

介面定義準則

EventNext的所有任務都在由佇列接管,但它實現的佇列和平常的佇列有所不同;EventNext的事件驅動隊要求所有任務都用非同步描述,它的特點是由上一個任務的完成來驅動下一下任務,不像同步處理佇列在處理非同步的時候不得不做執行緒等待。為了達到非同步任務的要求介面的所有方法必須定義Task作為返回值(暫不支援屬性處理)。

建立執行緒容器

元件提供一個EventNext.GetThreadContainer方法來獲取一個執行緒容器,方法有兩個引數,一個是容器的名稱和對應容器的執行緒數量;當容器被建立後內部的執行緒數量不變,具體程式碼如下:

 var tc = EventCenter.GetThreadContainer("t1");

以上是獲取一個名稱為t1的執行緒容器,在不指定執行緒數的情況下為一個執行緒處理。接下來就可以通過容器建立相應的介面例項:

 var account = tc.Create<IAccount>();

以上是建立一個IAccount的例項,這個例項不管在多少個執行緒下呼叫最終都由t1

這個執行緒容器去執行處理;每個容器可以建立多個接例項。

IAcouunt的實現

    [Service(typeof(IAccount))]
    public class AccountImpl : IAccount
    {

        static AccountImpl()
        {
            Redis.Default.Host.AddWriteHost("192.168.2.19");
        }

        public async Task Income(string name, int value)
        {
            await Redis.Default.Incrby(name, value);
        }

        public async Task<string> Value(string name)
        {
            return await Redis.Default.Get<string>(name);
        }
    }

以上是針對一個Redis操作的實現,主要存在IO操作更容易測試到容器在不同執行緒下的差異。

測試程式碼

為了測試容器的效果,使用了不同的方式進行測試,分別是:不用執行緒容器和使用不同執行緒數的容器,測試程式碼如下:

  • 非執行緒容器

        static async Task None(int count)
        {
            var account = EventCenter.Create<IAccount>();
            List<Task> tasks = new List<Task>();
            var now = BeetleX.TimeWatch.GetElapsedMilliseconds();
            for(int i=0;i<20;i++)
            {
                var t = Task.Run( async ()=> {
                    for (int k = 0; k < count; k++)
                        await account.Income("ken",k);
                });
                tasks.Add(t);
            }
            await Task.WhenAll(tasks);
            var value = await account.Value("ken");
            Console.WriteLine($"none use time:{BeetleX.TimeWatch.GetElapsedMilliseconds()-now} ms");
        }
  • 執行緒容器

        static async Task Threads(int threads, int count)
        {
            var tc = EventCenter.GetThreadContainer($"t{threads}",threads);
            var account = tc.Create<IAccount>();
            List<Task> tasks = new List<Task>();
            var now = BeetleX.TimeWatch.GetElapsedMilliseconds();
            for (int i = 0; i < 20; i++)
            {
                var t = Task.Run(async () => {
                    for (int k = 0; k < count; k++)
                        await account.Income("ken", k);
                });
                tasks.Add(t);
            }
            await Task.WhenAll(tasks);
            var value = await account.Value("ken");
            Console.WriteLine($"{threads} thread use time:{BeetleX.TimeWatch.GetElapsedMilliseconds() - now} ms");
        }

執行程式碼

        static async void Test()
        {
            Console.WriteLine("Warm-Up...");
            await None(10);
            await Threads(1, 10);
            await Threads(2, 10);
            await Threads(4, 10);
            for (int i = 0; i < 4; i++)
            {
                Console.WriteLine("Test...");
                await None(2000);
                await Threads(1, 2000);
                await Threads(2, 2000);
                await Threads(4, 2000);
            }
        }

測試結果

Test...
none use time:1231 ms
1 thread use time:3479 ms
2 thread use time:2025 ms
4 thread use time:1208 ms
Test...
none use time:1246 ms
1 thread use time:3358 ms
2 thread use time:2025 ms
4 thread use time:1179 ms
Test...
none use time:1344 ms
1 thread use time:3385 ms
2 thread use time:1954 ms
4 thread use time:1187 ms
Test...
none use time:1210 ms
1 thread use time:3338 ms
2 thread use time:1933 ms
4 thread use time:1181 ms

預設情況下元件會針對請呼叫進行一個執行緒分配,這個分配機制依據佇列的負載情況進行分配呼叫;執行緒容器則會根據當前容器的執行緒數來進行一個平均分配處理,從測試結果可以看到不同執行緒數下完成所需要的時間。

示例程式碼

https://github.com/IKende/BeetleX-Sam