一個簡單的模擬例項說明Task及其排程問題
Task對於.NET的重要性毋庸置疑。通過最近的一些面試經歷,發現很多人對與Task及其排程機制,以及執行緒和執行緒池之間的關係並沒有清晰的認識。本文采用最簡單的方式模擬了Task的實現,旨在說明Task是什麼?它是如何被排程執行的?原始碼從這裡下載。
一、Task(Job)
二、TaskScheduler(JobScheduler)
三、基於執行緒池的排程
四、使用指定執行緒進行排程
五、非同步等待
六、await關鍵字的運用
七、狀態機
一、Task(Job)
Task代表一項具有某種狀態的操作,我們使用如下這個Job型別來模擬Task。Job封裝的操作體現為一個Action委託,狀態則通過JobStatus列舉來表示(對應TaskStatus列舉)。簡單起見,我們僅僅定義了四種狀態(建立、排程、執行和完成)。Invoke方法負責執行封裝的Action委託,並對狀態進行相應設定。
public class Job { private readonly Action _work; public Job(Action work)=> _work = work; public JobStatus Status { get; internal set; } internal protected virtual void Invoke() { Status = JobStatus.Running; _work(); Status = JobStatus.Completed;
} } public
enum JobStatus { Created, Scheduled, Running, Completed }
二、TaskScheduler(JobScheduler)
Task承載的操作通過排程得以執行,具體的排程策略取決於排程器的選擇。Task排程器通過TaskScheduler表示,我們利用如下這個JobScheduler型別對它進行模擬。如下面的程式碼片段所示,我們只為抽象類JobScheduler定義了唯一的QueueJob方法來排程作為引數的Job物件。靜態Current屬性表示當前預設實現的排程器。
public abstract classJobScheduler { public abstract void QueueJob(Job job); public static JobScheduler Current { get; set; } = new ThreadPoolJobScheduler (); }
對於開發者來說,執行Task就是將它提交給排程器,這一操作體現在我們為Job型別定義的靜態Start方法中。該方法通過引數指定具體的排程器,如果沒有顯式指定,預設採用JobScheduler的Current靜態屬性設定的預設排程器。為了方便後面的演示,我們還定義了一個靜態的Run方法,該方法會將指定的Action物件封裝成Job,並呼叫Start方法利用預設的排程器進行排程。
public class Job { private readonly Action _work; public Job(Action work)=> _work = work; public JobStatus Status { get; internal set; } internal protected virtual void Invoke() { Status = JobStatus.Running; _work(); Status = JobStatus.Completed; } public void Start(JobScheduler? scheduler = null) => (scheduler ?? JobScheduler.Current).QueueJob(this); public static Job Run(Action work) { var job = new Job(work); job.Start(); return job; } }
三、基於執行緒池的排程
Task如何執行取決於選擇怎樣的排程器,.NET預設採用基於執行緒池的排程策略,這一策略體現在ThreadPoolTaskScheduler型別上,我們使用如下這個ThreadPoolJobScheduler 進行模擬。如下面的程式碼片段所示,重寫的QueueJob方法通過呼叫ThreadPool.QueueUserWorkItem方法執行指定Job物件封裝的Action委託。JobScheduler的Current屬性設定的預設排程器就是這麼一個ThreadPoolJobScheduler 物件。
public class ThreadPoolJobScheduler : JobScheduler { public override void QueueJob(Job job) { job.Status = JobStatus.Scheduled; var executionContext = ExecutionContext.Capture(); ThreadPool.QueueUserWorkItem(_ => ExecutionContext.Run(executionContext!, _ => job.Invoke(), null)); } }
我們按照如下的方式呼叫Job的靜態Run方法建立並執行了三個Job,每個Job封裝的Action委託在執行的時候會將當前執行緒ID打印出來。
_ = Job.Run(() => Console.WriteLine($"Job1 is excuted in thread {Thread.CurrentThread.ManagedThreadId}")); _ = Job.Run(() => Console.WriteLine($"Job2 is excuted in thread {Thread.CurrentThread.ManagedThreadId}")); _ = Job.Run(() => Console.WriteLine($"Job3 is excuted in thread {Thread.CurrentThread.ManagedThreadId}")); Console.ReadLine();
由於採用預設的基於執行緒池的排程策略,所以三個Job會在三個不同的執行緒上執行。
四、使用指定執行緒進行排程
我們知道.NET程序只有一個全域性的執行緒池,對於一些需要長時間執行且具有較高優先順序的操作,採用基於執行緒池的呼叫未必是好的選擇。比如在一個Web應用中,執行緒池的工作執行緒會被用來處理請求,對於一個需要持續執行的Job可能會因為可用工作執行緒的不足而被阻塞。.NET對於這種情況具有不同的處理方式(啟動Task的時候選擇TaskCreationOptions.LongRunning選項),這裡我們使用自定義排程器的方式來解決這個問題。如下這個DedicatedThreadJobScheduler 利用建立的“專有執行緒”來保證被呼叫的Job能夠“立即”執行。執行緒的數量通過建構函式的引數指定,執行緒在無事可做的時候被“掛起”以及有新的Job被排程時被“復甦”通過一個ManualResetEvent物件來完成。
public class DedicatedThreadJobScheduler : JobScheduler { private readonly Queue<Job>[] _queues; private readonly Thread[] _threads; private readonly ManualResetEvent[] _events; public DedicatedThreadJobScheduler (int threadCount) { _queues = new Queue<Job>[threadCount]; _threads = new Thread[threadCount]; _events = new ManualResetEvent[threadCount]; for (int index = 0; index < threadCount; index++) { var queue = _queues[index] = new Queue<Job>(); var thread = _threads[index] = new Thread(Invoke); _events[index] = new ManualResetEvent(true); thread.Start(index); } void Invoke(object? state) { var index = (int)state!; var @event = _events[index]; while (true) { if (@event.WaitOne()) { while (true) { if (!_queues[index].TryDequeue(out var job)) { Suspend(index); break; } job.Invoke(); } } } } } public override void QueueJob(Job job) { job.Status = JobStatus.Scheduled; var (queue, index) = _queues.Select((queue, index) => (queue, index)).OrderBy(it => it.queue.Count).First(); queue.Enqueue(job); Resume(index); } public void Suspend(int index) => _events[index].Reset(); public void Resume(int index) => _events[index].Set(); }
還是上面演示的程式,這次我們將當前排程器設定為上面這個DedicatedThreadJobScheduler ,並將使用的執行緒數設定為2。
JobScheduler.Current = new DedicatedThreadJobScheduler (2); _ = Job.Run(() => Console.WriteLine($"Job1 is excuted in thread {Thread.CurrentThread.ManagedThreadId}")); _ = Job.Run(() => Console.WriteLine($"Job2 is excuted in thread {Thread.CurrentThread.ManagedThreadId}")); _ = Job.Run(() => Console.WriteLine($"Job3 is excuted in thread {Thread.CurrentThread.ManagedThreadId}")); _ = Job.Run(() => Console.WriteLine($"Job4 is excuted in thread {Thread.CurrentThread.ManagedThreadId}")); _ = Job.Run(() => Console.WriteLine($"Job5 is excuted in thread {Thread.CurrentThread.ManagedThreadId}")); _ = Job.Run(() => Console.WriteLine($"Job6 is excuted in thread {Thread.CurrentThread.ManagedThreadId}")); Console.ReadLine();
我們會發現所有的操作只會在兩個固定的執行緒中被執行。
五、非同步等待
如果需要在某個Task執行之後接著執行後續的操作,我們可以呼叫其ContinueWith方法指定待執行的操作,現在我們將這個方法定義Job型別上。Job與Task的ContinueWith有些差異,在這裡我們認為ContinueWith指定的也是一個Job,那麼多個Job則可以按照預先編排的順序構成一個連結串列。當前Job執行後,只需要將後續這個Job交付給排程器就可以了。如下面的程式碼片段所示,我們利用_continue欄位來表示非同步等待執行的Job,並利用它維持一個Job連結串列。ContinueWith方法會將指定的Action委託封裝成Job並新增到連結串列末端。
public class Job { private readonly Action _work; private Job? _continue; public Job(Action work) => _work = work; public JobStatus Status { get; internal set; } public void Start(JobScheduler? scheduler = null) => (scheduler ?? JobScheduler.Current).QueueJob(this); internal protected virtual void Invoke() { Status = JobStatus.Running; _work(); Status = JobStatus.Completed; _continue?.Start(); } public static Job Run(Action work) { var job = new Job(work); job.Start(); return job; } public Job ContinueWith(Action<Job> continuation) { if (_continue == null) { var job = new Job(() => continuation(this)); _continue = job; } else { _continue.ContinueWith(continuation); } return this; } }
利用ContinueWith方法實現非同步操作的按序執行體現在如下的程式中。
Job.Run(() => { Thread.Sleep(1000); Console.WriteLine("Foo1"); }).ContinueWith(_ => { Thread.Sleep(100); Console.WriteLine("Bar1"); }).ContinueWith(_ => { Thread.Sleep(100); Console.WriteLine("Baz1"); }); Job.Run(() => { Thread.Sleep(100); Console.WriteLine("Foo2"); }).ContinueWith(_ => { Thread.Sleep(10); Console.WriteLine("Bar2"); }).ContinueWith(_ => { Thread.Sleep(10); Console.WriteLine("Baz2"); }); Console.ReadLine();
輸出結果
六、await關鍵字的運用
雖然ContinueWith方法能夠解決“非同步等待”的問題,但是我們更喜歡使用await關鍵字,接下來我們就為Job賦予這個能力。為此我們定義瞭如下這個實現了ICriticalNotifyCompletion介面的JobAwaiter結構體。顧名思義,該介面用來發送操作完成的通知。一個JobAwaiter物件由一個Job物件構建而成,當它自身執行完成之後,OnCompleted方法會被呼叫,我們利用它執行後續的操作。
public struct JobAwaiter: ICriticalNotifyCompletion { private readonly Job _job; public bool IsCompleted => _job.Status == JobStatus.Completed; public JobAwaiter(Job job) { _job = job; if (job.Status == JobStatus.Created) { job.Start(); } } public void OnCompleted(Action continuation) { _job.ContinueWith(_ => continuation()); } public void GetResult() { } public void UnsafeOnCompleted(Action continuation)=>OnCompleted(continuation); }
我們在Job型別上新增這個GetAwaiter方法返回根據自身建立的JobAwaiter物件。
public class Job { private readonly Action _work; private Job? _continue; public Job(Action work) => _work = work; public JobStatus Status { get; internal set; } public void Start(JobScheduler? scheduler = null) => (scheduler ?? JobScheduler.Current).QueueJob(this); internal protected virtual void Invoke() { Status = JobStatus.Running; _work(); Status = JobStatus.Completed; _continue?.Start(); } public static Job Run(Action work) { var job = new Job(work); job.Start(); return job; } public Job ContinueWith(Action<Job> continuation) { if (_continue == null) { var job = new Job(() => continuation(this)); _continue = job; } else { _continue.ContinueWith(continuation); } return this; } public JobAwaiter GetAwaiter() => new(this); }
任何一個型別一旦擁有了這樣一個GetAwaiter方法,我們就能將await關鍵詞應用在對應的物件上面。
await Foo(); await Bar(); await Baz(); Console.ReadLine(); static Job Foo() => new Job(() => { Thread.Sleep(1000); Console.WriteLine("Foo"); }); static Job Bar() => new Job(() => { Thread.Sleep(100); Console.WriteLine("Bar"); }); static Job Baz() => new Job(() => { Thread.Sleep(10); Console.WriteLine("Baz"); });
輸出結果:
七、狀態機
我想你應該知道await關鍵字僅僅是編譯器提供的語法糖,編譯後的程式碼會利用一個“狀態機”實現“非同步等待”的功能,上面這段程式碼最終編譯成如下的形式。值得一提的是,Debug和Release模式編譯出來的程式碼是不同的,下面給出的是Release模式下的編譯結果,上述的狀態機體現為生成的<<Main>$>d__0這個結構體。它的實現其實很簡單:如果個方法出現了N個await關鍵字,它們相當於將整個方法的執行流程切割成N+1段,狀態機的狀態體現為當前應該執行那段,具體的執行體現在MoveNext方法上。GetAwaiter方法返回的ICriticalNotifyCompletion物件用來確定當前操作是否結束,如果結束則可以直接指定後續操作,否則需要呼叫AwaitUnsafeOnCompleted對後續操作進行處理。
// Program using System; using System.Diagnostics; using System.Runtime.CompilerServices; using System.Runtime.InteropServices; using System.Threading.Tasks; using Jobs; [CompilerGenerated] internal class Program { [StructLayout(LayoutKind.Auto)] [CompilerGenerated] private struct <<Main>$>d__0 : IAsyncStateMachine { public int <>1__state; public AsyncTaskMethodBuilder <>t__builder; private JobAwaiter <>u__1; private void MoveNext() { int num = <>1__state; try { JobAwaiter awaiter; switch (num) { default: awaiter = <<Main>$>g__Foo|0_0().GetAwaiter(); if (!awaiter.IsCompleted) { num = (<>1__state = 0); <>u__1 = awaiter; <>t__builder.AwaitUnsafeOnCompleted(ref awaiter, ref this); return; } goto IL_006c; case 0: awaiter = <>u__1; <>u__1 = default(JobAwaiter); num = (<>1__state = -1); goto IL_006c; case 1: awaiter = <>u__1; <>u__1 = default(JobAwaiter); num = (<>1__state = -1); goto IL_00c6; case 2: { awaiter = <>u__1; <>u__1 = default(JobAwaiter); num = (<>1__state = -1); break; } IL_00c6: awaiter.GetResult(); awaiter = <<Main>$>g__Baz|0_2().GetAwaiter(); if (!awaiter.IsCompleted) { num = (<>1__state = 2); <>u__1 = awaiter; <>t__builder.AwaitUnsafeOnCompleted(ref awaiter, ref this); return; } break; IL_006c: awaiter.GetResult(); awaiter = <<Main>$>g__Bar|0_1().GetAwaiter(); if (!awaiter.IsCompleted) { num = (<>1__state = 1); <>u__1 = awaiter; <>t__builder.AwaitUnsafeOnCompleted(ref awaiter, ref this); return; } goto IL_00c6; } awaiter.GetResult(); Console.ReadLine(); } catch (Exception exception) { <>1__state = -2; <>t__builder.SetException(exception); return; } <>1__state = -2; <>t__builder.SetResult(); } void IAsyncStateMachine.MoveNext() { //ILSpy generated this explicit interface implementation from .override directive in MoveNext this.MoveNext(); } [DebuggerHidden] private void SetStateMachine([System.Runtime.CompilerServices.Nullable(1)] IAsyncStateMachine stateMachine) { <>t__builder.SetStateMachine(stateMachine); } void IAsyncStateMachine.SetStateMachine([System.Runtime.CompilerServices.Nullable(1)] IAsyncStateMachine stateMachine) { //ILSpy generated this explicit interface implementation from .override directive in SetStateMachine this.SetStateMachine(stateMachine); } } [AsyncStateMachine(typeof(<<Main>$>d__0))] private static Task <Main>$(string[] args) { <<Main>$>d__0 stateMachine = default(<<Main>$>d__0); stateMachine.<>t__builder = AsyncTaskMethodBuilder.Create(); stateMachine.<>1__state = -1; stateMachine.<>t__builder.Start(ref stateMachine); return stateMachine.<>t__builder.Task; } [SpecialName] private static void <Main>(string[] args) { <Main>$(args).GetAwaiter().GetResult(); } }
上面提到過,編譯器生成的狀態機程式碼在Debug和Release模式是不一樣的。在Release模式下狀態機是一個結構體,雖然是以介面ICriticalNotifyCompletion的方式使用它,但是由於使用了ref關鍵字,所以不會涉及裝箱,所以不會對GC造成任何影響。但是Debug模式下生成的狀態機則是一個類(如下所示),將會涉及針對堆記憶體的分配和回收。對於遍佈await關鍵字的應用程式,兩者之間的效能差異肯定是不同的。實際上針對Task的很多優化策略,比如使用ValueTask,對某些Task<T>物件(比如狀態為Completed的Task<bool>物件)的複用,以及使用IValueTaskSource等,都是為了解決記憶體分配的問題。
// Program using System; using System.Diagnostics; using System.Runtime.CompilerServices; using System.Threading.Tasks; using Jobs; [CompilerGenerated] internal class Program { [CompilerGenerated] private sealed class <<Main>$>d__0 : IAsyncStateMachine { public int <>1__state; public AsyncTaskMethodBuilder <>t__builder; public string[] args; private JobAwaiter <>u__1; private void MoveNext() { int num = <>1__state; try { JobAwaiter awaiter3; JobAwaiter awaiter2; JobAwaiter awaiter; switch (num) { default: awaiter3 = <<Main>$>g__Foo|0_0().GetAwaiter(); if (!awaiter3.IsCompleted) { num = (<>1__state = 0); <>u__1 = awaiter3; <<Main>$>d__0 stateMachine = this; <>t__builder.AwaitUnsafeOnCompleted(ref awaiter3, ref stateMachine); return; } goto IL_007e; case 0: awaiter3 = <>u__1; <>u__1 = default(JobAwaiter); num = (<>1__state = -1); goto IL_007e; case 1: awaiter2 = <>u__1; <>u__1 = default(JobAwaiter); num = (<>1__state = -1); goto IL_00dd; case 2: { awaiter = <>u__1; <>u__1 = default(JobAwaiter); num = (<>1__state = -1); break; } IL_00dd: awaiter2.GetResult(); awaiter = <<Main>$>g__Baz|0_2().GetAwaiter(); if (!awaiter.IsCompleted) { num = (<>1__state = 2); <>u__1 = awaiter; <<Main>$>d__0 stateMachine = this; <>t__builder.AwaitUnsafeOnCompleted(ref awaiter, ref stateMachine); return; } break; IL_007e: awaiter3.GetResult(); awaiter2 = <<Main>$>g__Bar|0_1().GetAwaiter(); if (!awaiter2.IsCompleted) { num = (<>1__state = 1); <>u__1 = awaiter2; <<Main>$>d__0 stateMachine = this; <>t__builder.AwaitUnsafeOnCompleted(ref awaiter2, ref stateMachine); return; } goto IL_00dd; } awaiter.GetResult(); Console.ReadLine(); } catch (Exception exception) { <>1__state = -2; <>t__builder.SetException(exception); return; } <>1__state = -2; <>t__builder.SetResult(); } void IAsyncStateMachine.MoveNext() { //ILSpy generated this explicit interface implementation from .override directive in MoveNext this.MoveNext(); } [DebuggerHidden] private void SetStateMachine([System.Runtime.CompilerServices.Nullable(1)] IAsyncStateMachine stateMachine) { } void IAsyncStateMachine.SetStateMachine([System.Runtime.CompilerServices.Nullable(1)] IAsyncStateMachine stateMachine) { //ILSpy generated this explicit interface implementation from .override directive in SetStateMachine this.SetStateMachine(stateMachine); } } [AsyncStateMachine(typeof(<<Main>$>d__0))] [DebuggerStepThrough] private static Task <Main>$(string[] args) { <<Main>$>d__0 stateMachine = new <<Main>$>d__0(); stateMachine.<>t__builder = AsyncTaskMethodBuilder.Create(); stateMachine.args = args; stateMachine.<>1__state = -1; stateMachine.<>t__builder.Start(ref stateMachine); return stateMachine.<>t__builder.Task; } [SpecialName] [DebuggerStepThrough] private static void <Main>(string[] args) { <Main>$(args).GetAwaiter().GetResult(); } }