1. 程式人生 > >.Net並行程式設計(一)-TPL之資料並行

.Net並行程式設計(一)-TPL之資料並行

前言

許多個人計算機和工作站都有多個CPU核心,可以同時執行多個執行緒。利用硬體的特性,使用並行化程式碼以在多個處理器之間分配工作。.NetFramework並行程式設計架構圖

應用場景

  • 檔案批量上傳

並行上傳單個檔案。也可以把一個檔案拆成幾段分開上傳,加快上傳速度。

  • 資料分批計算

如幾百萬資料可以拆成許多無關聯的部分,平行計算處理。最後聚合。

  • 資料推送

也是需要將資料拆解後,並行推送。

任務並行庫-資料並行

如果在一個迴圈內在每次迭代只執行少量工作或者它沒有執行多次迭代,那麼並行化的開銷可能會導致程式碼執行的更慢。使用並行之前,應該對執行緒(鎖,死鎖,競爭條件)應該有基本的瞭解。

Parallel.For

        /// <summary>
        /// 正常迴圈
        /// </summary>
        public void FormalDirRun()
        {
            long totalSize = 0;
            var dir = @"E:\LearnWall\orleans";//args[1];
            String[] files = Directory.GetFiles(dir);
            stopwatch.Restart();
            for (var i = 0; i < files.Length; i++)
            {
                FileInfo fi = new FileInfo(files[i]);
                long size = fi.Length;
                Interlocked.Add(ref totalSize, size);
            }
            stopwatch.Stop();
            Console.WriteLine($"FormalDirRun------{files.Length} files, {totalSize} bytes,time:{stopwatch.ElapsedMilliseconds},Dir:{dir}");
        }
        /// <summary>
        /// 並行迴圈
        /// </summary>
        public void ParallelForDirRun()
        {
            long totalSize = 0;
            var dir = @"E:\LearnWall\orleans";//args[1];
            String[] files = Directory.GetFiles(dir);
            stopwatch.Restart();
            Parallel.For(0, files.Length,
                         index =>
                         {
                             FileInfo fi = new FileInfo(files[index]);
                             long size = fi.Length;
                             Interlocked.Add(ref totalSize, size);
                         });
            stopwatch.Stop();
            Console.WriteLine($"ParallelForDirRun-{files.Length} files, {totalSize} bytes,time:{stopwatch.ElapsedMilliseconds},Dir:{dir}");
        }
        

從下圖對比介面可以看出當迴圈體內方法執行時間很短時,並行時間反而更長。這塊會有更細緻的補充。

FormalDirRun------20 files, 255618 bytes,time:0,Dir:E:\LearnWall\orleans
ParallelForDirRun-20 files, 255618 bytes,time:6,Dir:E:\LearnWall\orleans

我們追加一些延時操作如Thread.Sleep,但這應該不是好好例子...但我只想演示效果就行了。

Thread.Sleep(1000);

檢視結果得到,當方法內有阻塞延時一秒後,兩者速度錯了七倍。

FormalDirRun------20 files, 255618 bytes,time:20011,Dir:E:\LearnWall\orleans
ParallelForDirRun-20 files, 255618 bytes,time:3007,Dir:E:\LearnWall\orleans

Parallel.ForEach

為了並行速度的最大化,我們應該儘量減少在並行內對共享資源的訪問,如Console.Write,檔案日誌等...但這裡為了顯示效果,就用了。

 public void ParallelForEachDirRun()
        {
            long totalSize = 0;
            var dir = @"E:\LearnWall\orleans";//args[1];
            String[] files = Directory.GetFiles(dir);
            stopwatch.Restart();
            Parallel.ForEach(files, (current) =>
            {
                FileInfo fi = new FileInfo(current);
                long size = fi.Length;
                Interlocked.Add(ref totalSize, size);
                Console.WriteLine($"name:{fi.Name}");
            });
            stopwatch.Stop();
            Console.WriteLine($"ParallelForEachDirRun-{files.Length} files, {totalSize} bytes,Time:{stopwatch.ElapsedMilliseconds}");
        }        
