從無到有實現.net協程(一)
協程的概念,就我而言,來源自當初學習Go,它可以用一句話來總結,“單線程無阻塞異步處理”,也就是說,首先,它的範圍是針對單個線程來說的,一個線程可以運行多個代碼片段,當運行期間遇到IO等待(包括網絡IO、磁盤IO等,常見的數據庫操作、web服務調用都屬於IO等待)時,自動切換到其他代碼片段上執行,當執行完畢或再次遇到IO等待時,再回到之前已經完成IO等待的代碼片段繼續執行,這樣,就保證了線程無阻塞,並且多個片段都能被逐步處理(前提是代碼中存在IO等待,否則還是順序處理),以上就是協程的定義,從描述可以看到,這種處理方式特別適合用於高IO、低CPU計算的程序,現實中,大部分的Web應用都是屬於這種模式,這就是為什麽現在Nodejs, Go這類語言越來越火的原因。
下面的圖描述了協程的運行
協程的實際處理順序(實際取決與協程調度程序)
在.net中,web處理(asp.net)的模式為多線程異步,其實也很好,也可以做到無阻塞,充分利用CPU資源,這裏不談這種模式,只談協程模式。
那麽.net如何實現協程呢?這裏首先必須介紹一個關鍵字,yield,.net協程就是利用它來實現的
Yield是用來處理叠代器(IEnumerator)的,利用叠代器的特性,間接提供了一種可以中斷恢復的處理方式,以代碼說話
1 class Program 2 { 3 static void Main(string[] args) 4{ 5 var result=TestYield(); 6 7 result.MoveNext(); 8 Console.WriteLine(result.Current); 9 10 result.MoveNext(); 11 Console.WriteLine(result.Current); 12 13 result.MoveNext(); 14 Console.WriteLine(result.Current);15 16 result.MoveNext(); 17 18 Console.Read(); 19 20 } 21 22 23 static IEnumerator<string> TestYield() 24 { 25 yield return "A"; 26 Console.WriteLine("執行完成A"); 27 yield return "B"; 28 Console.WriteLine("執行完成B"); 29 yield return "C"; 30 Console.WriteLine("執行完成C"); 31 } 32 }
執行結果為
從上面的代碼可以看出,每當使用MoveNext方法,代碼都從yield return語句之後繼續執行,當遇到yield return或方法完成,再次返回調用方,這種叠代器模式(叠代器模式是本身就是23種設計模式之一),就提供了分段執行代碼的能力,我們通過這種模式,就能用來完成協程,這裏有一個特別需要註意的地方,就是,當我們調用TestYield方法時,你會發現它其實並沒有被執行,直到我們第一次調用MoveNext方法,該方法才真正開始被執行,還記得Linq嗎,Linq說了,只有調用了諸如ToList()、Count()之類的方法,才真正開始計算,和這個是不是很像?其實,在Linq內部,就是使用了叠代器。
好了,現在,已經具備了實現協程的基本元素了,接下來就可以開始構建了,由兩個部分組成,協程容器和協程單元,協程容器用來負責調度和執行,協程單元用來處理實際的業務,協程容器提供Register方法來註冊每個協程單元,繼續用代碼來說話
1 /// <summary> 2 /// 協程容器接口 3 /// </summary> 4 public interface ICoroutineContainer 5 { 6 /// <summary> 7 /// 註冊協程單元 8 /// </summary> 9 /// <param name="unit">協程單元</param> 10 void Register(ICoroutineUnit unit); 11 /// <summary> 12 /// 執行 13 /// </summary> 14 void Run(); 15 } 16 /// <summary> 17 /// 協程單元接口 18 /// </summary> 19 public interface ICoroutineUnit 20 { 21 /// <summary> 22 /// 處理業務 23 /// </summary> 24 /// <returns></returns> 25 IEnumerator<Task> Do(); 26 }
這兩個接口為整個實現的核心接口,下面的協程容器的基本實現
/// <summary> /// 協程容器的基本實現 /// </summary> public class CoroutineContainerBase : ICoroutineContainer { /// <summary> /// 存儲協程單元的列表 /// </summary> private List<UnitItem> _units = new List<UnitItem>(); /// <summary> /// 存儲新註冊的協程單元,與協程單元列表分開,實現註冊與執行互不影響 /// </summary> private List<UnitItem> _addUnits = new List<UnitItem>(); /// <summary> /// 錯誤處理 /// </summary> private Action<ICoroutineUnit, Exception> _errorHandle; /// <summary> /// 構造函數 /// </summary> /// <param name="errorHandle">錯誤處理</param> public CoroutineContainerBase(Action<ICoroutineUnit, Exception> errorHandle) { _errorHandle = errorHandle; } public void Register(ICoroutineUnit unit) { lock (_addUnits) { _addUnits.Add(new UnitItem() { Unit = unit, UnitResult = null }); } } public void Run() { //開啟一個單獨任務執行 Task.Run(() => { //循環處理協程單元 while (true) { //將新註冊的協程單元加入到列表中 lock (_addUnits) { foreach (var addItem in _addUnits) { _units.Add(addItem); } _addUnits.Clear(); } //依次處理協程單元 foreach (var item in _units) { if (item.UnitResult == null) { var result = item.Unit.Do(); //運行到下一個斷點 try { result.MoveNext(); } catch (Exception ex) { _errorHandle(item.Unit, ex); _units.Remove(item); break; } item.UnitResult = result; } else { //檢查等待是否已經完成,如果已經完成則繼續運行 if (item.UnitResult.Current.IsCanceled || item.UnitResult.Current.IsCompleted || item.UnitResult.Current.IsFaulted) { var nextResult = true; try { nextResult = item.UnitResult.MoveNext(); } catch (Exception ex) { _errorHandle(item.Unit, ex); _units.Remove(item); break; } if (!nextResult) { _units.Remove(item); break; } } } } } }); } /// <summary> /// 協程單元存儲格式 /// </summary> private class UnitItem { /// <summary> /// 協程單元 /// </summary> public ICoroutineUnit Unit { get; set; } /// <summary> /// 協程單元使用的叠代器 /// </summary> public IEnumerator<Task> UnitResult { get; set; } } }
實現兩個協程單元
/// <summary> /// 協程單元1 /// 執行一個網絡IO,訪問163站點 /// </summary> public class Action1 : ICoroutineUnit { public IEnumerator<Task> Do() { Console.WriteLine("開始執行Action1"); HttpClient client = new HttpClient(); yield return innerDo(); Console.WriteLine("結束執行Action1"); } private Task innerDo() { HttpClient client = new HttpClient(); return client.GetAsync("http://www.163.com"); } } /// <summary> /// 協程單元2 /// 執行一個網絡IO,訪問163站點 /// </summary> public class Action2 : ICoroutineUnit { public IEnumerator<Task> Do() { Console.WriteLine("開始執行Action2"); yield return innerDo(); Console.WriteLine("結束執行Action2"); } private Task innerDo() { HttpClient client = new HttpClient(); return client.GetAsync("http://www.163.com"); } }
主程序調用執行
static void Main(string[] args) { //錯誤處理僅僅是將錯誤顯示在控制臺裏 Action<ICoroutineUnit,Exception> errorHandle = (unit, ex) => { Console.WriteLine(ex.ToString()); }; //初始化協程容器 ICoroutineContainer coroutineContainerBase = new CoroutineContainerBase(errorHandle); //註冊Action1 coroutineContainerBase.Register(new Action1()); //註冊Action2 coroutineContainerBase.Register(new Action2()); //運行容器 coroutineContainerBase.Run(); Console.Read(); }
執行結果
註意,每次執行的順序可能不一樣,取決於網絡速度,但可以明顯看到代碼的分段執行。
至此,一個最基本的協程框架已經完成
從無到有實現.net協程(一)