C# 多執行緒六之Task(任務)三之任務工廠 C# 多執行緒六之Task(任務)二
1、知識回顧,簡要概述
前面兩篇關於Task的隨筆,C# 多執行緒五之Task(任務)一 和 C# 多執行緒六之Task(任務)二,介紹了關於Task的一些基本的用法,以及一些使用的要點,如果都看懂了,本文將介紹另一個Task的特殊用法,前面介紹了,如何通過一個父任務建立多個子任務,且這些子任務都必須要支援取消的例子,常規做法是,通過new 一個Task陣列物件,然後在該物件的內部建立多個Task任務,然後給這些任務指定TaskCreationOptions.AttachedToParent,這樣所有的子任務都關聯到了父任務,接著給這些子任務,繫結一個CancellationToken類例項,當其中一個子任務發生異常時,呼叫CancellationToken類例項的Cancel方法,將其餘的子任務全都取消,大致程式碼如下:
static void Main(string[] args) { var parentTask = new Task<int[]>(() => { var results = new int[3]; var cancelTokenSource = new CancellationTokenSource(); var childTasks = new Task[] {new Task(() => results[0] = ChildThreadOne(cancelTokenSource.Token),cancelTokenSource.Token, TaskCreationOptions.AttachedToParent), new Task(() => results[1] = ChildThreadTwo(cancelTokenSource.Token),cancelTokenSource.Token, TaskCreationOptions.AttachedToParent),new Task(() => results[2] = ChildThreadThree(cancelTokenSource.Token),cancelTokenSource.Token, TaskCreationOptions.AttachedToParent), }; //開啟所有的子任務 childTasks.ForEach(f => { f.Start(); }); //如果有子任務發生異常,那麼通過取消訊號量終止所有的任務 childTasks.ForEach(f => { f.ContinueWith(task=> cancelTokenSource.Cancel(), TaskContinuationOptions.OnlyOnFaulted); }); return results; }); parentTask.Start(); parentTask.ContinueWith(x => { Console.WriteLine("當父任務執行完畢時,CLR會喚起一個新執行緒,將父任務的返回值(子任務的返回值)輸出,所以這裡不會有任何的執行緒發生阻塞"); foreach (var re in parentTask.Result) { Console.WriteLine("子任務的返回值分別為:{0}", re); } }); Console.WriteLine("主執行緒不會阻塞,它會繼續執行"); Console.ReadKey();//必須加這行程式碼,因為Task時執行緒池執行緒,屬於後臺執行緒 } /// <summary> /// 子任務一 /// </summary> static int ChildThreadOne(CancellationToken token) { Thread.Sleep(3000);//模擬長時間計算操作 token.ThrowIfCancellationRequested(); Console.WriteLine("子任務一完成了計算任務,並返回值:{0}", 6); return 6; } /// <summary> /// 子任務二 /// </summary> static int ChildThreadTwo(CancellationToken token) { Thread.Sleep(2000);//模擬長時間計算操作 token.ThrowIfCancellationRequested(); throw new Exception("模擬丟擲異常"); } /// <summary> /// 子任務三 /// </summary> static int ChildThreadThree(CancellationToken token) { Thread.Sleep(3000);//模擬長時間計算操作 token.ThrowIfCancellationRequested(); Console.WriteLine("子任務三完成了計算任務,並返回值:{0}", 6); return 6; } } /// <summary> /// Linq擴充套件 /// </summary> public static class LinqExtension { public static void ForEach<T>(this IEnumerable<T> enumerators, Action<T> action) { foreach (var item in enumerators) { action(item); } } }
這裡需要注意,這裡給父任務parentTask開啟了三個子任務,並且通過TaskCreationOptions.AttachedToParent指定了所有的子任務不能獨立於父任務執行,並且給所有的子任務,傳遞了CancellationToken訊號量,當其中一個子任務發生異常時,所有其餘的子任務都終止,但是你必須知道的是,你沒有判斷哪個任務會被終止,因為如果不指定執行緒優先順序,哪怕制定了優先順序,你也無法確定的判斷某個計算任務在什麼時候會排程完,所以我給正常的執行的任務,Sleep了三秒,丟擲異常的任務Sleep了兩秒,所以所有的子執行緒都無法執行完畢.
2、程式碼重構
ok,雖然上面的程式碼很好的完成了我們在程式碼層面的需求,但是處於對程式碼的重用性考慮,有沒有發現這個問題:
這塊操作,可以重構的,因為所有的引數都一樣,當然你可以去抽象一個共有的方法,裡面放一個Func委託,當然把引數抽象出來,形成一個公共的方法,像下面這樣做:
class Program { private static CancellationTokenSource cancelTokenSource = new CancellationTokenSource(); private static TaskCreationOptions taskCreationOptions = TaskCreationOptions.AttachedToParent; static void Main(string[] args) { var parentTask = new Task<int[]>(() => { var results = new int[3]; var childTasks = new Task[] { ExecuteChildThread(task=> results[0]=ChildThreadOne(cancelTokenSource.Token)), ExecuteChildThread(task=> results[1]=ChildThreadTwo(cancelTokenSource.Token)), ExecuteChildThread(task=> results[2]=ChildThreadThree(cancelTokenSource.Token)) }; //開啟所有的子任務 childTasks.ForEach(f => { f.Start(); }); //如果有子任務發生異常,那麼通過取消訊號量終止所有的任務 childTasks.ForEach(f => { f.ContinueWith(task=> cancelTokenSource.Cancel(), TaskContinuationOptions.OnlyOnFaulted); }); return results; }); parentTask.Start(); parentTask.ContinueWith(x => { Console.WriteLine("當父任務執行完畢時,CLR會喚起一個新執行緒,將父任務的返回值(子任務的返回值)輸出,所以這裡不會有任何的執行緒發生阻塞"); foreach (var re in parentTask.Result) { Console.WriteLine("子任務的返回值分別為:{0}", re); } }); Console.WriteLine("主執行緒不會阻塞,它會繼續執行"); Console.ReadKey();//必須加這行程式碼,因為Task時執行緒池執行緒,屬於後臺執行緒 } /// <summary> /// 子任務一 /// </summary> static int ChildThreadOne(CancellationToken token) { Thread.Sleep(2000);//模擬長時間計算操作 token.ThrowIfCancellationRequested(); Console.WriteLine("子任務一完成了計算任務,並返回值:{0}", 6); return 6; } /// <summary> /// 子任務二 /// </summary> static int ChildThreadTwo(CancellationToken token) { Thread.Sleep(2000);//模擬長時間計算操作 token.ThrowIfCancellationRequested(); Console.WriteLine("子任務二完成了計算任務,並返回值:{0}", 6); return 6; } /// <summary> /// 子任務三 /// </summary> static int ChildThreadThree(CancellationToken token) { Thread.Sleep(2000);//模擬長時間計算操作 token.ThrowIfCancellationRequested(); Console.WriteLine("子任務三完成了計算任務,並返回值:{0}", 6); return 6; } /// <summary> /// 建立一個通用的子執行緒方法,裡面封裝了所有子執行緒的需要設定的公共引數 /// </summary> /// <param name="func"></param> /// <returns></returns> static Task<int> ExecuteChildThread(Func<CancellationToken, int> func) { var t=new Task<int>(()=>func.Invoke(cancelTokenSource.Token), cancelTokenSource.Token, taskCreationOptions); return t; } } /// <summary> /// Linq擴充套件 /// </summary> public static class LinqExtension { public static void ForEach<T>(this IEnumerable<T> enumerators, Action<T> action) { foreach (var item in enumerators) { action(item); } } }
ok,通過對子任務的抽象,你可以這麼幹,但是MS提供了更好的辦法,你又何必重複造輪子呢?而且這裡存在著潛在的多執行緒爭用問題,
所有的執行緒都用到了這兩個全域性變數,最好加個鎖,但是加了鎖之後,效能就會受到影響.
但是奇怪的是,我無法重現,如果你能重現那是最好的,下面就開始介紹Ms提供的任務工廠
3、任務工廠實戰
下面再次對上面的方法進行重構,用任務工廠的方式,首先使用TaskFactory任務工廠的前提你必須清楚,就是建立的子任務,必須是一組共享配置的子任務物件集,所以,如果當中如果某個子任務需要使用特殊的配置,那就不能使用任務工廠,也不是不能使用,就是那個子任務你必須獨立出來,不能放到任務工廠裡面.ok,瞭解了前提條件後,開始實踐,程式碼如下:
class Program { static void Main(string[] args) { var parentTask=Task.Run(()=> { var cts = new CancellationTokenSource(); //通過TaskFactory設定子任務的公共引數 var tf = new TaskFactory<int>(cts.Token,TaskCreationOptions.AttachedToParent,TaskContinuationOptions.ExecuteSynchronously,TaskScheduler.Default); //通過TaskFactory設定所有的子任務,這些子任務共享上面公共引數 var childTasks = new Task<int>[] { tf.StartNew(() => ChildThreadOne(cts.Token)), tf.StartNew(() => ChildThreadTwo(cts.Token)), tf.StartNew(() => ChildThreadThree(cts.Token)) }; //如果子任務發生異常,則向餘下沒有執行完畢的子任務傳遞取消執行的訊號,如果有子任務執行完畢了,那就沒有辦法了 childTasks.ForEach(f => { f.ContinueWith(childTask => cts.Cancel(), TaskContinuationOptions.OnlyOnFaulted); }); //遍歷所有通過TaskFactory建立的子任務,然後篩選出沒有被取消和沒有發生異常的子任務,或者這些任務中的最大返回值 //這個任務不阻塞執行緒,只有當所有的子任務執行完畢之後,CLR會喚起執行緒池中的一個新執行緒來執行這個操作 //通過給喚起子執行緒設定CancellationToken.None,來達到這個執行緒不會被任何因素來取消該執行緒的目的 var tfTask = tf.ContinueWhenAll(childTasks, completedTasks => completedTasks.Where(completedTask => !completedTask.IsCanceled && !completedTask.IsFaulted).Max(completedTask => completedTask.Result), CancellationToken.None ); //輸出所有符合要求的子任務集合的返回值集合中的最大值,並指定該任務,在tfTask任務的基礎上同步執行的效果通過TaskContinuationOptions.ExecuteSynchronously tfTask.ContinueWith(childTasksCompleteTask => { Console.WriteLine("The Max Return Value is {0}", childTasksCompleteTask.Result); },TaskContinuationOptions.ExecuteSynchronously); }); Console.WriteLine("主執行緒繼續做它的事情"); Console.ReadKey();//必須加這行程式碼,因為Task時執行緒池執行緒,屬於後臺執行緒 } /// <summary> /// 子任務一 /// </summary> static int ChildThreadOne(CancellationToken token) { var returnValue = 6; Thread.Sleep(3000);//模擬長時間計算操作 token.ThrowIfCancellationRequested(); Console.WriteLine("子任務一完成了計算任務,並返回值:{0}", returnValue); return returnValue; } /// <summary> /// 子任務二 /// </summary> static int ChildThreadTwo(CancellationToken token) { var returnValue = 66; Thread.Sleep(3000);//模擬長時間計算操作 token.ThrowIfCancellationRequested(); Console.WriteLine("子任務二完成了計算任務,並返回值:{0}", returnValue); return returnValue; } /// <summary> /// 子任務三 /// </summary> static int ChildThreadThree(CancellationToken token) { Thread.Sleep(2000);//模擬長時間計算操作 throw new Exception("模擬丟擲異常"); } } /// <summary> /// Linq擴充套件 /// </summary> public static class LinqExtension { public static void ForEach<T>(this IEnumerable<T> enumerators, Action<T> action) { foreach (var item in enumerators) { action(item); } } }
因為我給異常執行緒設定了2秒的休眠時間,正常子執行緒設定了3秒的休眠時間,所以所有的執行緒都沒有執行完畢,就被取消掉了.如果修改下正常執行緒的休眠時間為1秒,將會得到以下的輸出:
so,TaskFactory完美的完成了它的任務,且不會有任務執行緒發生阻塞的情況。