多執行緒那點事—Parallel.for
阿新 • • 發佈:2021-01-03
先看段程式碼:
for (int i = 0; i < 10; i++)
{
Task.Factory.StartNew(()=>Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} ~ {i}"));
}
從程式碼上可以看出我們預期是列印1~10,但實際的列印結果是:
7 ~ 10
4 ~ 10
10 ~ 10
9 ~ 10
4 ~ 10
3 ~ 10
5 ~ 10
9 ~ 10
6 ~ 10
8 ~ 10
與預期的不一致,我們預期是列印數字1到10,但實際打印出來的是10次10。因為這幾個lambda表示式中使用了同一個變數,並且這些匿名函式共享變數值。
再來看下面這段程式碼:
Action<int> displayNumber = n => Console.WriteLine(n);
int i = 5;
Task taskOne = Task.Factory.StartNew(() => displayNumber(i));
i = 7;
Task taskTwo = Task.Factory.StartNew(() => displayNumber(i));
Task.WaitAll(taskOne,taskTwo);
輸出結果:
7
7
當閉包通過lambda表示式捕獲可變變數時,lambda捕獲變數的引用,而不是捕獲該變數的當前值。因此,如果任務在變數的引用值更改後執行,則該值將是記憶體中最新的值,而不是捕獲變數時的值。
為解決該問題,我們引入Parallel類來解決問題:
Parallel.For(0,10,i=>Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} ~ {i}"));
列印結果:
1 ~ 0
1 ~ 2
3 ~ 1
3 ~ 4
3 ~ 7
3 ~ 8
3 ~ 9
1 ~ 3
5 ~ 5
4 ~ 6
Parallel 類 提供對並行迴圈和區域的支援, 現在我們看下Parallel.for的程式碼:
// this needs to be in try-block because it can throw in BuggyScheduler.MaxConcurrencyLevel
rootTask = new ParallelForReplicatingTask(
parallelOptions,
delegate
{
//
// first thing we do upon enterying the task is to register as a new "RangeWorker" with the
// shared RangeManager instance.
//
// If this call returns a RangeWorker struct which wraps the state needed by this task
//
// We need to call FindNewWork32() on it to see whether there's a chunk available.
//
// Cache some information about the current task
Task currentWorkerTask = Task.InternalCurrent;
bool bIsRootTask = (currentWorkerTask == rootTask);
RangeWorker currentWorker = new RangeWorker();
Object savedStateFromPreviousReplica = currentWorkerTask.SavedStateFromPreviousReplica;
if (savedStateFromPreviousReplica is RangeWorker)
currentWorker = (RangeWorker)savedStateFromPreviousReplica;
else
currentWorker = rangeManager.RegisterNewWorker();
// These are the local index values to be used in the sequential loop.
// Their values filled in by FindNewWork32
int nFromInclusiveLocal;
int nToExclusiveLocal;
if (currentWorker.FindNewWork32(out nFromInclusiveLocal, out nToExclusiveLocal) == false ||
sharedPStateFlags.ShouldExitLoop(nFromInclusiveLocal) == true)
{
return; // no need to run
}
// ETW event for ParallelFor Worker Fork
if (TplEtwProvider.Log.IsEnabled())
{
TplEtwProvider.Log.ParallelFork((currentWorkerTask != null ? currentWorkerTask.m_taskScheduler.Id : TaskScheduler.Current.Id), (currentWorkerTask != null ? currentWorkerTask.Id : 0),
forkJoinContextID);
}
TLocal localValue = default(TLocal);
bool bLocalValueInitialized = false; // Tracks whether localInit ran without exceptions, so that we can skip localFinally if it wasn't
try
{
// Create a new state object that references the shared "stopped" and "exceptional" flags
// If needed, it will contain a new instance of thread-local state by invoking the selector.
ParallelLoopState32 state = null;
if (bodyWithState != null)
{
Contract.Assert(sharedPStateFlags != null);
state = new ParallelLoopState32(sharedPStateFlags);
}
else if (bodyWithLocal != null)
{
Contract.Assert(sharedPStateFlags != null);
state = new ParallelLoopState32(sharedPStateFlags);
if (localInit != null)
{
localValue = localInit();
bLocalValueInitialized = true;
}
}
// initialize a loop timer which will help us decide whether we should exit early
LoopTimer loopTimer = new LoopTimer(rootTask.ActiveChildCount);
// Now perform the loop itself.
do
{
if (body != null)
{
for (int j = nFromInclusiveLocal;
j < nToExclusiveLocal && (sharedPStateFlags.LoopStateFlags == ParallelLoopStateFlags.PLS_NONE // fast path check as SEL() doesn't inline
|| !sharedPStateFlags.ShouldExitLoop()); // the no-arg version is used since we have no state
j += 1)
{
body(j);
}
}
else if (bodyWithState != null)
{
for (int j = nFromInclusiveLocal;
j < nToExclusiveLocal && (sharedPStateFlags.LoopStateFlags == ParallelLoopStateFlags.PLS_NONE // fast path check as SEL() doesn't inline
|| !sharedPStateFlags.ShouldExitLoop(j));
j += 1)
{
state.CurrentIteration = j;
bodyWithState(j, state);
}
}
else
{
for (int j = nFromInclusiveLocal;
j < nToExclusiveLocal && (sharedPStateFlags.LoopStateFlags == ParallelLoopStateFlags.PLS_NONE // fast path check as SEL() doesn't inline
|| !sharedPStateFlags.ShouldExitLoop(j));
j += 1)
{
state.CurrentIteration = j;
localValue = bodyWithLocal(j, state, localValue);
}
}
// Cooperative multitasking hack for AppDomain fairness.
// Check if allowed loop time is exceeded, if so save current state and return. The self replicating task logic
// will detect this, and queue up a replacement task. Note that we don't do this on the root task.
if (!bIsRootTask && loopTimer.LimitExceeded())
{
currentWorkerTask.SavedStateForNextReplica = (object)currentWorker;
break;
}
}
// Exit if we can't find new work, or if the loop was stoppped.
while (currentWorker.FindNewWork32(out nFromInclusiveLocal, out nToExclusiveLocal) &&
((sharedPStateFlags.LoopStateFlags == ParallelLoopStateFlags.PLS_NONE) ||
!sharedPStateFlags.ShouldExitLoop(nFromInclusiveLocal)));
}
catch
{
// if we catch an exception in a worker, we signal the other workers to exit the loop, and we rethrow
sharedPStateFlags.SetExceptional();
throw;
}
finally
{
// If a cleanup function was specified, call it. Otherwise, if the type is
// IDisposable, we will invoke Dispose on behalf of the user.
if (localFinally != null && bLocalValueInitialized)
{
localFinally(localValue);
}
// ETW event for ParallelFor Worker Join
if (TplEtwProvider.Log.IsEnabled())
{
TplEtwProvider.Log.ParallelJoin((currentWorkerTask != null ? currentWorkerTask.m_taskScheduler.Id : TaskScheduler.Current.Id), (currentWorkerTask != null ? currentWorkerTask.Id : 0),
forkJoinContextID);
}
}
},
creationOptions, internalOptions);
rootTask.RunSynchronously(parallelOptions.EffectiveTaskScheduler); // might throw TSE
rootTask.Wait();
// If we made a cancellation registration, we need to clean it up now before observing the OCE
// Otherwise we could be caught in the middle of a callback, and observe PLS_STOPPED, but oce = null
if (parallelOptions.CancellationToken.CanBeCanceled)
{
ctr.Dispose();
}
// If we got through that with no exceptions, and we were canceled, then
// throw our cancellation exception
if (oce != null) throw oce;
body對於迭代範圍 (的每個值呼叫一次委託 fromInclusive , toExclusive) 。提供兩個引數:
1、一個 Int32 值,該值表示迭代次數。
2、ParallelLoopState可用於提前中斷迴圈的例項。ParallelLoopState物件是由編譯器建立的; 它不能在使用者程式碼中例項化。
繼續來看:
Parallel.For(0, 10, (i,state) =>
{
if (i > 5)
state.Break();
Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} ~ {i}");
} );
輸出:
1 ~ 0
1 ~ 1
1 ~ 2
1 ~ 3
1 ~ 4
1 ~ 5
1 ~ 6
在上面的方法中我們使用了 break方法。
呼叫 Break 方法會通知 for 操作,在當前的迭代之後,無需執行迭代。不過,如果所有迭代尚未執行,則仍必須執行當前的所有迭代。
因此,呼叫 Break 類似於 for c# 等語言中的傳統迴圈內的中斷操作,但它並不是完美的替代方法:例如,無法保證當前的迭代不會執行。
今天就先寫道這裡~