1. 程式人生 > >從無到有實現.net協程(一)

從無到有實現.net協程(一)

name 叠代器 out 之前 ring args ren 執行 images

協程的概念,就我而言,來源自當初學習Go,它可以用一句話來總結,“單線程無阻塞異步處理”,也就是說,首先,它的範圍是針對單個線程來說的,一個線程可以運行多個代碼片段,當運行期間遇到IO等待(包括網絡IO、磁盤IO等,常見的數據庫操作、web服務調用都屬於IO等待)時,自動切換到其他代碼片段上執行,當執行完畢或再次遇到IO等待時,再回到之前已經完成IO等待的代碼片段繼續執行,這樣,就保證了線程無阻塞,並且多個片段都能被逐步處理(前提是代碼中存在IO等待,否則還是順序處理),以上就是協程的定義,從描述可以看到,這種處理方式特別適合用於高IO、低CPU計算的程序,現實中,大部分的Web應用都是屬於這種模式,這就是為什麽現在Nodejs, Go這類語言越來越火的原因。

下面的圖描述了協程的運行

技術分享

協程的實際處理順序(實際取決與協程調度程序)

技術分享

在.net中,web處理(asp.net)的模式為多線程異步,其實也很好,也可以做到無阻塞,充分利用CPU資源,這裏不談這種模式,只談協程模式。

那麽.net如何實現協程呢?這裏首先必須介紹一個關鍵字,yield,.net協程就是利用它來實現的

Yield是用來處理叠代器(IEnumerator)的,利用叠代器的特性,間接提供了一種可以中斷恢復的處理方式,以代碼說話

 1     class Program
 2     {
 3         static void Main(string[] args)
 4
{ 5 var result=TestYield(); 6 7 result.MoveNext(); 8 Console.WriteLine(result.Current); 9 10 result.MoveNext(); 11 Console.WriteLine(result.Current); 12 13 result.MoveNext(); 14 Console.WriteLine(result.Current);
15 16 result.MoveNext(); 17 18 Console.Read(); 19 20 } 21 22 23 static IEnumerator<string> TestYield() 24 { 25 yield return "A"; 26 Console.WriteLine("執行完成A"); 27 yield return "B"; 28 Console.WriteLine("執行完成B"); 29 yield return "C"; 30 Console.WriteLine("執行完成C"); 31 } 32 }

執行結果為

技術分享

從上面的代碼可以看出,每當使用MoveNext方法,代碼都從yield return語句之後繼續執行,當遇到yield return或方法完成,再次返回調用方,這種叠代器模式(叠代器模式是本身就是23種設計模式之一),就提供了分段執行代碼的能力,我們通過這種模式,就能用來完成協程,這裏有一個特別需要註意的地方,就是,當我們調用TestYield方法時,你會發現它其實並沒有被執行,直到我們第一次調用MoveNext方法,該方法才真正開始被執行,還記得Linq嗎,Linq說了,只有調用了諸如ToList()、Count()之類的方法,才真正開始計算,和這個是不是很像?其實,在Linq內部,就是使用了叠代器。

好了,現在,已經具備了實現協程的基本元素了,接下來就可以開始構建了,由兩個部分組成,協程容器和協程單元,協程容器用來負責調度和執行,協程單元用來處理實際的業務,協程容器提供Register方法來註冊每個協程單元,繼續用代碼來說話

 1     /// <summary>
 2     /// 協程容器接口
 3     /// </summary>
 4     public interface ICoroutineContainer
 5     {
 6         /// <summary>
 7         /// 註冊協程單元
 8         /// </summary>
 9         /// <param name="unit">協程單元</param>
10         void Register(ICoroutineUnit unit);
11         /// <summary>
12         /// 執行
13         /// </summary>
14         void Run();
15 }
16     /// <summary>
17     /// 協程單元接口
18     /// </summary>
19     public interface ICoroutineUnit
20     {
21         /// <summary>
22         /// 處理業務
23         /// </summary>
24         /// <returns></returns>
25         IEnumerator<Task> Do();
26 }

這兩個接口為整個實現的核心接口,下面的協程容器的基本實現

