1. 程式人生 > >並行程式設計(Parallel Framework)

並行程式設計(Parallel Framework)

前言

並行程式設計:通過編碼方式利用多核或多處理器稱為並行程式設計,多執行緒概念的一個子集。

並行處理:把正在執行的大量的任務分割成小塊,分配給多個同時執行的執行緒。多執行緒的一種。

並行程式設計分為如下幾個結構

1.並行的LINQPLINQ

2.Parallel

3.任務並行結構

4.併發集合

5.SpinLockSpinWait

這些是.NET 4.0引入的功能,一般被稱為PFX(Parallel Framework,並行框架)

Parallel類和任務並行結構稱為TPL(Task Parallel Library,任務並行庫)

並行框架(PFX)

1.並行框架基礎

當前CPU技術達到瓶頸,而製造商將關注重點轉移到提高核心技術上,而標準單執行緒程式碼並不會因此而自動提高執行速度。利用多核提升程式效能通常需要對計算密集型程式碼進行一些處理:1.將程式碼劃分成塊。2.通過多執行緒並行執行這些程式碼塊。3.結果變為可用後,以執行緒安全和高效能的方式整合這些結果。傳統多執行緒結構雖然實現功能,但難度頗高且不方便,特別是劃分和整理的步驟(本質問題是:多執行緒同時使用相同資料時,出於執行緒安全考慮進行鎖定的常用策略會引發大量競爭)。而並行框架(Parallel Framework)專門用於在這些應用場景中提供幫助。

2.並行框架組成

PFX:高層由兩個資料並行API組成:PLINQ或Parallel類。底層包含任務並行類和一組另外的結構為並行程式設計提供幫助。

基礎並行語言整合查詢(PLINQ)

語言整合查詢(Language Integrated Query,LINQ)提供了一個簡捷的語法來查詢資料集合。而這種由一個執行緒順序處理資料集合的方式我們稱為順序查詢(sequential query)

並行語言整合查詢(Parallel LINQ)LINQ並行版。它將順序查詢轉換為並行查詢,在內部使用任務,將集合中資料項的處理工作分散到多個CPU上,以併發處理多個數據項。

PLINQ將自動並行化本地的LINQ查詢System.Linq.ParallelEnumerable類(它定義在System.Core.dll中,需要引用System.Linq)公開了所有標準LINQ操作符的並行版本

。這些所有方法是依據System.Linq.ParallelQuery<T>擴充套件而來。

1.LINQ to PLINQ

要讓LINQ查詢呼叫並行版本,必須將自己的順序查詢(基於IEnumerable或IEnumerable<T>)轉換成並行查詢(基於ParallelQuery或ParallelQuery<T>),使用ParallelEnumerableAsParallel方法實現,如示例:

 1     class Program
 2     {
 3         static void Main(string[] args)
 4         {
 5             IEnumerable<int> numbers = Enumerable.Range(1, 1000);
 6             ParallelQuery parallelQuery =
 7                 from n in numbers.AsParallel()//轉換為並行
 8                 where n > 3
 9                 select n;
10             foreach (var item in parallelQuery)
11             {
12                 Console.WriteLine(item);
13             }
14             Console.ReadKey();
15         }
16     }

結果如下:使用Enumerable.Range生成的集合是順序的,但是經過並行查詢後順序被打亂。

2.PLINQ to LINQ

 將執行並行查詢的操作切換回執行順序查詢(並不常用),通過ParalleIEnumerableAsSequential實現。此時操作只由一個執行緒執行。

 1     class Program
 2     {
 3         static void Main(string[] args)
 4         {
 5             IEnumerable<int> numbers = Enumerable.Range(1, 1000);
 6             IEnumerable<int> enumerable = numbers.AsParallel().AsSequential().Where(c => c > 3);
 7             foreach (var item in enumerable)
 8             {
 9                 Console.WriteLine(item);
10             }
11             Console.ReadKey();
12         }
13     }

3.整合結果集(ForAll)

通常,一個LINQ查詢的結果資料是讓某個執行緒執行一個foreach來處理,此時只有一個執行緒遍歷查詢的所有結果,如果希望以並行方式處理查詢結果,通過ParalleIEnumerableForAll方法處理查詢,如示例:

 1     class Program
 2     {
 3         static void Main(string[] args)
 4         {
 5             IEnumerable<int> numbers = Enumerable.Range(1, 1000);
 6             (from n in numbers.AsParallel() where n > 3 select n).ForAll((d) =>
 7              {
 8                  d = d + 1000;
 9                  Console.WriteLine(d);//Console在此回損害效能,因為內部回對執行緒進行同步,此處因演示所以暫且一用
10              });
11             Console.ReadKey();
12         }
13     }

 執行結果如下:

