從執行上下文角度重新理解.NET(Core)的多執行緒程式設計[1]:基於呼叫鏈的”引數”傳遞
執行緒是作業系統能夠進行運算排程的最小單位,作業系統執行緒進一步被封裝成託管的Thread物件,手工建立並管理Thread物件已經成為了所能做到的對執行緒最細粒度的控制了。後來我們有了ThreadPool,可以更加方便地以池化的方式來使用執行緒。最後,Task誕生,它結合async/await關鍵字給與我們完美非同步程式設計模式。但這一切讓我們的程式設計體驗越來越好,但是離執行緒的本質越來越遠。被系列文章從“執行上下文傳播”這個令開發者相對熟悉的角度來聊聊重新認識我們似乎已經很熟悉的主題。
目錄
一、ThreadStatic欄位或者ThreadLocal<T>物件
二、CallContext
三、支援跨執行緒傳遞嗎?
四、IllogicalCallContext和LogicalCallContext
五、AsyncLocal<T>
一、ThreadStatic欄位或者ThreadLocal<T>物件
本篇文章旨在解決一個問題:對於一個由多個方法組成的呼叫鏈,資料如何在上下游方法之間傳遞。我想很多人首先想到的就是通過方法的引數進行傳遞,但是作為方法簽名重要組成部分的引數列表代表一種“契約”,往往是不能輕易更改的。既然不能通過引數直接進行傳遞,那麼我們需要一個“共享”的資料容器,上游方法將需要傳遞的資料放到這個容器中,下游方法在使用的時候從該容器中將所需的資料提取出來。
那麼這個共享的容器可以是一個靜態欄位,當然不行, 因為型別的靜態欄位類似於一個單例物件,它會被多個併發執行的呼叫鏈共享。雖然普通的靜態欄位不行,但是標註了ThreadStaticAttribute特性的靜態欄位則可以,因為這樣的欄位是執行緒獨享的。為了方便演示,我們定義瞭如下一個CallStackContext型別來表示基於某個呼叫鏈的上下文,這是一個字典,用於存放任何需要傳遞的資料。自增的TraceId欄位程式碼當前呼叫鏈的唯一標識。當前的CallStackContext上下文通過靜態屬性Current獲取,可以看出它返回標註了ThreadStaticAttribute特性的靜態欄位_current。
public class CallStackContext : Dictionary<string, object> { [ThreadStatic] private static CallStackContext _current; private static int _traceId = 0; public static CallStackContext Current { get => _current; set => _current = value; } public long TraceId { get; } = Interlocked.Increment(ref _traceId); }
我們通過如下這個CallStack物件建立一個“邏輯”上的呼叫鏈。在初始化的時候,CallStack會建立一個CallStackContext物件並將其放進CallContext物件並對靜態欄位_current進行復制。該欄位會在Dispose方法中被置空,此時標誌邏輯呼叫鏈生命週期的終止。
public class CallStack : IDisposable { public CallStack() => CallStackContext.Current = new CallStackContext(); public void Dispose() => CallStackContext.Current = null; }
我們通過如下的程式來演示針對CallStack和CallStackContext的使用。如程式碼片段所示,我們利用物件池併發呼叫Call方法。Call方法內部會依次呼叫Foo、Bar和Baz三個方法,需要傳遞的資料體現為一個Guid,我們將當存放在當前CallStackContext中。整個方法Call方法的操作均在建立Callback的using block中執行。
class Program { static void Main() { for (int i = 0; i < 5; i++) { ThreadPool.QueueUserWorkItem(_ => Call()); } Console.Read(); } static void Call() { using (new CallStack()) { CallStackContext.Current["argument"] = Guid.NewGuid(); Foo(); Bar(); Baz(); } } static void Foo() => Trace(); static void Bar() => Trace(); static void Baz() => Trace(); static void Trace([CallerMemberName] string methodName = null) { var threadId = Thread.CurrentThread.ManagedThreadId; var traceId = CallStackContext.Current?.TraceId; var argument = CallStackContext.Current?["argument"]; Console.WriteLine($"Thread: {threadId}; TraceId: {traceId}; Method: {methodName}; Argument:{argument}"); } }
為了驗證三個方法獲取的資料是否正確,我們讓它們呼叫同一個Trace方法,該方法會在控制檯上打印出當前執行緒ID、呼叫鏈標識(TraceId)、方法名和獲取到的資料。如下所示的是該演示程式執行後的結果,可以看出置於CallContext中的CallStackContext物件幫助我們很好地完成了針對呼叫鏈的資料傳遞。
既然我們可以使用ThreadStatic靜態欄位,自然也可以使用ThreadLocal<T>物件來代替。如果希望時候後者,我們只需要將CallStackContext改寫成如下的形式即可。
public class CallStackContext : Dictionary<string, object> { private static ThreadLocal<CallStackContext> _current = new ThreadLocal<CallStackContext>(); private static int _traceId = 0; public static CallStackContext Current { get => _current.Value; set => _current.Value = value; } public long TraceId { get; } = Interlocked.Increment(ref _traceId); }
二、CallContext
除使用ThreadStatic欄位來傳遞呼叫鏈資料之外,我們還可以使用CallContext。顧名思義,CallContext是專門為呼叫鏈建立的上下文,我們首先利用它來實現基於呼叫鏈的資料傳遞。如果採用這種解決方案,上述的CallStack和CallStackContext型別可以改寫成如下的形式。如程式碼片段所示,當前的CallStackContext上下文通過靜態屬性Current獲取,可以看出它是通過呼叫CallContext的靜態方法GetData提取的,傳入的型別名稱作為存放“插槽”的名稱。在初始化的時候,CallStack會建立一個CallStackContext物件並將其放進CallContext對應儲存插槽中作為當前上下文,該插槽會在Dispose方法中被釋放
public class CallStackContext: Dictionary<string, object> { private static int _traceId = 0; public static CallStackContext Current => CallContext.GetData(nameof(CallStackContext)) as CallStackContext; public long TraceId { get; } = Interlocked.Increment(ref _traceId); }
public class CallStack : IDisposable { public CallStack() => CallContext.SetData(nameof(CallStackContext), new CallStackContext()); public void Dispose() => CallContext.FreeNamedDataSlot(nameof(CallStackContext)); }
三、支援跨執行緒傳遞嗎?
對於上面演示的例項來說,呼叫鏈中的三個方法(Foo、Bar和Baz)均是在同一個執行緒中執行的,如果出現了跨執行緒呼叫,CallContext是否還能幫助我們實現上下文的快執行緒傳遞嗎?為了驗證CallContext跨執行緒傳遞的能力,我們將Call方法改寫成如下的形式:Call方法直接呼叫Foo方法,但是Foo方法針對Bar方法的呼叫,以及Bar方法針對Baz方法的呼叫均在一個新建立的執行緒中進行的。
static void Call() { using (new CallStack()) { CallStackContext.Current["argument"] = Guid.NewGuid(); Foo(); } } static void Foo() { Trace(); new Thread(Bar).Start(); } static void Bar() { Trace(); new Thread(Baz).Start(); } static void Baz() => Trace();
再次執行我們我們的程式,不論是採用基於ThreadStatic靜態欄位,還是採用ThreadLocal<T>物件或者CallContext的解決方法,均會得到如下所示的輸出結果。可以看出設定的資料只能在Foo方法中獲取到,但是並沒有自動傳遞到非同步執行的Bar和Baz方法中。
四、IllogicalCallContext和LogicalCallContext
其實CallContext設定的上下文物件分為IllogicalCallContext和LogicalCallContext兩種型別,呼叫SetData設定的是IllogicalCallContext,它並不具有跨執行緒傳播的能力。如果希望在進行非同步呼叫的時候自動傳遞到目標執行緒,必須呼叫CallContext的LogicalSetData方法設定為LogicalCallContext。所以我們應該將CallStack型別進行如下的改寫。
public class CallStack : IDisposable { public CallStack() => CallContext.LogicalSetData(nameof(CallStackContext), new CallStackContext()); public void Dispose() => CallContext.FreeNamedDataSlot(nameof(CallStackContext)); }
與之相對,獲取LogicalCallContext物件的方法也得換成LogicalGetData,為此我們將CallStackContext改寫成如下的形式。
public class CallStackContext: Dictionary<string, object> { private static int _traceId = 0; public static CallStackContext Current => CallContext.LogicalGetData(nameof(CallStackContext)) as CallStackContext; public long TraceId { get; } = Interlocked.Increment(ref _traceId); }
再次執行我們程式,依然能夠得到希望的結果。
除了將設定和提取當前CallStackContext的方式進行修改(GetData=>LogicalGet; SetData=>LogicalSetData)之外,我們還有另一個解決方案,那就是讓放存放在CallContext儲存槽的資料型別實現ILogicalThreadAffinative介面。該介面沒有定義任何成員,實現型別對應的物件將自動視為LogicalCallContext。對於我們的演示例項來說,我們只需要讓CallStackContext實現該介面就可以了。
public class CallStackContext: Dictionary<string, object>, ILogicalThreadAffinative { private static int _traceId = 0; public static CallStackContext Current => CallContext.GetData(nameof(CallStackContext)) as CallStackContext; public long TraceId { get; } = Interlocked.Increment(ref _traceId); }
五、AsyncLocal<T>
CallContext並沒有被.NET Core繼承下來。也就是,只有.NET Framework才提供針對CallContext的支援,.因為我們有更好的選擇,那就是AsyncLocal<T>。如果使用AsyncLocal<T>作為存放呼叫鏈上下文的容器,我們的
public class CallStackContext: Dictionary<string, object>, ILogicalThreadAffinative { internal static readonly AsyncLocal<CallStackContext> _contextAccessor = new AsyncLocal<CallStackContext>(); private static int _traceId = 0; public static CallStackContext Current => _contextAccessor.Value; public long TraceId { get; } = Interlocked.Increment(ref _traceId); } public class CallStack : IDisposable { public CallStack() => CallStackContext._contextAccessor.Value = new CallStackContext(); public void Dispose() => CallStackContext._contextAccessor.Value = null; }
既然命名為AsyncLocal<T>,自然是支援非同步呼叫。它不僅支援上面演示的直接建立執行緒的方式,最主要的是支援我們熟悉的await的方式(如下所示)。
class Program { static async Task Main(string[] args) { for (int i = 0; i < 5; i++) { ThreadPool.QueueUserWorkItem(_ => Call()); } Console.Read(); Console.Read(); async Task Call() { using (new CallStack()) { CallStackContext.Current["argument"] = Guid.NewGuid(); await FooAsync(); await BarAsync(); await BazAsync(); } } } static Task FooAsync() => Task.Run(() => Trace()); static Task BarAsync() => Task.Run(() => Trace()); static Task BazAsync() => Task.Run(() => Trace()); static void Trace([CallerMemberName] string methodName = null) { var threadId = Thread.CurrentThread.ManagedThreadId; var traceId = CallStackContext.Current?.TraceId; var argument = CallStackContext.Current?["argument"]; Console.WriteLine($"Thread: {threadId}; TraceId: {traceId}; Method: {methodName}; Argument:{argument}"); } }