1. 程式人生 > 程式設計 >詳解c# 平行計算

詳解c# 平行計算

平行計算部分

沿用微軟的寫法,System.Threading.Tasks.::.Parallel類,提供對並行迴圈和區域的支援。 我們會用到的方法有For,ForEach,Invoke。

一、簡單使用

首先我們初始化一個List用於迴圈,這裡我們迴圈10次。(後面的程式碼都會按這個標準進行迴圈)

 Program.Data = new List<int>();
 for (int i = 0; i < 10; i++)
 {
 Data.Add(i);
 }

下面我們定義4個方法,分別為for,foreach,並行For,並行ForEach。並測試他們的執行時長。

 /// <summary>
 /// 是否顯示執行過程
 /// </summary>
 public bool ShowProcessExecution = false;
 /// <summary>
 /// 這是普通迴圈for
 /// </summary>
 private void Demo1()
 {
 List<int> data = Program.Data;
 DateTime dt1 = DateTime.Now;
 for (int i = 0; i < data.Count; i++)
 {
 Thread.Sleep(500);
 if (ShowProcessExecution)
 Console.WriteLine(data[i]);
 }
 DateTime dt2 = DateTime.Now;
 Console.WriteLine("普通迴圈For執行時長:{0}毫秒。",(dt2 - dt1).TotalMilliseconds);
 }
 /// <summary>
 /// 這是普通迴圈foreach
 /// </summary>
 private void Demo2()
 {
 List<int> data = Program.Data;
 DateTime dt1 = DateTime.Now;
 foreach (var i in data)
 {
 Thread.Sleep(500);
 if (ShowProcessExecution)
 Console.WriteLine(i);
 }
 DateTime dt2 = DateTime.Now;
 Console.WriteLine("普通迴圈For執行時長:{0}毫秒。",(dt2 - dt1).TotalMilliseconds);
 }
 /// <summary>
 /// 這是平行計算For
 /// </summary>
 private void Demo3()
 {
 List<int> data = Program.Data;
 DateTime dt1 = DateTime.Now;
 Parallel.For(0,data.Count,(i) =>
 {
 Thread.Sleep(500);
 if (ShowProcessExecution)
 Console.WriteLine(data[i]);
 });
 DateTime dt2 = DateTime.Now;
 Console.WriteLine("並行運算For執行時長:{0}毫秒。",(dt2 - dt1).TotalMilliseconds);
 }
 /// <summary>
 /// 這是平行計算ForEach
 /// </summary>
 private void Demo4()
 {
 List<int> data = Program.Data;
 DateTime dt1 = DateTime.Now;
 Parallel.ForEach(data,(i) =>
 {
 Thread.Sleep(500);
 if (ShowProcessExecution)
 Console.WriteLine(i);
 });
 DateTime dt2 = DateTime.Now;
 Console.WriteLine("並行運算ForEach執行時長:{0}毫秒。",(dt2 - dt1).TotalMilliseconds);
 }

下面是執行結果:

詳解c# 平行計算

這裡我們可以看出並行迴圈在執行效率上的優勢了。

結論1:在對一個數組內的每一個項做單獨處理時,完全可以選擇並行迴圈的方式來提升執行效率。

原理1:平行計算的執行緒開啟是緩步開啟的,執行緒數量1,2,4,8緩步提升。(不詳,PLinq最多64個執行緒,可能這也是64)

二、 並行迴圈的中斷和跳出

當在進行迴圈時,偶爾會需要中斷迴圈或跳出迴圈。下面是兩種跳出迴圈的方法Stop和Break,LoopState是迴圈狀態的引數。

 /// <summary>
 /// 中斷Stop
 /// </summary>
 private void Demo5()
 {
 List<int> data = Program.Data;
 Parallel.For(0,(i,LoopState) =>
 {
 if (data[i] > 5)
 LoopState.Stop();
 Thread.Sleep(500);
 Console.WriteLine(data[i]);
 });
 Console.WriteLine("Stop執行結束。");
 }
 /// <summary>
 /// 中斷Break
 /// </summary>
 private void Demo6()
 {
 List<int> data = Program.Data;
 Parallel.ForEach(data,LoopState) =>
 {
 if (i > 5)
 LoopState.Break();
 Thread.Sleep(500);
 Console.WriteLine(i);
 });
 Console.WriteLine("Break執行結束。");
 }

