1. 程式人生 > >.NET非同步程式設計之任務並行庫

.NET非同步程式設計之任務並行庫

目錄

  • 1.簡介
  • 2.Parallel類
    • 2.0 Parallel類簡介
    • 2.1 Parallel.For()
    • 2.2 Parallel.ForEach()
    • 2.3 Parallel.Invoke()
    • 2.4 補充:執行緒安全集合
  • 3.Task類
    • 3.0 Task類簡介
    • 3.1 建立無返回值的Task任務
    • 3.2 建立有返回值的Task任務
    • 3.3 為Task新增延續任務
    • 3.4 Task.Delay
    • 3.5 Task物件的其他一些靜態方法
    • 3.6 取消非同步操作
  • 4.並行Linq(PLinq)
    • 4.1 AsParallel()
    • 4.2 取消並行查詢
  • 5.參考&原始碼下載
shanzm-2020年2月16日 00:45:04

1.簡介

System.Threading.Tasks中的型別被稱為任務並行庫(Task Parallel Library,TPL)。

System.Thread.Tasks

名稱空間是.NET Framework4.0所提供,

“TPL使用CLR執行緒池自動將應用程式的工作動態分配到可用的CPU中。TPL還處理工作分割槽、執行緒排程、狀態管理和其他低級別的細節操作。最終結果是,你可以最大限度地提升.NET應用程式的效能,並且避免直接操作執行緒所帶來的複雜性” --《精通C#》




2.Parallel類


2.0 Parallel類簡介

System.Threading.Tasks名稱空間下有一個靜態類:Parallel類

Parallel可以實現對實現了IEnumerable介面的資料集合的每一個元素並行操作

有一點要說明的:並行操作會帶來一定的成本,如果任務本身能很快完成,或是迴圈次數很少,那麼並行處理的速度也許會比非並行處理還慢。

Parallel類就只有有三個方法:Parallel.For()Parallel.ForEach()Parallel.Invoke()

但是呢,這每個方法都有大量的過載(F12-->自行檢視Parallel定義)


2.1 Parallel.For()

使用Parallel.For()可以對陣列中的每一個元素進行並行操作

正常的遍歷陣列是按照索引的順序執行的,但是並行操作,對陣列的每一個元素的操作不一定按照索引順序操作

Parallel.For(),第一個引數是迴圈開始的索引(包含),第二個引數是迴圈結束的索引(不含)

Parallel.For()的第三個引數是一個有引數無返回值的委託,其引數是陣列的索引

其實就相當於:for (int i = 0; i < length; i++)的非同步版本,只是在這裡是並行操作,所以並不按照陣列中元素的順序執行,具體的執行順序是不可控的。

示例

