多執行緒那點事—Parallel.for
阿新 • • 發佈:2021-01-02
先看段程式碼:
1 for (int i = 0; i < 10; i++) 2 { 3 Task.Factory.StartNew(()=>Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} ~ {i}")); 4 }
從程式碼上可以看出我們預期是列印1~10,但實際的列印結果是:
1 7 ~ 10 2 4 ~ 10 3 10 ~ 10 4 9 ~ 10 5 4 ~ 10 6 3 ~ 10 7 5 ~ 10 8 9 ~ 10 9 6 ~ 10 10 8 ~ 10
與預期的不一致,我們預期是列印數字1到10,但實際打印出來的是10次10。因為這幾個lambda表示式中使用了同一個變數,並且這些匿名函式共享變數值。
再來看下面這段程式碼:
1 Action<int> displayNumber = n => Console.WriteLine(n); 2 int i = 5; 3 Task taskOne = Task.Factory.StartNew(() => displayNumber(i)); 4 i = 7; 5 Task taskTwo = Task.Factory.StartNew(() => displayNumber(i)); 6 Task.WaitAll(taskOne,taskTwo);
輸出結果:
7 7
當閉包通過lambda表示式捕獲可變變數時,lambda捕獲變數的引用,而不是捕獲該變數的當前值。因此,如果任務在變數的引用值更改後執行,則該值將是記憶體中最新的值,而不是捕獲變數時的值。
為解決該問題,我們引入Parallel類來解決問題:
1 Parallel.For(0,10,i=>Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} ~ {i}"));
列印結果:
1 1 ~ 0 2 1 ~ 2 3 3 ~ 1 4 3 ~ 4 5 3 ~ 7 6 3 ~ 8 7 3 ~ 9 8 1 ~ 3 9 5 ~ 5 10 4 ~ 6
Parallel 類 提供對並行迴圈和區域的支援, 現在我們看下Parallel.for的程式碼:
1 // this needs to be in try-block because it can throw in BuggyScheduler.MaxConcurrencyLevel 2 rootTask = new ParallelForReplicatingTask( 3 parallelOptions, 4 delegate 5 { 6 // 7 // first thing we do upon enterying the task is to register as a new "RangeWorker" with the 8 // shared RangeManager instance. 9 // 10 // If this call returns a RangeWorker struct which wraps the state needed by this task 11 // 12 // We need to call FindNewWork32() on it to see whether there's a chunk available. 13 // 14 // Cache some information about the current task 15 Task currentWorkerTask = Task.InternalCurrent; 16 bool bIsRootTask = (currentWorkerTask == rootTask); 17 RangeWorker currentWorker = new RangeWorker(); 18 Object savedStateFromPreviousReplica = currentWorkerTask.SavedStateFromPreviousReplica; 19 if (savedStateFromPreviousReplica is RangeWorker) 20 currentWorker = (RangeWorker)savedStateFromPreviousReplica; 21 else 22 currentWorker = rangeManager.RegisterNewWorker(); 23 // These are the local index values to be used in the sequential loop. 24 // Their values filled in by FindNewWork32 25 int nFromInclusiveLocal; 26 int nToExclusiveLocal; 27 if (currentWorker.FindNewWork32(out nFromInclusiveLocal, out nToExclusiveLocal) == false || 28 sharedPStateFlags.ShouldExitLoop(nFromInclusiveLocal) == true) 29 { 30 return; // no need to run 31 } 32 // ETW event for ParallelFor Worker Fork 33 if (TplEtwProvider.Log.IsEnabled()) 34 { 35 TplEtwProvider.Log.ParallelFork((currentWorkerTask != null ? currentWorkerTask.m_taskScheduler.Id : TaskScheduler.Current.Id), (currentWorkerTask != null ? currentWorkerTask.Id : 0), 36 forkJoinContextID); 37 } 38 TLocal localValue = default(TLocal); 39 bool bLocalValueInitialized = false; // Tracks whether localInit ran without exceptions, so that we can skip localFinally if it wasn't 40 try 41 { 42 // Create a new state object that references the shared "stopped" and "exceptional" flags 43 // If needed, it will contain a new instance of thread-local state by invoking the selector. 44 ParallelLoopState32 state = null; 45 if (bodyWithState != null) 46 { 47 Contract.Assert(sharedPStateFlags != null); 48 state = new ParallelLoopState32(sharedPStateFlags); 49 } 50 else if (bodyWithLocal != null) 51 { 52 Contract.Assert(sharedPStateFlags != null); 53 state = new ParallelLoopState32(sharedPStateFlags); 54 if (localInit != null) 55 { 56 localValue = localInit(); 57 bLocalValueInitialized = true; 58 } 59 } 60 // initialize a loop timer which will help us decide whether we should exit early 61 LoopTimer loopTimer = new LoopTimer(rootTask.ActiveChildCount); 62 // Now perform the loop itself. 63 do 64 { 65 if (body != null) 66 { 67 for (int j = nFromInclusiveLocal; 68 j < nToExclusiveLocal && (sharedPStateFlags.LoopStateFlags == ParallelLoopStateFlags.PLS_NONE // fast path check as SEL() doesn't inline 69 || !sharedPStateFlags.ShouldExitLoop()); // the no-arg version is used since we have no state 70 j += 1) 71 { 72 body(j); 73 } 74 } 75 else if (bodyWithState != null) 76 { 77 for (int j = nFromInclusiveLocal; 78 j < nToExclusiveLocal && (sharedPStateFlags.LoopStateFlags == ParallelLoopStateFlags.PLS_NONE // fast path check as SEL() doesn't inline 79 || !sharedPStateFlags.ShouldExitLoop(j)); 80 j += 1) 81 { 82 state.CurrentIteration = j; 83 bodyWithState(j, state); 84 } 85 } 86 else 87 { 88 for (int j = nFromInclusiveLocal; 89 j < nToExclusiveLocal && (sharedPStateFlags.LoopStateFlags == ParallelLoopStateFlags.PLS_NONE // fast path check as SEL() doesn't inline 90 || !sharedPStateFlags.ShouldExitLoop(j)); 91 j += 1) 92 { 93 state.CurrentIteration = j; 94 localValue = bodyWithLocal(j, state, localValue); 95 } 96 } 97 // Cooperative multitasking hack for AppDomain fairness. 98 // Check if allowed loop time is exceeded, if so save current state and return. The self replicating task logic 99 // will detect this, and queue up a replacement task. Note that we don't do this on the root task. 100 if (!bIsRootTask && loopTimer.LimitExceeded()) 101 { 102 currentWorkerTask.SavedStateForNextReplica = (object)currentWorker; 103 break; 104 } 105 } 106 // Exit if we can't find new work, or if the loop was stoppped. 107 while (currentWorker.FindNewWork32(out nFromInclusiveLocal, out nToExclusiveLocal) && 108 ((sharedPStateFlags.LoopStateFlags == ParallelLoopStateFlags.PLS_NONE) || 109 !sharedPStateFlags.ShouldExitLoop(nFromInclusiveLocal))); 110 } 111 catch 112 { 113 // if we catch an exception in a worker, we signal the other workers to exit the loop, and we rethrow 114 sharedPStateFlags.SetExceptional(); 115 throw; 116 } 117 finally 118 { 119 // If a cleanup function was specified, call it. Otherwise, if the type is 120 // IDisposable, we will invoke Dispose on behalf of the user. 121 if (localFinally != null && bLocalValueInitialized) 122 { 123 localFinally(localValue); 124 } 125 // ETW event for ParallelFor Worker Join 126 if (TplEtwProvider.Log.IsEnabled()) 127 { 128 TplEtwProvider.Log.ParallelJoin((currentWorkerTask != null ? currentWorkerTask.m_taskScheduler.Id : TaskScheduler.Current.Id), (currentWorkerTask != null ? currentWorkerTask.Id : 0), 129 forkJoinContextID); 130 } 131 } 132 }, 133 creationOptions, internalOptions); 134 rootTask.RunSynchronously(parallelOptions.EffectiveTaskScheduler); // might throw TSE 135 rootTask.Wait(); 136 // If we made a cancellation registration, we need to clean it up now before observing the OCE 137 // Otherwise we could be caught in the middle of a callback, and observe PLS_STOPPED, but oce = null 138 if (parallelOptions.CancellationToken.CanBeCanceled) 139 { 140 ctr.Dispose(); 141 } 142 // If we got through that with no exceptions, and we were canceled, then 143 // throw our cancellation exception 144 if (oce != null) throw oce;
body對於迭代範圍 (的每個值呼叫一次委託 fromInclusive , toExclusive) 。提供兩個引數:
1、一個 Int32 值,該值表示迭代次數。
2、ParallelLoopState可用於提前中斷迴圈的例項。ParallelLoopState物件是由編譯器建立的; 它不能在使用者程式碼中例項化。
繼續來看:
Parallel.For(0, 10, (i,state) => { if (i > 5) state.Break(); Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} ~ {i}"); } );
輸出:
1 1 ~ 0 2 1 ~ 1 3 1 ~ 2 4 1 ~ 3 5 1 ~ 4 6 1 ~ 5 7 1 ~ 6
在上面的方法中我們使用了 break方法。
呼叫 Break 方法會通知 for 操作,在當前的迭代之後,無需執行迭代。不過,如果所有迭代尚未執行,則仍必須執行當前的所有迭代。
因此,呼叫 Break 類似於 for c# 等語言中的傳統迴圈內的中斷操作,但它並不是完美的替代方法:例如,無法保證當前的迭代不會執行。
今天就先寫道這裡~
&n