name:.gitignore
name:build.sh
.
.
.
name:TestAll.cmd
ParallelForEachDirRun-20 files, 255618 bytes,Time:17

Parallel.For 執行緒區域性變數

  public void ParallelForForThreadLocalVariables()
        {
            int[] nums = Enumerable.Range(0, 1000000).ToArray();
            long total = 0;

            // Use type parameter to make subtotal a long, not an int
            Parallel.For<long>(0, nums.Length, () => 0, (j,loop, subtotal) =>
            {
                subtotal += nums[j];
                return subtotal;
            },
                (x) => Interlocked.Add(ref total, x)
            );

            Console.WriteLine("The total is {0:N0}", total);
            Console.WriteLine("Press any key to exit");
            Console.ReadKey();
        }

結果如下:

The total is 499,999,509,000

每個For方法的前兩個引數指定開始和結束迭代值。在此方法的過載中,第三個引數是初始化本地狀態的位置。在此上下文中,本地狀態表示一個變數,其生命週期從當前執行緒上的迴圈的第一次迭代之前延伸到最後一次迭代之後。

第三個引數的型別是Func ,其中TResult是將儲存執行緒本地狀態的變數的型別。它的型別由呼叫泛型For (Int32,Int32,Func ,Func ,Action )方法時提供的泛型型別引數定義,在這種情況下是Int64。type引數告訴編譯器將用於儲存執行緒區域性狀態的臨時變數的型別。在此示例中,表示式() => 0(或Function() 0在Visual Basic中)將執行緒區域性變數初始化為零。如果泛型型別引數是引用型別或使用者定義的值型別,則表示式如下所示:

() => new MyClass()  

這塊內容比較繁瑣,一句話來說:前兩個引數是開始和結束值,第三個是根據For泛型而初始化的值。我其實也沒看太懂這塊。.net Framework原始碼如下,.netcore的不知道:

 public static ParallelLoopResult For<TLocal>(
            int fromInclusive, int toExclusive,
            Func<TLocal> localInit,
            Func<int, ParallelLoopState, TLocal, TLocal> body,
            Action<TLocal> localFinally)
        {
            if (body == null)
            {
                throw new ArgumentNullException("body");
            }
            if (localInit == null)
            {
                throw new ArgumentNullException("localInit");
            }
            if (localFinally == null)
            {
                throw new ArgumentNullException("localFinally");
            }
 
            return ForWorker(
                fromInclusive, toExclusive, s_defaultParallelOptions,
                null, null, body, localInit, localFinally);
        }
        
        /// </summary>
        /// <typeparam name="TLocal">本地資料的型別.</typeparam>
        /// <param name="fromInclusive">迴圈開始數</param>
        /// <param name="toExclusive">迴圈結束數</param>
        /// <param name="parallelOptions">選項</param>
        /// <param name="body">迴圈執行體</param>
        /// <param name="bodyWithState">ParallelState的迴圈體過載。</param>
        /// <param name="bodyWithLocal">執行緒區域性狀態的迴圈體過載。</param>
        /// <param name="localInit">一個返回新執行緒本地狀態的選擇器函式。</param>
        /// <param name="localFinally">清理執行緒本地狀態的清理函式。</param>
        /// <remarks>只能提供一個身體引數(即它們是獨佔的)。</remarks>
        /// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult"/> structure.</returns>
        private static ParallelLoopResult ForWorker<TLocal>(
            int fromInclusive, int toExclusive,
            ParallelOptions parallelOptions,
            Action<int> body,
            Action<int, ParallelLoopState> bodyWithState,
            Func<int, ParallelLoopState, TLocal, TLocal> bodyWithLocal,
            Func<TLocal> localInit, Action<TLocal> localFinally)
        {
        .
        .
        .
        }