執行結果如下:

詳解c# 平行計算

結論2:使用Stop會立即停止迴圈,使用Break會執行完畢所有符合條件的項。

三、並行迴圈中為陣列/集合新增項

上面的應用場景其實並不是非常多見,畢竟只是為了遍歷一個數組內的資源,我們更多的時候是為了遍歷資源,找到我們所需要的。那麼請繼續看。

下面是我們一般會想到的寫法:

 private void Demo7()
 {
 List<int> data = new List<int>();
 Parallel.For(0,Program.Data.Count,(i) =>
 {
 if (Program.Data[i] % 2 == 0)
 data.Add(Program.Data[i]);
 });
 Console.WriteLine("執行完成For.");
 }
 private void Demo8()
 {
 List<int> data = new List<int>();
 Parallel.ForEach(Program.Data,(i) =>
 {
 if (Program.Data[i] % 2 == 0)
 data.Add(Program.Data[i]);
 });
 Console.WriteLine("執行完成ForEach.");
 }

看起來應該是沒有問題的,但是我們多次執行後會發現,偶爾會出現錯誤如下:

詳解c# 平行計算

這是因為List是非執行緒安全的類,我們需要使用System.Collections.Concurrent名稱空間下的型別來用於並行迴圈體內。

說明
BlockingCollection<T> 為實現 IProducerConsumerCollection<T> 的執行緒安全集合提供阻止和限制功能。
ConcurrentBag<T> 表示物件的執行緒安全的無序集合。
ConcurrentDictionary<TKey,TValue> 表示可由多個執行緒同時訪問的鍵值對的執行緒安全集合。
ConcurrentQueue<T> 表示執行緒安全的先進先出 (FIFO) 集合。
ConcurrentStack<T> 表示執行緒安全的後進先出 (LIFO) 集合。
OrderablePartitioner<TSource> 表示將一個可排序資料來源拆分成多個分割槽的特定方式。
Partitioner 提供針對陣列、列表和可列舉項的常見分割槽策略。
Partitioner<TSource> 表示將一個數據源拆分成多個分割槽的特定方式。

那麼我們上面的程式碼可以修改為,加了了ConcurrentQueue和ConcurrentStack的最基本的操作。

 /// <summary>
 /// 並行迴圈操作集合類,集合內只取5個物件
 /// </summary>
 private void Demo7()
 {
 ConcurrentQueue<int> data = new ConcurrentQueue<int>();
 Parallel.For(0,(i) =>
 {
 if (Program.Data[i] % 2 == 0)
 data.Enqueue(Program.Data[i]);//將物件加入到佇列末尾
 });
 int R;
 while (data.TryDequeue(out R))//返回佇列中開始處的物件
 {
 Console.WriteLine(R);
 }
 Console.WriteLine("執行完成For.");
 }
 /// <summary>
 /// 並行迴圈操作集合類
 /// </summary>
 private void Demo8()
 {
 ConcurrentStack<int> data = new ConcurrentStack<int>();
 Parallel.ForEach(Program.Data,(i) =>
 {
 if (Program.Data[i] % 2 == 0)
 data.Push(Program.Data[i]);//將物件壓入棧中
 });
 int R;
 while (data.TryPop(out R))//彈出棧頂物件
 {
 Console.WriteLine(R);
 }
 Console.WriteLine("執行完成ForEach.");
 }

ok,這裡返回一個序列的問題也解決了。

結論3:在並行迴圈內重複操作的物件,必須要是thread-safe(執行緒安全)的。集合類的執行緒安全物件全部在System.Collections.Concurrent名稱空間下。

四、返回集合運算結果/含有區域性變數的並行迴圈