解析PLINQ

1.PLINQ執行模型

如圖所示:

2.異常處理

PLINQ的報錯將以AggregateException形式被重拋,其中InnerExceeptions屬性包含一個或多個真正異常,示例可看內的異常處理部分。

3.PLINQ結果的排序

 並行化查詢當整理結果時不能保持初始化資料的原始次序。如果要保持序列的原始序列,可以通過在AsParallel之後呼叫AsOrdered來強制實現:

1             IEnumerable<int> numbers = Enumerable.Range(1, 10000);
2             var enumerable = numbers.AsParallel().Where(c => c > 3);

呼叫AsOrdered時,因為PLINQ要保持跟蹤每個元素的原始位置,會導致效能損失。

呼叫AsUnordered,可以在後續的查詢中低效AsOrdered產生的副作用,允許查詢從呼叫AsUnordered時起更高效的執行。

4.PLINQ存在的侷限與限制

1.若要使PLINQ發揮作用,必須具有一定數量的計算密集型工作可分配給工作者執行緒。大多數的LINQ to Objects查詢執行速度很快,不僅沒有必要並行化,而且劃分、整理和協調額外執行緒的開銷實際上會降低執行速度。而且查詢若呼叫了非執行緒安全的方法,PLINQ的結果有可能不正確。

2.PLINQ能夠並行化的內容還有些限制,以下查詢運算子防止查詢被並行化,除非源元素位於他們的元素索引位置:Take、TakeWhile、Skip和SkipWhileSelect、SelectMany和ElementAt的索引版本。

3.以下查詢運算子是並行化的,但所使用的複雜劃分策略有時可能比順序處理的速度還要低:Join、GroupBy、GroupJonin、Distinct、Union、Intersect和Except。

5.PLINQ的結果

和普通LINQ查詢一樣,PLINQ查詢也是延遲求值的。意味著執行只在開始使用時觸發。但是列舉結果集時和普通順序查詢有區別:

順序查詢:完全由使用者從輸入序列中“拉取”每個元素。

並行查詢:通常使用獨立執行緒來獲取序列中的元素,時間上比使用者需要它們時要提前,再通過查詢鏈並行處理元素後將結果儲存在一塊快取中,以便使用者按需取用。

注意:過早暫停結果列舉,查詢處理器也會暫停或結束,目的是不浪費CPU的時間或記憶體。在呼叫AsParallel之後呼叫WithMergeOptions可以調節PLINQ的緩衝行為。

6.如何使用PLINQ

為何優化將LINQ都並行化是不可取的,因為LINQ能解決大多數問題,執行速度也很快,因此無法從並行化中收益。

一種更好的方式是找出CPU密集的瓶頸,然後考慮通過LINQ的形式表達(這類重構,LINQ往往會使程式碼量變少,而且增強可讀性)。

PLINQ十分適用於易並行問題。他還可以很好地處理結構化的阻塞任務。

PLINQ不適於映象製作,因為將數百萬元素整理為一個輸出序列將帶來瓶頸,相反將元素寫入一個數組或託管記憶體塊中,然後使用Parallel類或任務並行管理多執行緒是更好的選擇。

Parallel類

Parallel類是對執行緒的一個很好的抽象。該類位於System.Threading.Tasks名稱空間中,提供了資料和任務並行性

PFX通過Parallel類中的三個靜態方法,提供了一種基本形式的結構化並行機制:

1.Parallel.Invoke

Parallel.Invoke用於並行執行一組委託,示例如下:

1         static void Main(string[] args)
2         {
3             Parallel.Invoke(
4                 () => Console.WriteLine($"當前執行緒Id:{Thread.CurrentThread.ManagedThreadId}"),
5                 () => Console.WriteLine($"當前執行緒Id:{Thread.CurrentThread.ManagedThreadId}")
6                 );
7             Console.ReadKey();
8         }

執行結果

Parallel.Invoke方法並行執行一組Action委託,然後等待它們完成。

1 public static void Invoke(params Action[] actions);

示例看起來像是建立和等待兩個Task物件的一種捷徑。但兩者存在重要的區別:如果傳入一個包含資料量非常大的委託陣列時,Parallel.Invoke方法仍然能高效工作,這是因為在底層,Parallel.Invoke方法是將大量元素劃分成較小的塊,分配給底層的Task執行,而不是每個委託建立一個獨立Task

2.Parallel.For