Parallel.ForEach執行緒區域性變數

       /// <summary>
        /// 
        /// </summary>
        public void ParallelForEachThreadLocalVariables()
        {
            int[] nums = Enumerable.Range(0, 1000000).ToArray();
            long total = 0;

            // First type parameter is the type of the source elements
            // Second type parameter is the type of the thread-local variable (partition subtotal)
            Parallel.ForEach<int, long>(nums, // source collection
                                        () => 0, // method to initialize the local variable
                                        (j, loop, subtotal) => // method invoked by the loop on each iteration
                                     {
                                         subtotal += j; //modify local variable
                                         return subtotal; // value to be passed to next iteration
                                     },
                                        // Method to be executed when each partition has completed.
                                        // finalResult is the final value of subtotal for a particular partition.
                                        (finalResult) => Interlocked.Add(ref total, finalResult)
                                        );

            Console.WriteLine("The total from Parallel.ForEach is {0:N0}", total);
        }

ForEach的原始碼如下

        /// <summary>
        /// Executes a for each operation on an <see cref="T:System.Collections.IEnumerable{TSource}"/> 
        /// in which iterations may run in parallel.
        /// </summary>
        /// <typeparam name="TSource">The type of the data in the source.</typeparam>
        /// <param name="source">An enumerable data source.</param>
        /// <param name="body">The delegate that is invoked once per iteration.</param>
        /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="source"/> 
        /// argument is null.</exception>
        /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="body"/> 
        /// argument is null.</exception>
        /// <exception cref="T:System.AggregateException">The exception that is thrown to contain an exception
        /// thrown from one of the specified delegates.</exception>
        /// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> structure
        /// that contains information on what portion of the loop completed.</returns>
        /// <remarks>
        /// The <paramref name="body"/> delegate is invoked once for each element in the <paramref name="source"/> 
        /// enumerable.  It is provided with the current element as a parameter.
        /// </remarks>
        public static ParallelLoopResult ForEach<TSource>(IEnumerable<TSource> source, Action<TSource> body)
        {
            if (source == null)
            {
                throw new ArgumentNullException("source");
            }
            if (body == null)
            {
                throw new ArgumentNullException("body");
            }
 
            return ForEachWorker<TSource, object>(
                source, s_defaultParallelOptions, body, null, null, null, null, null, null);
        }

取消 Parallel.ForEach或Parallel.For

通過CancellationTokenSource來獲取token

CancellationTokenSource cts = new CancellationTokenSource();

通過ParallelOptions.CancellationToken屬性來控制取消狀態。

ParallelOptions po = new ParallelOptions();

po.CancellationToken = cts.Token;

通過Parallel.For或Foreach的ParallelOptions值來控制並行內方法的取消。

程式碼如下:

 int[] nums = Enumerable.Range(0, 10000000).ToArray();
            CancellationTokenSource cts = new CancellationTokenSource();

            // Use ParallelOptions instance to store the CancellationToken
            ParallelOptions po = new ParallelOptions();
            po.CancellationToken = cts.Token;
            po.MaxDegreeOfParallelism = System.Environment.ProcessorCount;
            Console.WriteLine("Press any key to start. Press 'c' to cancel.");
            Console.ReadKey();

            // Run a task so that we can cancel from another thread.
            Task.Factory.StartNew(() =>
            {
                var s = Console.ReadKey().KeyChar;
                if (s == 'c')
                    cts.Cancel();
                Console.WriteLine("press any key to exit111");
            });

            try
            {
                Parallel.ForEach(nums, po, (num) =>
                {
                    double d = Math.Sqrt(num);
                    Console.WriteLine("{0} on {1}", d, Thread.CurrentThread.ManagedThreadId);
                    po.CancellationToken.ThrowIfCancellationRequested();
                });
            }
            catch (OperationCanceledException e)
            {
                Console.WriteLine(e.Message);
            }
            finally
            {
                cts.Dispose();
            }

            Console.ReadKey();

執行結果如下,鍵盤輸入c時,並行取消。