/// <summary>
    /// 協程容器的基本實現
    /// </summary>
    public class CoroutineContainerBase : ICoroutineContainer
    {
        /// <summary>
        /// 存儲協程單元的列表
        /// </summary>
        private List<UnitItem> _units = new List<UnitItem>();
        /// <summary>
        /// 存儲新註冊的協程單元,與協程單元列表分開,實現註冊與執行互不影響
        /// </summary>
        private List<UnitItem> _addUnits = new List<UnitItem>();
        /// <summary>
        /// 錯誤處理
        /// </summary>
        private Action<ICoroutineUnit, Exception> _errorHandle;

        /// <summary>
        /// 構造函數
        /// </summary>
        /// <param name="errorHandle">錯誤處理</param>
        public CoroutineContainerBase(Action<ICoroutineUnit, Exception> errorHandle)
        {
            _errorHandle = errorHandle;
        }

        public void Register(ICoroutineUnit unit)
        {
            lock (_addUnits)
            {
                _addUnits.Add(new UnitItem() { Unit = unit, UnitResult = null });
            }

        }

        public void Run()
        {
  	    //開啟一個單獨任務執行
            Task.Run(() =>
            {
                //循環處理協程單元
                while (true)
                {
                    //將新註冊的協程單元加入到列表中
                    lock (_addUnits)
                    {
                        foreach (var addItem in _addUnits)
                        {
                            _units.Add(addItem);
                        }
                        _addUnits.Clear();
                    }

                    //依次處理協程單元
                    foreach (var item in _units)
                    {
                        if (item.UnitResult == null)
                        {
                            var result = item.Unit.Do();

                            //運行到下一個斷點
                            try
                            {
                                result.MoveNext();
                            }
                            catch (Exception ex)
                            {
                                _errorHandle(item.Unit, ex);

                                _units.Remove(item);

                                break;
                            }

                            item.UnitResult = result;
                        }
                        else
                        {
                            //檢查等待是否已經完成,如果已經完成則繼續運行
                            if (item.UnitResult.Current.IsCanceled || item.UnitResult.Current.IsCompleted || item.UnitResult.Current.IsFaulted)
                            {
                                var nextResult = true;
                                try
                                {
                                    nextResult = item.UnitResult.MoveNext();
                                }
                                catch (Exception ex)
                                {
                                    _errorHandle(item.Unit, ex);
                                    _units.Remove(item);

                                    break;
                                }
                                if (!nextResult)
                                {

                                    _units.Remove(item);

                                    break;
                                }
                            }
                        }
                    }
                    
                }
            });

        }


        /// <summary>
        /// 協程單元存儲格式
        /// </summary>
        private class UnitItem
        {
            /// <summary>
            /// 協程單元
            /// </summary>
            public ICoroutineUnit Unit { get; set; }
            /// <summary>
            /// 協程單元使用的叠代器
            /// </summary>
            public IEnumerator<Task> UnitResult { get; set; }
        }
    }

實現兩個協程單元

/// <summary>
    /// 協程單元1
    /// 執行一個網絡IO,訪問163站點
    /// </summary>
    public class Action1 : ICoroutineUnit
    {
        public IEnumerator<Task> Do()
        {
            Console.WriteLine("開始執行Action1");
            HttpClient client = new HttpClient();

            yield return innerDo();

            Console.WriteLine("結束執行Action1");
        }

        private Task innerDo()
        {
            HttpClient client = new HttpClient();
            return client.GetAsync("http://www.163.com");
        }
    }

    /// <summary>
    /// 協程單元2
    /// 執行一個網絡IO,訪問163站點
    /// </summary>
    public class Action2 : ICoroutineUnit
    {
        public IEnumerator<Task> Do()
        {
            Console.WriteLine("開始執行Action2");
            yield return innerDo();
            Console.WriteLine("結束執行Action2");
        }

        private Task innerDo()
        {
            HttpClient client = new HttpClient();
            return client.GetAsync("http://www.163.com");
        }
}

主程序調用執行

        static void Main(string[] args)
        {
            //錯誤處理僅僅是將錯誤顯示在控制臺裏
            Action<ICoroutineUnit,Exception> errorHandle = (unit, ex) =>
              {
                  Console.WriteLine(ex.ToString());
              };
            //初始化協程容器
            ICoroutineContainer coroutineContainerBase = new CoroutineContainerBase(errorHandle);
            //註冊Action1
            coroutineContainerBase.Register(new Action1());
            //註冊Action2
            coroutineContainerBase.Register(new Action2());
            //運行容器
            coroutineContainerBase.Run();

            Console.Read();

        }

執行結果

技術分享

註意,每次執行的順序可能不一樣,取決於網絡速度,但可以明顯看到代碼的分段執行。

至此,一個最基本的協程框架已經完成

從無到有實現.net協程(一)