Parallel.For執行C# for迴圈的並行化等價迴圈,示例如下:

 1     class Program
 2     {
 3         static void Main(string[] args)
 4         {
 5             //順序迴圈
 6             {
 7                 for (int i = 0; i < 10; i++)
 8                 {
 9                     Test(i);
10                 }
11             }
12             Console.WriteLine("並行化for開始");
13             //順序執行轉換為並行化
14             {
15                 Parallel.For(0, 10, i => Test(i));
16             }
17             //順序執行轉換為並行化(更簡單的方式)
18             {
19                 Parallel.For(0, 10, Test);
20             }
21             Console.ReadKey();
22         }
23         static void Test(int i)
24         {
25             Console.WriteLine($"當前執行緒Id:{Thread.CurrentThread.ManagedThreadId},輸出結果為:{i}");
26         }
27     }

結果如下:

3.Parallel.ForEach

Parallel.ForEach執行C# foreach迴圈的並行化等價迴圈,示例如下:

 1     class Program
 2     {
 3         static void Main(string[] args)
 4         {
 5             string[] data = { "zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine" };
 6             //順序迴圈
 7             {
 8                 foreach (string num in data)
 9                 {
10                     Test(num);
11                 }
12             }
13             Console.WriteLine("並行化foreach開始");
14             //順序執行轉換為並行化
15             {
16                 Parallel.ForEach(data, num => Test(num));
17             }
18             Console.ReadKey();
19         }
20         static void Test(string str)
21         {
22             Console.WriteLine($"當前執行緒Id:{Thread.CurrentThread.ManagedThreadId},輸出結果為:{str}");
23         }
24     }

執行結果:

注意:以上三個方法都會引發阻塞直到所有工作完成為止。和PLINQ一樣,在出現未處理的異常之後,餘下的工作者在它們當前的迭代之後停止,而一場將被拋回給呼叫者,並封裝在一個AggregateException中。

4.索引&跳出(ParallelLoopState)

有時迭代索引很有用處,但是切忌不可像順序迴圈的用法使用共享變數(迴圈內i++)的方式使用,因為共享變數值在並行上下文中是執行緒不安全的

同樣的,因為並行ForForEach中的迴圈體是一個委託,所以無法使用break語句提前退出迴圈,必須呼叫ParallelLoopState物件上的BreakStop方法。

ForEach為例,ForEach過載的其中之一如下,它包含Acton的其中有三個引數(TSourec=子元素,ParallelLoopState=並行迴圈狀態,long=索引):

1 public static ParallelLoopResult ForEach<TSource>(IEnumerable<TSource> source, Action<TSource, ParallelLoopState, long> body)

所以,想要得到索引和提前跳出的正確方式如示例:

 1     class Program
 2     {
 3         static void Main(string[] args)
 4         {
 5             string[] data = { "zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine" };
 6             Parallel.ForEach(data, (num, state, i) =>
 7             {
 8                 Console.WriteLine($"當前索引為:{i},狀態為:{state}");
 9                 Test(num);
10                 if (num == "six")
11                     state.Break();
12             });
13             Console.ReadKey();
14         }
15         static void Test(string str)
16         {
17             Console.WriteLine($"當前執行緒Id:{Thread.CurrentThread.ManagedThreadId},輸出結果為:{str}");
18         }
19     }

結果如下:

For的版本如下:

 1     class Program
 2     {
 3         static void Main(string[] args)
 4         {
 5             string[] data = { "zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine" };
 6             Parallel.For(0, data.Length, (i, state) =>
 7             {
 8                 Console.WriteLine($"當前索引為:{i},狀態為:{state}");
 9                 Test(data[i]);
10                 if (data[i] == "six")
11                     state.Break();
12             });
13             Console.ReadKey();
14         }
15         static void Test(string str)
16         {
17             Console.WriteLine($"當前執行緒Id:{Thread.CurrentThread.ManagedThreadId},輸出結果為:{str}");
18         }
19     }

任務並行

併發集合概述

.NET 4.0在System.Collections.Concurrent名稱空間中提供了一組新的集合。所有這些集合都完全是執行緒安全的:

這些集合不僅是為使用帶鎖的普通集合提供了快捷方式,而且可以在一般的多執行緒中使用併發集合,但需要注意:1.併發集合針對並行程式設計進行了調整。只有在高度併發的應用場景中,傳統集合的效能才能勝過它們

2.執行緒安全的集合不能確保使用它的程式碼也是安全的

3.如果列舉一個併發集合的同時,另一個執行緒要修改它,不會丟擲任何異常,相反,得到舊內容與新內容的混合。

4.不存在任何List<T>的併發版本。