1937.41838537782 on 7
2739.95711645274 on 8
2501.40660429287 on 9
2958.47798707376 on 10
.
.
.
press any key to exit111
The operation was canceled.

捕獲並行體內的異常

示例方法採用ConcurrentQueue來接收異常集合,最後丟擲一個聚合異常AggregateException。

var exceptions = new ConcurrentQueue();

exceptions.Enqueue(e);

這為我以後捕獲異常提供了一個好思路。

        /// <summary>
        /// 捕獲並行體內的異常
        /// </summary>
        public void HandleExceptionParallelLoop()
        {
            // Create some random data to process in parallel.
            // There is a good probability this data will cause some exceptions to be thrown.
            byte[] data = new byte[5000];
            Random r = new Random();
            r.NextBytes(data);

            try
            {
                ProcessDataInParallel(data);
            }
            catch (AggregateException ae)
            {
                var ignoredExceptions = new List<Exception>();
                // This is where you can choose which exceptions to handle.
                foreach (var ex in ae.Flatten().InnerExceptions)
                {
                    if (ex is ArgumentException)
                        Console.WriteLine(ex.Message);
                    else
                        ignoredExceptions.Add(ex);
                }
                if (ignoredExceptions.Count > 0) throw new AggregateException(ignoredExceptions);
            }

            Console.WriteLine("Press any key to exit.");
            Console.ReadKey();
        }
        private  void ProcessDataInParallel(byte[] data)
        {
            // Use ConcurrentQueue to enable safe enqueueing from multiple threads.
            var exceptions = new ConcurrentQueue<Exception>();

            // Execute the complete loop and capture all exceptions.
            Parallel.ForEach(data, d =>
            {
                try
                {
                    // Cause a few exceptions, but not too many.
                    if (d < 3)
                        throw new ArgumentException($"Value is {d}. Value must be greater than or equal to 3.");
                    else
                        Console.Write(d + " ");
                }
                // Store the exception and continue with the loop.                    
                catch (Exception e)
                {
                    exceptions.Enqueue(e);
                }
            });
            Console.WriteLine();

            // Throw the exceptions here after the loop completes.
            if (exceptions.Count > 0) throw new AggregateException(exceptions);
        }

對微小執行體提速

當Parallel.For迴圈有一個很快的執行體,它可能比同等順序迴圈執行更慢。較慢的效能是由分割槽資料所涉及的開銷和每次迴圈迭代呼叫委託的成本引起的。為了解決這種情況,Partitioner類提供了Partitioner.Create方法,該方法使您能夠為委託主體提供順序迴圈,以便每個分割槽僅呼叫一次委託,而不是每次迭代呼叫一次。

var rangePartitioner = Partitioner.Create(0, source.Length);

        /// <summary>
        /// 提速
        /// </summary>
        public void SpeedUpMicroParallelBody() {
            // Source must be array or IList.
            var source = Enumerable.Range(0, 100000).ToArray();

            // Partition the entire source array.
            var rangePartitioner = Partitioner.Create(0, source.Length);

            double[] results = new double[source.Length];

            // Loop over the partitions in parallel.
            Parallel.ForEach(rangePartitioner, (range, loopState) =>
            {
                // Loop over each range element without a delegate invocation.
                for (int i = range.Item1; i < range.Item2; i++)
                {
                    results[i] = source[i] * Math.PI;
                }
            });

            Console.WriteLine("Operation complete. Print results? y/n");
            char input = Console.ReadKey().KeyChar;
            if (input == 'y' || input == 'Y')
            {
                foreach (double d in results)
                {
                    Console.Write("{0} ", d);
                }
            }
        }

原始碼地址

總結

本篇文章沿著微軟官方文件步驟熟悉了第一部分資料並行的用法。

Parallel.For和Parallel.ForEach實現並行。

Parallel.For和Parallel.ForEach執行緒區域性變數。

取消並行ParallelOptions.CancellationToken

捕捉異常ConcurrentQueue累加並行體內的異常,外部接收。

加速Partitioner.Create

感謝觀看!