static void Main(string[] args)
{
    int[] intArray = new int[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
    Console.WriteLine("------------常規,對陣列進行迴圈遍歷------------");
    Array.ForEach(intArray, n => Console.WriteLine($"當前操作的陣列元素是{n}"));//注意這裡的引數n是元素而不是索引
    
    Console.WriteLine("------------並行操作 對陣列進行迴圈遍歷------------");
    
    Parallel.For(0, intArray.Length, (i) => Console.WriteLine($"當前迴圈次數{i},當前操作的陣列元素是{intArray[i]}"));
    Console.ReadKey();
}

執行結果:可以看出,對陣列的元素的操作順序並不是按照索引的順序,而是不確定的。


2.2 Parallel.ForEach()

Parallel.ForEach()用於對泛型可列舉物件的元素進行並行操作

其實就相當於:foreach (var item in collection)的非同步版本

Parallel.ForEach()有大量的過載,這裡展示一個簡單的操作

Parallel.ForEach()的第一個引數是待操作的可列舉物件,第二個引數是一個有引數無返回值的委託,該委託引數是集合的元素(而不是索引)

示例

List<int> intList = new List<int>() { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
Parallel.ForEach(intList, n => Console.WriteLine(n+100));
Console.ReadKey();


2.3 Parallel.Invoke()

Parallel.Invoke()對指定一系列操作並行運算

引數是一個Action委託陣列(注意只能是Action[],即只能是無返回值的委託陣列)

Parallel.Invoke()最常見用於併發請求介面

示例:

static void Main(string[] args)
{
     Action action1=() =>
     {
         for (int i = 0; i < 5; i++)
         {
             Console.WriteLine($"action-1-操作");
         }
     };

     Action action2 = () =>
     {
         for (int i = 0; i < 5; i++)
         {
             Console.WriteLine($"action-2-操作");
         }
     };
     //Parallel.Invoke(action1, action2);
     Action[] actions = { action1, action2 };
     Parallel.Invoke(actions);
     Console.ReadKey();
}

執行結果:


2.4 補充:執行緒安全集合

詳細可以參考微軟的線上文件

多執行緒對同一個資料集合同時讀寫操作,可能會造成資料的混亂

.NET4 引入了System.Collections.Concurrent 名稱空間,其中包含多個執行緒安全的資料集合型別。

現在的新專案中,只要是對資料集合進行多執行緒的增刪操作,就應該使用併發集合類。

但是,如果僅從集合進行多執行緒的讀取,則可使用一般的資料集合,即 System.Collections.Generic 名稱空間中的類。

.net 中執行緒安全的資料集合有一下一些:

型別 描述
BlockingCollection 為實現 IProducerConsumerCollection 的所有型別提供限制和阻止功能。 有關詳細資訊,請參閱 BlockingCollection 概述。
ConcurrentDictionary 鍵值對字典的執行緒安全實現。
ConcurrentQueue FIFO(先進先出)佇列的執行緒安全實現。
ConcurrentStack LIFO(後進先出)堆疊的執行緒安全實現。
ConcurrentBag 無序元素集合的執行緒安全實現。
IProducerConsumerCollection 型別必須實現以在 BlockingCollection 中使用的介面。

一個簡單的示例:給一個數據集合新增大批量的資料

List<int> list = new List<int>();
Parallel.For(0, 1000000, t => list.Add(t));

若是按照上面使用Parallel.For()的並行方式給List新增資料,

則會報錯:“索引超出了陣列界限。”或“ 源陣列長度不足。請檢查 srcIndex 和長度以及陣列的下限。”

即使沒有報錯,list中的資料也是有問題的(比可能數量不足)

當然可以通過加鎖的方式進行彌補:

List<int> list = new List<int>();
object locker = new object();
Parallel.For(0, 1000000, t => { lock(locker) { list.Add(t); } });

這樣通過對操作的執行緒枷鎖,完全是沒有必要的,你可以使用執行緒安全的集合型別,比如在這裡使用ConcurrentBag

ConcurrentBag<int> cBag = new ConcurrentBag<int>();
Parallel.For(0, 100000, t => cBag.Add(t));

當然因為是並行操作,所以插入集合中的資料並不是按照0-100000的順序(僅僅是成段的有序)。




3.Task類


3.0 Task類簡介

System.Threading.Tasks名稱空間中Task類,表示非同步操作。

Task類可以輕鬆地在次執行緒中呼叫方法,可以作為非同步委託的簡單替代品。

同時在該名稱空間還有一個泛型Task<TResul>類,TResult 表示非同步操作執行完成後返回值的型別。

建立一個Task操作,只需要使用靜態函式Task.Run()即可,

Task.Run()是一個.net framework4.5及以上定義的一個預設非同步操作,

Task.Run()引數是委託,即需要非同步執行的方法,

注意作為Task.Run()的引數的委託都是無參委託,

若Task.Run()引數是無返回值的委託Action,則Task.Run()返回值是Task型別

若Task.Run()引數是有返回值的委託Func<TResult>,則Task.Run()返回值是Task<TResult>泛型

注意:若是低於.net4.5,則可以使用Task.Factory.StartNew(),和Task.Run()靜態方法作用一樣

總而言之,言而總之,show you code ,一切皆明瞭!


3.1 建立無返回值的Task任務

示例:無返回值的Task

static void Main(string[] args)
{
    //1.使用Task建構函式建立,必須顯式的使用.Start()才能開始執行
    //Task task = new Task(() => { Thread.Sleep(10); Console.WriteLine("我是Task ,我結束了"); });
    //task.Start();

    //2.使用TaskFactory.StartNew(工廠建立) 方法
    //Task task = Task.Factory.StartNew(() => { Thread.Sleep(10); Console.WriteLine("我是Task ,我結束了"); });

    //3.使用Task.Run()
    Task task = Task.Run(() => { Thread.Sleep(10); Console.WriteLine("我是Task.Run ,我結束了"); });
    if (!task.IsCompleted)//task.IsCompleted判斷當前的任務是否已完成
    {
        Console.WriteLine("當前的Task.Run()尚未執行完,但是因為非同步,返回到呼叫函式,所以可以先執行後續的程式碼");
    }

    Console.WriteLine("當前Task.Run還沒有完成,我們是在他之後的程式碼但是先執行了");

    task.Wait();//強行鎖定執行緒,等待task完成
    Console.WriteLine("終於Task.Run完成了工作");
    Console.ReadKey();
}


3.2 建立有返回值的Task任務

若是Task任務有返回值,返回值型別為Task<T>,使用返回值的Result屬性查詢具體值

除錯時注意檢視,執行到 Console.WriteLine(task.Result)的時候,其中Task任務還是在執行Thread.Sleep(1000)

還沒有出結果,我們希望的非同步執行也沒有發生,而是程式是在一直在等待,這是為什麼呢?

是因為一但執行了task.Result,即使task任務還沒有完成,主執行緒則停下等待,直到等待task.Result出結果

這種情況和非同步委託中呼叫EndInvoke()是一樣的:一旦執行EndInvoke,若是引用方法還沒有完成,主執行緒則停止,直到引用函式執行結束。

所以可以這樣理解:task.Result可以看作是一個未來結果(一定有結果但還在運算中)

示例:有返回值的Task

static void Main(string[] args)
{
    Console.WriteLine("SomeDoBeforeTask");
    Func<int> Do = () => { Thread.Sleep(1000); Console.WriteLine("Task.Run結束"); return 2; };
    Task<int> task = Task.Run(Do);
    Console.WriteLine(task.Status);//使用task.Status檢視當前的Task的狀態:當前的狀態:WaitingToRun
    Console.WriteLine(task.Result);//使用task.result操作Task任務的返回值:返回值是:2
    Console.WriteLine(task.Status);//使用task.Status檢視當前的Task的狀態:當前的狀態:RanToComplation
    Console.WriteLine("SomeDoAfterTask");
    Console.ReadKey();
}

執行結果:

說明:
其中我們使用task.Result檢視當前的task的狀態,其中Task的狀態(即其生命週期)只有三種:

  • Created(建立Task):注意只有Task task=new Task(...),此時的Task狀態為Created,其他方式建立的Task跳過了Created狀態
  • WaitingToRun(等待執行Task)
  • RanToComplation(Task完成)


3.3 為Task新增延續任務

Task任務是在後臺執行的同時,主執行緒的繼續執行後續程式

所以有時候需要在Task結束後,繼續執行某個特定的任務,即為Task新增延續任務(也稱接續工作)

舉一個簡單的例子,

求解1-5000能求被3整除的個數,這個過程需要許多時間,我把它定義為一個Task.Run()

我們需要在求出結果後打印出結果,這裡怎麼操作呢?

若是直接使用task.Result則會阻塞主執行緒,一直等待運算出結果,這顯然不是我們想要的

若是使用while(!task.IsComplation){//後續操作},你無法判斷Task何時結束,而且一旦Task結束則會中斷後續操作

這裡就是需要為Task加上接續工作

這裡你可以明白,接續本質和非同步委託中的回撥模式是一樣的,回撥方法就是接續工作


3.3.1使用task.ContinueWith()

task1.ContinueWith(...task2..)表示當task1結束後接著執行task2任務

注意這裡我們使用Lambda表示式編寫接續工作,接續工作是有一個引數的,引數是Task型別,即上一個Task

即第一個Task完成後自動啟動下一個Task,實現Task的延續

注意:ContinueWith()的返回值亦是Task型別物件,即新建立的任務

可以為接續工作task2繼續新增接續工作task3

示例5 :

  static void Main(string[] args)
  {
      Console.WriteLine("task執行前...");
      Task<int> task1 = Task.Run(() => Enumerable.Range(1, 5000).Count(n => (n % 3) == 0));
      Task task2 = task1.ContinueWith(t => Console.WriteLine($"當你看到這句話則task1結束了,1-5000中能被3整除的個數{t.Result}"));//這裡的t就是task1
      Task task3 = task2.ContinueWith(t => Console.WriteLine($"當你看到這句話則task2也結束了"));
      Console.WriteLine($"task1及其接續工作正在執行中," + "\t\n" + "我們現在正在執行其他的後續程式碼");
      Console.ReadKey();
  }

執行結果:


3.3.2使用Awaiter

使用task.GetAwaiter()為相關的task建立一個等待者

示例:

    static void Main(string[] args)
    {
        Console.WriteLine("task執行前...");
        Task<int> task1 = Task.Run(() => Enumerable.Range(1, 5000).Count(n => (n % 3) == 0));
        var awaiter = task1.GetAwaiter();//建立一個awaiter物件
        //awaiter.OnCompleted(() => Console.WriteLine($"當你看到這句話則task1結束了,1-5000中能被3整除的個{task1.Result}"));
        awaiter.OnCompleted(() => Console.WriteLine($"當你看到這句話則task1結束了,1-5000中能被3整除的個{awaiter.GetResult()}"));
        Console.WriteLine($"task1及其接續工作正在執行中," + "\t\n" + "我們現在正在執行其他的後續程式碼");
        Console.ReadKey();
    }

執行效果同上。

3.3.3使用ContinueWith和Awaiter的區別:

ContinueWith會返回Task物件,它非常適合用於增加更多的接續工作,不過,如果Task出錯,必須直接處理AggregateException。

使用task.GetAwaiter建立awaiter物件,是在.net4.5之後,其中C#5.0的非同步功能就是使用這種方式。

使用awaiter也是可以使用task.Result直接的檢視任務的結果,但是使用awaiter.GetResult()可以在Task出現異常的時候直接丟擲,不會封裝在AggregateException中。

3.4 Task.Delay

延時執行Task

3.4.1 使用Task.Delay()和ContinueWith實現延遲工作

其實就相當於實現Thread.Sleep()的非同步版本

若是你使用Thread.Sleep(),則會程式一直在等待(即阻塞執行緒),直到等待結束才會執行後續的程式碼

而這裡就相當於給給Thread.Sleep()一個加了接續工作,且這個接續工作是非同步的。

即使用Task.Delay()不會阻塞主執行緒,主執行緒可以繼續執行後續程式碼

示例:

    //新建非同步任務,30毫秒秒後執行
    Task.Delay(30).ContinueWith(c =>
    {
        for (int i = 0; i < 50; i++)
        {
            Console.WriteLine(i + "這是Task在執行");
        }
    });
    for (int i = 0; i < 100; i++)
    {
        Console.WriteLine(i + "這是Task之後的程式在執行");
    }

除錯的時候你會發現,剛開始的時候的時候是先顯示的"i這是Task之後的程式在執行"

之後在等帶了30毫秒,後就會開始顯示"i這是Task在執行"和"i這是Task之後的程式在執行"交叉顯示

執行結果如下:


3.4.2 使用Task.Delay()和Awaiter實現延遲工作

示例:執行效果同上

    Task.Delay(30).GetAwaiter().OnCompleted(() =>
    {
        for (int i = 0; i < 50; i++)
        {
            Console.WriteLine(i + "這是Awaiter在執行行");
        }
    });
    for (int i = 0; i < 100; i++)
    {
        Console.WriteLine(i + "這是Awaiter之後的程式在執行行");
    }
    Console.ReadKey();    


3.5 Task物件的其他一些靜態方法

方法名 說明
Task.Wait task1.Wait();就是等待任務執行(task1)完成,task1的狀態變為Completed
Task.WaitAll 待所有的任務都執行完成
Task.WaitAny 發同Task.WaitAll,就是等待任何一個任務完成就繼續向下執行
CancellationTokenSource 通過cancellation的tokens來取消一個Task


3.6 取消非同步操作

非同步方法是可以請求終止執行的,

System.Threading.Tasks名稱空間中有兩個類是為此目的而設計的:Cance1lationToken和CancellationTokenSource。

下面看使用CancellationTokenSource和CancellationToken來實現取消某個非同步操作。

這裡使用Task.Run()為例,其第一個引數是一個Action委託,第二個引數就是CancellationToken物件

static void Main(string[] args)
{
    CancellationTokenSource cts = new CancellationTokenSource();//生成一個CancellationTokenSource物件,該物件可以建立CancellationToken                                                 
    CancellationToken ct = cts.Token;//獲取一個令牌(token)
    Task.Run(() =>
    {
        for (int i = 0; i < 20; i++)
        {
            if (ct.IsCancellationRequested)
            {
                return;
            }
            Thread.Sleep(1000);
            Console.WriteLine($"非同步程式的的迴圈:{i}");
        }
    }, ct);//注意Run()的第二個引數就是終止令牌token
    for (int i = 0; i < 4; i++)
    {

        Thread.Sleep(1000);
        Console.WriteLine($"主執行緒中迴圈:{i}");
    }
    Console.WriteLine("馬上sts.Cancel(),即將要終止非同步程式");
    cts.Cancel();//含有該CancellationTokenSource的token的非同步程式,終止!
    Console.ReadKey();
}

執行結果:可以發現非同步任務Task.Run()還沒有完成,但是因為cst.Cancel()執行,token的屬性IsCancellationRequested變為true,非同步迴圈結束。

說明:取消一個非同步操作的過程,注意,該過程是協同的。

即:呼叫CancellationTokenSource的Cancel時,它本身並不會執行取消操作。
而是會將CancellationToken的IsCancellationRequested屬性設定為true。
包含CancellationToken的程式碼負責檢查該屬性,並判斷是否需要停止執行並返回。




4.並行Linq(PLinq)


4.1 AsParallel()

System.Linq名稱空間中有一個ParallelEnumerable類,該類中的方法可以分解Linq查詢的工作,使其分佈在多個執行緒上,即實現並行查詢。

為並行執行而設計的LINQ查詢稱為PLINQ查詢。

下面讓我們先簡單的理一理:

首先我們都知道Enumerable類為IEnumberable<T>介面擴充套件了一系列的靜態方法。(就是我們使用Linq方法語法的中用的哪些常用的靜態方法,自行F12)

正如MSDN中所說:“ParallelEnumberable是Enumberable的並行等效項”,ParallelEnumberable類則是Enumerable類的並行版本,

F12檢視定義可以看到ParallelEnumerable類中幾乎所有的方法都是對ParallelQuery<TSource>介面的擴充套件,

但是,在ParallelEnumberable類有一個重要的例外,AsParallel() 方法還對IEnumerable<T>介面的擴充套件,並且返回的是一個ParallelQuery<TSource>型別的物件,

所以呢?凡是實現類IEnumberable<T>集合可以通過呼叫靜態方法AsParallel(),返回一個ParallelQuery型別的物件,之後就可以使用ParallelEnumerable類中的非同步版本的靜態查詢方法了!

注意在執行PLinq的時候,PLinq會自動的判斷如果查詢能從並行化中受益,則將同時執行。而如果並行執行查詢會損害效能,PLINQ將按順序執行查詢。

示例:求1到50000000中可以整除3的數,將所求的結果倒序存放在modThreeIsZero[]中

這是需要非常多的重複運算,所以我們可以對比按照一般Linq查詢下方式和PLinq查詢,對比一些需要的時間。

static void Main(string[] args)
{
    int[] intArray = Enumerable.Range(1, 50000000).ToArray();
    Stopwatch sw = new Stopwatch();

    //順序查詢
    sw.Start();
    int[] modThreeIsZero1 = intArray.Select(n => n).Where(n => n % 3 == 0).OrderByDescending(n => n).ToArray();
    sw.Stop();
    Console.WriteLine($"順序查詢,執行時間:{sw.ElapsedMilliseconds}毫秒,可以整除3的個數:{modThreeIsZero1.Count()}");

    //使用AsParallel()實現並行查詢
    //AsParallel()方法返回ParallelQuery&lt;TSourc>型別物件。因為返回的型別,所以編譯器選擇的Select()、Where()等方法是ParallelEnumerable.Where(),而不是Enumerable.Where()。
    sw.Restart();
    int[] modThreeIsZero2 = intArray.AsParallel().Select(n => n).Where(n => n % 3 == 0).OrderByDescending(n => n).ToArray();
    sw.Stop();
    Console.WriteLine($"並行查詢,執行時間:{sw.ElapsedMilliseconds}毫秒,可以整除3的個數:{modThreeIsZero2.Count()}");

    Console.ReadKey();
}

說明:AsParallel()方法返回ParallelQuery<TSourc>型別物件。因為返回的型別,所以編譯器選擇的Select()、Where()等方法是ParallelEnumerable.Where(),而不是Enumerable.Where()。

執行結果:

可以對比結果,在大規模的Linq查詢中,同步查詢和並行查詢兩者的執行時間的差距還是很大的!

但是小規模的Linq查詢二者的效果其實並沒有很明顯。

4.2 取消並行查詢

在3.6取消非同步操作中解釋瞭如何取消一個長時間的任務,

那麼對於長時間執行的PLinq也是可以取消的

同樣是使用CancellationTokenSource生成一個CancellationToken物件作為token

怎麼把token給PLinq呢?使用ParallelQuery<TSource>中靜態方法WithCancellation(token)

在PLinq中,若是取消了並行操作,則會丟擲OperationCanceledException

示例:

static void Main(string[] args)
{
    //具體的作用和含義可以看0030取消一個非同步操作
    CancellationTokenSource cts = new CancellationTokenSource();
    CancellationToken ct = cts.Token;
    int[] intArray = Enumerable.Range(1, 50000000).ToArray();
    Task<int[]> task = Task.Run(() =>
    {
        try
        {
            int[] modThreeIsZero = intArray.AsParallel().WithCancellation(ct).Select(n => n).Where(n=> n% 3 == 0).OrderByDescending(n => n).ToArray();
            return modThreeIsZero;
        }
        catch (OperationCanceledException ex)//一旦PLinq中取消查詢就會觸發OperationCanceledException異常
        {
            Console.WriteLine(ex.Message);//注意:Message的內容就是:已取消該操作
            return null;
        }
    });
       
    Console.WriteLine("取消PLinq?Y/N");
    string input = Console.ReadLine();
    if (input.ToLower().Equals("y"))
    {
        cts.Cancel();//取消並行查詢
        Console.WriteLine("取消了PLinq!");//undone:怎麼驗證已經真的取消了
    }
    else
    {
        Console.WriteLine("Loading……");
        Console.WriteLine(task.Result.Count());
    }
    Console.ReadKey();
}




5.參考&原始碼下載

書籍:精通C#

書籍:C#高階程式設計

書籍:ASP.NET MVC5網站開發之美

文件:.NET API 瀏覽器

點選:原始碼下載

唉,書真是越看越厚,皆是淺嘗輒止,先到這裡