使用迴圈的時候經常也會用到迭代,那麼在並行迴圈中叫做 含有區域性變數的迴圈 。下面的程式碼中詳細的解釋,這裡就不囉嗦了。

 /// <summary>
 /// 具有執行緒區域性變數的For迴圈
 /// </summary>
 private void Demo9()
 {
 List<int> data = Program.Data;
 long total = 0;
 //這裡定義返回值為long型別方便下面各個引數的解釋
 Parallel.For<long>(0,// For迴圈的起點
 data.Count,// For迴圈的終點
 () => 0,// 初始化區域性變數的方法(long),既為下面的subtotal的初值
 (i,LoopState,subtotal) => // 為每個迭代呼叫一次的委託,i是當前索引,LoopState是迴圈狀態,subtotal為區域性變數名
 {
 subtotal += data[i]; // 修改區域性變數
 return subtotal; // 傳遞引數給下一個迭代
 },(finalResult) => Interlocked.Add(ref total,finalResult) //對每個執行緒結果執行的最後操作,這裡是將所有的結果相加
 );
 Console.WriteLine(total);
 }
 /// <summary>
 /// 具有執行緒區域性變數的ForEach迴圈
 /// </summary>
 private void Demo10()
 {
 List<int> data = Program.Data;
 long total = 0;
 Parallel.ForEach<int,long>(data,// 要迴圈的集合物件
 () => 0,subtotal) => // 為每個迭代呼叫一次的委託,i是當前元素,LoopState是迴圈狀態,subtotal為區域性變數名
 {
 subtotal += i; // 修改區域性變數
 return subtotal; // 傳遞引數給下一個迭代
 },finalResult) //對每個執行緒結果執行的最後操作,這裡是將所有的結果相加
 );
 Console.WriteLine(total);
 }

結論4:並行迴圈中的迭代,確實很傷人。程式碼太難理解了。

五、PLinq(Linq的平行計算)

上面介紹完了For和ForEach的平行計算盛宴,微軟也沒忘記在Linq中加入平行計算。下面介紹Linq中的平行計算。

4.0中在System.Linq名稱空間下加入了下面幾個新的類:

說明
ParallelEnumerable 提供一組用於查詢實現 ParallelQuery{TSource} 的物件的方法。這是 Enumerable 的並行等效項。
ParallelQuery 表示並行序列。
ParallelQuery<TSource> 表示並行序列。

原理2:PLinq最多會開啟64個執行緒

原理3:PLinq會自己判斷是否可以進行平行計算,如果不行則會以順序模式執行。

原理4:PLinq會在昂貴的並行演算法或成本較低的順序演算法之間進行選擇,預設情況下它選擇順序演算法。

在ParallelEnumerable中提供的並行化的方法

ParallelEnumerable 運算子 說明
AsParallel() PLINQ 的入口點。指定如果可能,應並行化查詢的其餘部分。
AsSequential() 指定查詢的其餘部分應像非並行 LINQ 查詢一樣按順序執行。
AsOrdered() 指定 PLINQ 應保留查詢的其餘部分的源序列排序,直到例如通過使用 orderby 子句更改排序為止。
AsUnordered() 指定查詢的其餘部分的 PLINQ 不需要保留源序列的排序。
WithCancellation() 指定 PLINQ 應定期監視請求取消時提供的取消標記和取消執行的狀態。
WithDegreeOfParallelism() 指定 PLINQ 應當用來並行化查詢的處理器的最大數目。
WithMergeOptions() 提供有關 PLINQ 應當如何(如果可能)將並行結果合併回到使用執行緒上的一個序列的提示。
WithExecutionMode() 指定 PLINQ 應當如何並行化查詢(即使預設行為是按順序執行查詢)。
ForAll() 多執行緒列舉方法,與迴圈訪問查詢結果不同,它允許在不首先合併回到使用者執行緒的情況下並行處理結果。
Aggregate() 過載 對於 PLINQ 唯一的過載,它啟用對執行緒本地分割槽的中間聚合以及一個用於合併所有分割槽結果的最終聚合函式。

下面是PLinq的簡單程式碼

 /// <summary>
 /// PLinq簡介
 /// </summary>
 private void Demo11()
 {
 var source = Enumerable.Range(1,10000);
 //查詢結果按source中的順序排序
 var evenNums = from num in source.AsParallel().AsOrdered()
 where num % 2 == 0
 select num;
 //ForAll的使用
 ConcurrentBag<int> concurrentBag = new ConcurrentBag<int>();
 var query = from num in source.AsParallel()
 where num % 10 == 0
 select num;
 query.ForAll((e) => concurrentBag.Add(e * e));
 }

上面程式碼中使用了ForAll,ForAll和foreach的區別如下:

詳解c# 平行計算

以上就是詳解c# 平行計算的詳細內容,更多關於c# 平行計算的資料請關注我們其它相關文章!