.Net Core中利用TPL(任務並行庫)構建Pipeline處理Dataflow
在學習的過程中,看一些一線的技術文件很吃力,而且考慮到國內那些技術牛人英語都不差的,要向他們看齊,所以每天下班都在瘋狂地背單詞,部落格有些日子沒有更新了,見諒見諒
什麼是TPL?
Task Parallel Library (TPL), 在.NET Framework 4微軟推出TPL,並把TPL作為編寫多執行緒和並行程式碼的首選方式,但是,在國內,到目前為止好像用的人並不多。(TPL)是System.Threading和System.Threading.Tasks名稱空間中的一組公共型別和API 。TPL的目的是通過簡化嚮應用程式新增並行性和併發性的過程來提高開發人員的工作效率,TPL動態地擴充套件併發度,以最有效地使用所有可用的處理器。通過使用TPL,您可以最大限度地提高程式碼的效能,讓我們專注於程式本身而不用去關注負責的多執行緒管理。
為什麼使用TPL?
在上面介紹了什麼是TPL,可能大家還是雲裡霧裡,不知道TPL的好處到底是什麼。
我在youtube上找到了一個優秀的視訊,講述的是TPL和Thread的區別,我覺得對比一下,TPL的優勢很快就能體現出來,如果大家能開啟的話建議大家一定要看看。
現如今,我們的電腦的CPU怎麼也是2核以上,下面假設我的電腦是四核的,我們來做一個實驗。
使用Thread
程式碼中,如果使用Thread來處理任務,如果不做特出的處理,只是thread.Start(),監測電腦的核心的使用情況是下面這樣的。
每一條線代表CPU某個核心的使用情況,明顯,隨著程式碼Run起來,其實只有某一個核心的使用率迅速提升,其他核心並無明顯波動,為什麼會這樣呢?
原來,預設情況下,作業系統並不會呼叫所有的核心來處理任務,即使我們使用多執行緒,其實也是在一個核心裡面執行這些Thread,而且Thread之間涉及到執行緒同步等問題,其實,效率也不會明顯提高。
使用TPL
在程式碼中,引入了TPL來處理相同的任務,再次監視各個核心的使用情況,效果就變得截然不同,如下。
可以看到各個核心的使用情況都同時有了明顯的提高。
說明使用TPL後,不再是使用CPU的某個核心來處理任務了,而是TPL自動把任務分攤給每個核心來處理,處理效率可想而知,理論上會有明顯提升的(為什麼說理論上?和使用多執行緒一樣,各個核心之間的同步管理也是要佔用一定的效率的,所以對於並不複雜的任務,使用TPL可能適得其反)。
看了這個實驗講解,是不是理解了上面所說的這句。
TPL的目的是通過簡化嚮應用程式新增並行性和併發性的過程來提高開發人員的工作效率,TPL動態地擴充套件併發度,以最有效地使用所有可用的處理器。
所以說,使用TPL 來處理多執行緒任務可以讓你不必吧把精力放在如何提高多執行緒處理效率上,因為這一切,TPL 能自動地幫你完成。
TPL Dataflow?
TPL處理Dataflow是TPL強大功能中的一種,它提供一套完整的資料流元件,這些資料流元件統稱為TPL Dataflow Library,那麼,在什麼場景下適合使用TPL Dataflow Library呢?
官方舉的一個 栗子 再恰當不過:
例如,通過TPL Dataflow提供的功能來轉換影象,執行光線校正或防紅眼,可以建立管道資料流元件,管道中的每個功能可以並行執行,並且TPL能自動控制影象流在不同執行緒之間的同步,不再需要Thread 中的Lock。
TPL資料流庫由Block組成,Block是緩衝和處理資料的單元,TPL定義了三種最基礎的Block。
還有其他一些個性化的Block,但其實他們都是對這三種Block進行一些擴充,可以結合下面的程式碼來理解這三種Block.
Code Show
1.source block 和 target block 合併成propagator block.
private IPropagatorBlock<string, Dictionary<int, string>> Process1() { var bufferBlock = new BufferBlock<Dictionary<int, string>>(); var actionBlock = new ActionBlock<string>(x => { Console.WriteLine($"Process1 處理中:{x}"); Thread.Sleep(5000); var dic = new Dictionary<int, string> { { 0, x } }; dic.Add(1, "Process1"); bufferBlock.Post(dic); }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = _maxDegreeOfParallelism }); actionBlock.Completion.ContinueWith(_ => { Console.WriteLine($"Process1 Complete,State{_.Status}"); bufferBlock.Complete(); }); return DataflowBlock.Encapsulate(actionBlock, bufferBlock); }
可以看到,我定義了BufferBlock和ActionBlock,它們分別繼承於ISourceBlock 和 ITargetBlock ,所以說,他們其實就是源塊和目標塊,在new actionBlock()中傳入了一個Action<String>,該Action就是該Block所執行的任務。 最後,DataflowBlock.Encapsulate(actionBlock, bufferBlock)把源塊和目標塊合併成了一個傳遞塊。
2.TransformBlock
private IPropagatorBlock<Dictionary<int, string>, Dictionary<int, string>> Process2() { var block = new TransformBlock<Dictionary<int, string>, Dictionary<int, string>>(dic => { Console.WriteLine($"Process2 處理中:{dic.First().Value}"); Thread.Sleep(5000); dic.Add(2, "Process2"); return dic; }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = _maxDegreeOfParallelism } ); block.Completion.ContinueWith(_ => { Console.WriteLine($"Process2 Complete,State{_.Status}"); }); return block; }
TransfromBlock繼承了IPropagatorBlock,所以它本身就是一個傳遞塊,所以它除了要處理出入資料,還要返回資料,所以給new TransformBlock()中傳入的是Func<TInput, TOutput>而不是Action<TInput>.
3.TargetBlock來收尾
private ITargetBlock<Dictionary<int, string>> Process3() { var actionBlock = new ActionBlock<Dictionary<int, string>>(dic => { Console.WriteLine($"Process3 處理中:{dic.First().Value}"); Thread.Sleep(5000); dic.Add(3, "Process3"); Console.WriteLine("Dic中的內容如下:"); foreach (var item in dic) { Console.Write($"{item.Key}:{item.Value}||"); } Console.WriteLine(); }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = _maxDegreeOfParallelism }); return actionBlock; }
TargetBlock只能寫入並處理資料,不能讀取,所以TargetBlock適合作為Pipeline的最後一個Block。
4.控制每個Block的並行度
在在構造TargetBlock(包括其子類)的時候,可以傳入ExecutionDataflowBlockOptions引數,ExecutionDataflowBlockOptions物件裡面有一個MaxDegreeOfParallelism屬性,通過改制,可以控制該Block的同時處理任務的數量(可以理解成執行緒數)。
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = _maxDegreeOfParallelism }
5.構建Pipeline,連線Block
public Task Builder() { _startBlock = Process1(); var process2Block = Process2(); var process3Block = Process3(); _startBlock.LinkTo(process2Block, new DataflowLinkOptions() { PropagateCompletion = true }); process2Block.LinkTo(process3Block, new DataflowLinkOptions() { PropagateCompletion = true }); process3Block.Completion.ContinueWith(_ => { Console.WriteLine($"Process3 Complete,State{_.Status}"); Console.WriteLine("所有任務處理完成"); }); return process3Block.Completion; }
通過
ISourceBlock<TOutput>.LinkTo(ITargetBlock<TOutput> target, DataflowLinkOptions linkOption)
方法,可以把Block連線起來,即構建Pipeline,當DataflowLinkOptions物件的PropagateCompletion屬性為true時,SorceBlock任務處理完成是,會把TargetBlock也標記為完成。
Block被標記為Complete 後,無法傳入新的資料了,即不能再處理新的任務了。
6.Pipeline的執行
public void Process(string[] inputs) { if (inputs == null) return; foreach (var input in inputs) { _startBlock.Post(input); } _startBlock.Complete(); }
Pipeline構建好後,我們只需要給第一個Block傳入資料,該資料就會在管道內流動起來了,所有資料傳入完成後,呼叫Block的Complete方法,把該Block標記為完成,就不可以再往裡面Post資料了。
完整程式碼如下:
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; using System.Threading.Tasks.Dataflow; namespace Tpl.Dataflow { public class Pipeline { IPropagatorBlock<string, Dictionary<int, string>> _startBlock; private int _maxDegreeOfParallelism; public Pipeline(int maxDegreeOfParallelism) { _maxDegreeOfParallelism = maxDegreeOfParallelism; } public void Process(string[] inputs) { if (inputs == null) return; foreach (var input in inputs) { _startBlock.Post(input); } _startBlock.Complete(); } public Task Builder() { _startBlock = Process1(); var process2Block = Process2(); var process3Block = Process3(); _startBlock.LinkTo(process2Block, new DataflowLinkOptions() { PropagateCompletion = true }); process2Block.LinkTo(process3Block, new DataflowLinkOptions() { PropagateCompletion = true }); process3Block.Completion.ContinueWith(_ => { Console.WriteLine($"Process3 Complete,State{_.Status}"); Console.WriteLine("所有任務處理完成"); }); return process3Block.Completion; } private IPropagatorBlock<string, Dictionary<int, string>> Process1() { var bufferBlock = new BufferBlock<Dictionary<int, string>>(); var actionBlock = new ActionBlock<string>(x => { Console.WriteLine($"Process1 處理中:{x}"); Thread.Sleep(5000); var dic = new Dictionary<int, string> { { 0, x } }; dic.Add(1, "Process1"); bufferBlock.Post(dic); }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = _maxDegreeOfParallelism }); actionBlock.Completion.ContinueWith(_ => { Console.WriteLine($"Process1 Complete,State{_.Status}"); bufferBlock.Complete(); }); return DataflowBlock.Encapsulate(actionBlock, bufferBlock); } private IPropagatorBlock<Dictionary<int, string>, Dictionary<int, string>> Process2() { var block = new TransformBlock<Dictionary<int, string>, Dictionary<int, string>>(dic => { Console.WriteLine($"Process2 處理中:{dic.First().Value}"); Thread.Sleep(5000); dic.Add(2, "Process2"); return dic; }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = _maxDegreeOfParallelism } ); block.Completion.ContinueWith(_ => { Console.WriteLine($"Process2 Complete,State{_.Status}"); }); return block; } private ITargetBlock<Dictionary<int, string>> Process3() { var actionBlock = new ActionBlock<Dictionary<int, string>>(dic => { Console.WriteLine($"Process3 處理中:{dic.First().Value}"); Thread.Sleep(5000); dic.Add(3, "Process3"); Console.WriteLine("Dic中的內容如下:"); foreach (var item in dic) { Console.Write($"{item.Key}:{item.Value}||"); } Console.WriteLine(); }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = _maxDegreeOfParallelism }); return actionBlock; } } }
Main方法如下:
static void Main(string[] args) { Console.WriteLine("請輸入管道併發數:"); if (int.TryParse(Console.ReadLine(), out int max)) { var pipeline = new Pipeline(max); var task = pipeline.Builder(); pipeline.Process(new[] { "碼", "農", "阿", "宇" }); task.Wait(); Console.ReadKey(); } }
測試執行如圖:
我來解釋一下,為什麼是這麼執行的,因為把管道的並行度設定為2,所以每個Block可以同時處理兩個任務,所以,如果給管道傳入四個字元 ,每個字元作為一個任務,假設傳入 “碼農阿宇”四個任務,會時這樣的一個過程…..
- 碼 農 兩個首先進入Process1,
- 處理完成後,碼 農 兩個任務流出,
- Process1位置空出來, 阿 宇 兩個任務流入 Process1,
- 碼 農 兩個任務流向 Process2,
- 阿 宇 從 Process1 處理完成後流出,此時Process1任務完成
- 碼 農 流出 Process2 ,同時 阿 宇 流入 Process2 ……
- 依此類推….
碼字不易,如果對您有用,歡迎推薦和關注,謝謝!