5.它們的記憶體利用率沒有非併發的Stack和Queue類高效,但對於併發訪問的效果更好。

1.結構概述

這些併發集合與傳統集合的區別是:它們公開了特殊方法來執行原子測試和行動操作,而這些方法都是通過IProducerConsumerCollection<T>介面提供的。

IProducerConsumerCollection<T>介面代表一個執行緒安全的生產者/消費者集合,這三個類繼承並實現了IProducerConsumerCollection<T>介面:

ConcurrentStack<T>ConcurrentQueue<T>ConcurrentBag<T>

它們實現的TryAddTryTake方法用於測試一個新增/刪除操作能否執行,如果可以,則執行新增/刪除操作。測試與行動不需要對傳統集合上鎖。

ConcurrentBag<T>用於儲存物件的無需集合,適用於呼叫Take或TryTake時不關心獲取那個元素的額情況。

相對於併發佇列或堆疊,在多執行緒同時呼叫一個ConcurrentBag的Add時,不存在競爭,但佇列或堆疊並行呼叫Add會引起一些競爭,所以ConcurrentBag上呼叫Take方法非常高效。

BlockingCollection<T>類似阻塞集合,適用於等待新元素的出現,可以把它看作一個容器,使用一個阻塞集合封裝所有實現IProducerConsumerCollection<T>的集合,並且允許從封裝的集合中去除元素,若沒有元素,操作會阻塞

2.基礎方法

常用的一些方法,整理自 zy__ :

ConcurrentQueue:完全無鎖,但面臨資源競爭失敗時可能會陷入自旋並重試操作。

Enqueue:在隊尾插入元素

TryDequeue:嘗試刪除隊頭元素,並通過out引數返回

TryPeek:嘗試將對頭元素通過out引數返回,但不刪除該元素。

ConcurrentStack:完全無鎖,但面臨資源競爭失敗時可能會陷入自旋並重試操作。

Push:向棧頂插入元素

TryPop:從棧頂彈出元素,並且通過out 引數返回

TryPeek:返回棧頂元素,但不彈出。

ConcurrentBag:一個無序的集合,程式可以向其中插入元素,或刪除元素。在同一個執行緒中向集合插入,刪除元素的效率很高。

Add:向集合中插入元素 

TryTake:從集合中取出元素並刪除 

TryPeek:從集合中取出元素,但不刪除該元素。

BlockingCollection:一個支援界限和阻塞的容器

Add :向容器中插入元素

TryTake:從容器中取出元素並刪除

TryPeek:從容器中取出元素,但不刪除。

CompleteAdding:告訴容器,新增元素完成。此時如果還想繼續新增會發生異常。

IsCompleted:告訴消費執行緒,生產者執行緒還在繼續執行中,任務還未完成。

ConcurrentDictionary對於讀操作是完全無鎖的,當很多執行緒要修改資料時,它會使用細粒度的鎖。

AddOrUpdate:如果鍵不存在,方法會在容器中新增新的鍵和值,如果存在,則更新現有的鍵和值。

GetOrAdd:如果鍵不存在,方法會向容器中新增新的鍵和值,如果存在則返回現有的值,並不新增新值。TryAdd:嘗試在容器中新增新的鍵和值。

TryGetValue:嘗試根據指定的鍵獲得值。

TryRemove:嘗試刪除指定的鍵。

TryUpdate:有條件的更新當前鍵所對應的值。

GetEnumerator:返回一個能夠遍歷整個容器的列舉器。

結語

根據ConcurrentBag編寫執行緒安全的生產者消費者請戳:這裡 。

說實在的寫這篇文章挺煩的,主要涉及的知識點太多講的太細篇幅會很長況且我自己有些也還沒用過,所以是概述性文章,對PFX有個基本的認識,當需要具體深入使用某些知識時再查詢相關文件。

因個人的興趣,所以準備沉澱下來專攻 資料結構和演算法,然後研究 人工智慧(Microsoft的人工智慧平臺Windows ML不會涉及,選擇研究Google的第二代人工智慧學習系統TensorFlow )。

接下來會對LinuxPython進行基礎的學習並更新文章(如果有C#工作經驗那麼Python的學習會非常簡單,個人是這麼認為的,只不過我是在Linux上使用Vim做程式設計,所以還很不適應),但是最核心的還是資料結構&演算法使用那種程式語言並不重要

感興趣的朋友可以關注。

參考文獻

CLR via C#(第4版) Jeffrey Richter

C#高階程式設計(第10版) C# 6 & .NET Core 1.0   Christian Nagel  

果殼中的C# C#5.0權威指南  Joseph Albahari

...