1. 程式人生 > >C# 多執行緒六之Task(任務)三之任務工廠 C# 多執行緒六之Task(任務)二

C# 多執行緒六之Task(任務)三之任務工廠 C# 多執行緒六之Task(任務)二

1、知識回顧,簡要概述

前面兩篇關於Task的隨筆,C# 多執行緒五之Task(任務)一 和 C# 多執行緒六之Task(任務)二,介紹了關於Task的一些基本的用法,以及一些使用的要點,如果都看懂了,本文將介紹另一個Task的特殊用法,前面介紹了,如何通過一個父任務建立多個子任務,且這些子任務都必須要支援取消的例子,常規做法是,通過new 一個Task陣列物件,然後在該物件的內部建立多個Task任務,然後給這些任務指定TaskCreationOptions.AttachedToParent,這樣所有的子任務都關聯到了父任務,接著給這些子任務,繫結一個CancellationToken類例項,當其中一個子任務發生異常時,呼叫CancellationToken類例項的Cancel方法,將其餘的子任務全都取消,大致程式碼如下:

        static void Main(string[] args)
        {
            var parentTask = new Task<int[]>(() =>
            {
                var results = new int[3];
                var cancelTokenSource = new CancellationTokenSource();
                var childTasks = new Task[] {
                    
new Task(() => results[0] = ChildThreadOne(cancelTokenSource.Token),cancelTokenSource.Token, TaskCreationOptions.AttachedToParent), new Task(() => results[1] = ChildThreadTwo(cancelTokenSource.Token),cancelTokenSource.Token, TaskCreationOptions.AttachedToParent),
new Task(() => results[2] = ChildThreadThree(cancelTokenSource.Token),cancelTokenSource.Token, TaskCreationOptions.AttachedToParent), }; //開啟所有的子任務 childTasks.ForEach(f => { f.Start(); }); //如果有子任務發生異常,那麼通過取消訊號量終止所有的任務 childTasks.ForEach(f => { f.ContinueWith(task=> cancelTokenSource.Cancel(), TaskContinuationOptions.OnlyOnFaulted); }); return results; }); parentTask.Start(); parentTask.ContinueWith(x => { Console.WriteLine("當父任務執行完畢時,CLR會喚起一個新執行緒,將父任務的返回值(子任務的返回值)輸出,所以這裡不會有任何的執行緒發生阻塞"); foreach (var re in parentTask.Result) { Console.WriteLine("子任務的返回值分別為:{0}", re); } }); Console.WriteLine("主執行緒不會阻塞,它會繼續執行"); Console.ReadKey();//必須加這行程式碼,因為Task時執行緒池執行緒,屬於後臺執行緒 } /// <summary> /// 子任務一 /// </summary> static int ChildThreadOne(CancellationToken token) { Thread.Sleep(3000);//模擬長時間計算操作 token.ThrowIfCancellationRequested(); Console.WriteLine("子任務一完成了計算任務,並返回值:{0}", 6); return 6; } /// <summary> /// 子任務二 /// </summary> static int ChildThreadTwo(CancellationToken token) { Thread.Sleep(2000);//模擬長時間計算操作 token.ThrowIfCancellationRequested(); throw new Exception("模擬丟擲異常"); } /// <summary> /// 子任務三 /// </summary> static int ChildThreadThree(CancellationToken token) { Thread.Sleep(3000);//模擬長時間計算操作 token.ThrowIfCancellationRequested(); Console.WriteLine("子任務三完成了計算任務,並返回值:{0}", 6); return 6; } } /// <summary> /// Linq擴充套件 /// </summary> public static class LinqExtension { public static void ForEach<T>(this IEnumerable<T> enumerators, Action<T> action) { foreach (var item in enumerators) { action(item); } } }

這裡需要注意,這裡給父任務parentTask開啟了三個子任務,並且通過TaskCreationOptions.AttachedToParent指定了所有的子任務不能獨立於父任務執行,並且給所有的子任務,傳遞了CancellationToken訊號量,當其中一個子任務發生異常時,所有其餘的子任務都終止,但是你必須知道的是,你沒有判斷哪個任務會被終止,因為如果不指定執行緒優先順序,哪怕制定了優先順序,你也無法確定的判斷某個計算任務在什麼時候會排程完,所以我給正常的執行的任務,Sleep了三秒,丟擲異常的任務Sleep了兩秒,所以所有的子執行緒都無法執行完畢.

 

2、程式碼重構

 ok,雖然上面的程式碼很好的完成了我們在程式碼層面的需求,但是處於對程式碼的重用性考慮,有沒有發現這個問題:

這塊操作,可以重構的,因為所有的引數都一樣,當然你可以去抽象一個共有的方法,裡面放一個Func委託,當然把引數抽象出來,形成一個公共的方法,像下面這樣做:

    class Program
    {
        private static CancellationTokenSource cancelTokenSource = new CancellationTokenSource();
        private static TaskCreationOptions taskCreationOptions = TaskCreationOptions.AttachedToParent;
        static void Main(string[] args)
        {
            var parentTask = new Task<int[]>(() =>
            {
                var results = new int[3];
               
                var childTasks = new Task[] {
                    ExecuteChildThread(task=> results[0]=ChildThreadOne(cancelTokenSource.Token)),
                    ExecuteChildThread(task=> results[1]=ChildThreadTwo(cancelTokenSource.Token)),
                    ExecuteChildThread(task=> results[2]=ChildThreadThree(cancelTokenSource.Token))
                };
                //開啟所有的子任務
                childTasks.ForEach(f => { f.Start(); });

                //如果有子任務發生異常,那麼通過取消訊號量終止所有的任務
                childTasks.ForEach(f =>
                {
                    f.ContinueWith(task=> cancelTokenSource.Cancel(), TaskContinuationOptions.OnlyOnFaulted);
                });

                return results;
            });
            parentTask.Start();
            parentTask.ContinueWith(x =>
            {
                Console.WriteLine("當父任務執行完畢時,CLR會喚起一個新執行緒,將父任務的返回值(子任務的返回值)輸出,所以這裡不會有任何的執行緒發生阻塞");
                foreach (var re in parentTask.Result)
                {
                    Console.WriteLine("子任務的返回值分別為:{0}", re);
                }
            });
            Console.WriteLine("主執行緒不會阻塞,它會繼續執行");
            Console.ReadKey();//必須加這行程式碼,因為Task時執行緒池執行緒,屬於後臺執行緒
        }

        /// <summary>
        /// 子任務一
        /// </summary>
        static int ChildThreadOne(CancellationToken token)
        {
            Thread.Sleep(2000);//模擬長時間計算操作
            token.ThrowIfCancellationRequested();
            Console.WriteLine("子任務一完成了計算任務,並返回值:{0}", 6);
            return 6;
        }

        /// <summary>
        /// 子任務二
        /// </summary>
        static int ChildThreadTwo(CancellationToken token)
        {
            Thread.Sleep(2000);//模擬長時間計算操作
            token.ThrowIfCancellationRequested();
            Console.WriteLine("子任務二完成了計算任務,並返回值:{0}", 6);
            return 6;
        }

        /// <summary>
        /// 子任務三
        /// </summary>
        static int ChildThreadThree(CancellationToken token)
        {
            Thread.Sleep(2000);//模擬長時間計算操作
            token.ThrowIfCancellationRequested();
            Console.WriteLine("子任務三完成了計算任務,並返回值:{0}", 6);
            return 6;
        }

        /// <summary>
        /// 建立一個通用的子執行緒方法,裡面封裝了所有子執行緒的需要設定的公共引數
        /// </summary>
        /// <param name="func"></param>
        /// <returns></returns>
        static Task<int> ExecuteChildThread(Func<CancellationToken, int> func)
        {
             var t=new Task<int>(()=>func.Invoke(cancelTokenSource.Token), cancelTokenSource.Token, taskCreationOptions);
             return t;
        }
    }

    /// <summary>
    /// Linq擴充套件
    /// </summary>
    public static class LinqExtension
    {
        public static void ForEach<T>(this IEnumerable<T> enumerators, Action<T> action)
        {
            foreach (var item in enumerators)
            {
                action(item);
            }
        }
    }

ok,通過對子任務的抽象,你可以這麼幹,但是MS提供了更好的辦法,你又何必重複造輪子呢?而且這裡存在著潛在的多執行緒爭用問題,

所有的執行緒都用到了這兩個全域性變數,最好加個鎖,但是加了鎖之後,效能就會受到影響.

但是奇怪的是,我無法重現,如果你能重現那是最好的,下面就開始介紹Ms提供的任務工廠

 

3、任務工廠實戰

下面再次對上面的方法進行重構,用任務工廠的方式,首先使用TaskFactory任務工廠的前提你必須清楚,就是建立的子任務,必須是一組共享配置的子任務物件集,所以,如果當中如果某個子任務需要使用特殊的配置,那就不能使用任務工廠,也不是不能使用,就是那個子任務你必須獨立出來,不能放到任務工廠裡面.ok,瞭解了前提條件後,開始實踐,程式碼如下:

    class Program
    {
        static void Main(string[] args)
        {
            var parentTask=Task.Run(()=> {
                var cts = new CancellationTokenSource();
                //通過TaskFactory設定子任務的公共引數
                var tf = new TaskFactory<int>(cts.Token,TaskCreationOptions.AttachedToParent,TaskContinuationOptions.ExecuteSynchronously,TaskScheduler.Default);

                //通過TaskFactory設定所有的子任務,這些子任務共享上面公共引數
                var childTasks = new Task<int>[] {
                    tf.StartNew(() => ChildThreadOne(cts.Token)),
                    tf.StartNew(() => ChildThreadTwo(cts.Token)),
                    tf.StartNew(() => ChildThreadThree(cts.Token))
                };
                //如果子任務發生異常,則向餘下沒有執行完畢的子任務傳遞取消執行的訊號,如果有子任務執行完畢了,那就沒有辦法了
                childTasks.ForEach(f =>
                {
                    f.ContinueWith(childTask => cts.Cancel(), TaskContinuationOptions.OnlyOnFaulted);
                });

                //遍歷所有通過TaskFactory建立的子任務,然後篩選出沒有被取消和沒有發生異常的子任務,或者這些任務中的最大返回值
                //這個任務不阻塞執行緒,只有當所有的子任務執行完畢之後,CLR會喚起執行緒池中的一個新執行緒來執行這個操作
                //通過給喚起子執行緒設定CancellationToken.None,來達到這個執行緒不會被任何因素來取消該執行緒的目的
                var tfTask = tf.ContinueWhenAll(childTasks,
                completedTasks => completedTasks.Where(completedTask => !completedTask.IsCanceled && !completedTask.IsFaulted).Max(completedTask => completedTask.Result), CancellationToken.None
                );

                //輸出所有符合要求的子任務集合的返回值集合中的最大值,並指定該任務,在tfTask任務的基礎上同步執行的效果通過TaskContinuationOptions.ExecuteSynchronously
                tfTask.ContinueWith(childTasksCompleteTask =>
                {
                    Console.WriteLine("The Max Return Value is {0}", childTasksCompleteTask.Result);
                },TaskContinuationOptions.ExecuteSynchronously);
            });
            Console.WriteLine("主執行緒繼續做它的事情");
            Console.ReadKey();//必須加這行程式碼,因為Task時執行緒池執行緒,屬於後臺執行緒
        }

        /// <summary>
        /// 子任務一
        /// </summary>
        static int ChildThreadOne(CancellationToken token)
        {
            var returnValue = 6;
            Thread.Sleep(3000);//模擬長時間計算操作
            token.ThrowIfCancellationRequested();
            Console.WriteLine("子任務一完成了計算任務,並返回值:{0}", returnValue);
            return returnValue;
        }

        /// <summary>
        /// 子任務二
        /// </summary>
        static int ChildThreadTwo(CancellationToken token)
        {
            var returnValue = 66;
            Thread.Sleep(3000);//模擬長時間計算操作
            token.ThrowIfCancellationRequested();
            Console.WriteLine("子任務二完成了計算任務,並返回值:{0}", returnValue);
            return returnValue;
        }

        /// <summary>
        /// 子任務三
        /// </summary>
        static int ChildThreadThree(CancellationToken token)
        {
            Thread.Sleep(2000);//模擬長時間計算操作
            throw new Exception("模擬丟擲異常");
        }
    }

    /// <summary>
    /// Linq擴充套件
    /// </summary>
    public static class LinqExtension
    {
        public static void ForEach<T>(this IEnumerable<T> enumerators, Action<T> action)
        {
            foreach (var item in enumerators)
            {
                action(item);
            }
        }
    }

因為我給異常執行緒設定了2秒的休眠時間,正常子執行緒設定了3秒的休眠時間,所以所有的執行緒都沒有執行完畢,就被取消掉了.如果修改下正常執行緒的休眠時間為1秒,將會得到以下的輸出:

so,TaskFactory完美的完成了它的任務,且不會有任務執行緒發生阻塞的情況。