ThreadPool類(線程池)
CLR線程池並不會在CLR初始化時立即建立線程,而是在應用程序要創建線程來運行任務時,線程池才初始化一個線程。
線程池初始化時是沒有線程的,線程池裏的線程的初始化與其他線程一樣,但是在完成任務以後,該線程不會自行銷毀,而是以掛起的狀態返回到線程池。直到應用程序再次向線程池發出請求時,線程池裏掛起的線程就會再度激活執行任務。
這樣既節省了建立線程所造成的性能損耗,也可以讓多個任務反復重用同一線程,從而在應用程序生存期內節約大量開銷。
通過CLR線程池所建立的線程總是默認為後臺線程,優先級數為ThreadPriority.Normal。
CLR線程池分為工作者線程(workerThreads)
- 工作者線程是主要用作管理CLR內部對象的運作,通常用於計算密集的任務。
- I/O(Input/Output)線程主要用於與外部系統交互信息,如輸入輸出,CPU僅需在任務開始的時候,將任務的參數傳遞給設備,然後啟動硬件設備即可。等任務完成的時候,CPU收到一個通知,一般來說是一個硬件的中斷信號,此時CPU繼續後繼的處理工作。在處理過程中,CPU是不必完全參與處理過程的,如果正在運行的線程不交出CPU的控制權,那麽線程也只能處於等待狀態,即使操作系統將當前的CPU調度給其他線程,此時線程所占用的空間還是被占用,而並沒有CPU處理這個線程,可能出現線程資源浪費的問題。如果這是一個網絡服務程序,每一個網絡連接都使用一個線程管理,可能出現大量線程都在等待網絡通信,隨著網絡連接的不斷增加,處於等待狀態的線程將會很消耗盡所有的內存資源。可以考慮使用線程池解決這個問題。
線程池的最大值一般默認為1000、2000。當大於此數目的請求時,將保持排隊狀態,直到線程池裏有線程可用。
使用CLR線程池的工作者線程一般有兩種方式:
- 通過ThreadPool.QueueUserWorkItem()方法;
- 通過委托;
要註意,不論是通過ThreadPool.QueueUserWorkItem()還是委托,調用的都是線程池裏的線程。
通過以下兩個方法可以讀取和設置CLR線程池中工作者線程與I/O線程的最大線程數。
- ThreadPool.GetMax(out in workerThreads,out int completionPortThreads);
- ThreadPool.SetMax(int workerThreads,int completionPortThreads);
若想測試線程池中有多少線程正在投入使用,可以通過ThreadPool.GetAvailableThreads(out in workThreads,out int conoletionPortThreads)方法。
方法 | 說明 |
GetAvailableThreads | 剩余空閑線程數 |
GetMaxThreads | 最多可用線程數,所有大於此數目的請求將保持排隊狀態,直到線程池線程變為可用 |
GetMinThreads | 檢索線程池在新請求預測中維護的空閑線程數 |
QueueUserWorkItem | 啟動線程池裏得一個線程(隊列的方式,如線程池暫時沒空閑線程,則進入隊列排隊) |
SetMaxThreads | 設置線程池中的最大線程數 |
SetMinThreads | 設置線程池最少需要保留的線程數 |
我們可以使用線程池來解決上面的大部分問題,跟使用單個線程相比,使用線程池有如下優點:
1、縮短應用程序的響應時間。因為在線程池中有線程的線程處於等待分配任務狀態(只要沒有超過線程池的最大上限),無需創建線程。
2、不必管理和維護生存周期短暫的線程,不用在創建時為其分配資源,在其執行完任務之後釋放資源。
3、線程池會根據當前系統特點對池內的線程進行優化處理。
總之使用線程池的作用就是減少創建和銷毀線程的系統開銷。在.NET中有一個線程的類ThreadPool,它提供了線程池的管理。
ThreadPool是一個靜態類,它沒有構造函數,對外提供的函數也全部是靜態的。其中有一個QueueUserWorkItem方法,它有兩種重載形式,如下:
public static bool QueueUserWorkItem(WaitCallback callBack):將方法排入隊列以便執行。此方法在有線程池線程變得可用時執行。
public static bool QueueUserWorkItem(WaitCallback callBack,Object state):將方法排入隊列以便執行,並指定包含該方法所用數據的對象。此方法在有線程池線程變得可用時執行。
QueueUserWorkItem方法中使用的的WaitCallback參數表示一個delegate,它的聲明如下:
public delegate void WaitCallback(Object state)
如果需要傳遞任務信息可以利用WaitCallback中的state參數,類似於ParameterizedThreadStart委托。
下面是一個ThreadPool的例子,代碼如下:
using System; using System.Collections; using System.ComponentModel; using System.Diagnostics; using System.Threading; namespace ConsoleApp1 { class ThreadPoolDemo { public ThreadPoolDemo() { } public void Work() { ThreadPool.QueueUserWorkItem(new WaitCallback(CountProcess)); ThreadPool.QueueUserWorkItem(new WaitCallback(GetEnvironmentVariables)); } /// <summary> /// 統計當前正在運行的系統進程信息 /// </summary> /// <param name="state"></param> private void CountProcess(object state) { Process[] processes = Process.GetProcesses(); foreach (Process p in processes) { try { Console.WriteLine("進程信息:Id:{0},ProcessName:{1},StartTime:{2}", p.Id, p.ProcessName, p.StartTime); } catch (Win32Exception e) { Console.WriteLine("ProcessName:{0}", p.ProcessName); } finally { } } Console.WriteLine("獲取進程信息完畢。"); } /// <summary> /// 獲取當前機器系統變量設置 /// </summary> /// <param name="state"></param> public void GetEnvironmentVariables(object state) { IDictionary list = System.Environment.GetEnvironmentVariables(); foreach (DictionaryEntry item in list) { Console.WriteLine("系統變量信息:key={0},value={1}", item.Key, item.Value); } Console.WriteLine("獲取系統變量信息完畢。"); } } }ThreadPoolDemo
using System; using System.Threading; namespace ConsoleApp1 { class Program { static void Main(string[] args) { ThreadPoolDemo tpd1 = new ThreadPoolDemo(); tpd1.Work(); Thread.Sleep(5000); Console.WriteLine("OK"); Console.ReadLine(); } } }Program
利用ThreadPool調用工作線程和IO線程的範例
using System; using System.Collections; using System.IO; using System.Text; using System.Threading; namespace ConsoleApp1 { class Program { static void Main(string[] args) { // 設置線程池中處於活動的線程的最大數目 // 設置線程池中工作者線程數量為1000,I/O線程數量為1000 ThreadPool.SetMaxThreads(1000, 1000); Console.WriteLine("Main Thread: queue an asynchronous method"); PrintMessage("Main Thread Start"); // 把工作項添加到隊列中,此時線程池會用工作者線程去執行回調方法 ThreadPool.QueueUserWorkItem(asyncMethod); asyncWriteFile(); Console.Read(); } // 方法必須匹配WaitCallback委托 private static void asyncMethod(object state) { Thread.Sleep(1000); PrintMessage("Asynchoronous Method"); Console.WriteLine("Asynchoronous thread has worked "); } #region 異步讀取文件模塊 private static void asyncReadFile() { byte[] byteData = new byte[1024]; FileStream stream = new FileStream(@"D:\123.txt", FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.ReadWrite, 1024, true); //把FileStream對象,byte[]對象,長度等有關數據綁定到FileDate對象中,以附帶屬性方式送到回調函數 Hashtable ht = new Hashtable(); ht.Add("Length", (int)stream.Length); ht.Add("Stream", stream); ht.Add("ByteData", byteData); //啟動異步讀取,倒數第二個參數是指定回調函數,倒數第一個參數是傳入回調函數中的參數 stream.BeginRead(byteData, 0, (int)ht["Length"], new AsyncCallback(Completed), ht); PrintMessage("asyncReadFile Method"); } //實際參數就是回調函數 static void Completed(IAsyncResult result) { Thread.Sleep(2000); PrintMessage("asyncReadFile Completed Method"); //參數result實際上就是Hashtable對象,以FileStream.EndRead完成異步讀取 Hashtable ht = (Hashtable)result.AsyncState; FileStream stream = (FileStream)ht["Stream"]; int length = stream.EndRead(result); stream.Close(); string str = Encoding.UTF8.GetString(ht["ByteData"] as byte[]); Console.WriteLine(str); stream.Close(); } #endregion #region 異步寫入文件模塊 //異步寫入模塊 private static void asyncWriteFile() { //文件名 文件創建方式 文件權限 文件進程共享 緩沖區大小為1024 是否啟動異步I/O線程為true FileStream stream = new FileStream(@"D:\123.txt", FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.ReadWrite, 1024, true); //這裏要註意,如果寫入的字符串很小,則.Net會使用輔助線程寫,因為這樣比較快 byte[] bytes = Encoding.UTF8.GetBytes("你在他鄉還好嗎?"); //異步寫入開始,倒數第二個參數指定回調函數,最後一個參數將自身傳到回調函數裏,用於結束異步線程 stream.BeginWrite(bytes, 0, (int)bytes.Length, new AsyncCallback(Callback), stream); PrintMessage("AsyncWriteFile Method"); } static void Callback(IAsyncResult result) { //顯示線程池現狀 Thread.Sleep(2000); PrintMessage("AsyncWriteFile Callback Method"); //通過result.AsyncState再強制轉換為FileStream就能夠獲取FileStream對象,用於結束異步寫入 FileStream stream = (FileStream)result.AsyncState; stream.EndWrite(result); stream.Flush(); stream.Close(); asyncReadFile(); } #endregion // 打印線程池信息 private static void PrintMessage(String data) { int workthreadnumber; int iothreadnumber; // 獲得線程池中可用的線程,把獲得的可用工作者線程數量賦給workthreadnumber變量 // 獲得的可用I/O線程數量給iothreadnumber變量 ThreadPool.GetAvailableThreads(out workthreadnumber, out iothreadnumber); Console.WriteLine("{0}\n CurrentThreadId is {1}\n CurrentThread is background :{2}\n WorkerThreadNumber is:{3}\n IOThreadNumbers is: {4}\n", data, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsBackground.ToString(), workthreadnumber.ToString(), iothreadnumber.ToString()); } } }Program
線程池中放入異步操作
using System; using System.Threading; namespace ConsoleApp1 { class Program { private static void AsyncOperation(object state) { Console.WriteLine("Operation state: {0}", state ?? "(null)"); Console.WriteLine("Worker thread id: {0}", Thread.CurrentThread.ManagedThreadId); Thread.Sleep(TimeSpan.FromSeconds(2)); } static void Main(string[] args) { const int x = 1; const int y = 2; const string lambdaState = "lambda state 2"; ThreadPool.QueueUserWorkItem(AsyncOperation); Thread.Sleep(TimeSpan.FromSeconds(1)); ThreadPool.QueueUserWorkItem(AsyncOperation, "async state"); Thread.Sleep(TimeSpan.FromSeconds(1)); ThreadPool.QueueUserWorkItem(state => { Console.WriteLine("Operation state: {0}", state); Console.WriteLine("Worker thread id: {0}", Thread.CurrentThread.ManagedThreadId); Thread.Sleep(TimeSpan.FromSeconds(2)); }, "lambda state"); ThreadPool.QueueUserWorkItem(_ => { Console.WriteLine("Operation state: {0}, {1}", x + y, lambdaState); Console.WriteLine("Worker thread id: {0}", Thread.CurrentThread.ManagedThreadId); Thread.Sleep(TimeSpan.FromSeconds(2)); }, "lambda state"); Thread.Sleep(TimeSpan.FromSeconds(2)); } } }Program
線程池同步操作
using System; using System.Threading; namespace ConsoleApp1 { class ThreadPoolDemo { static object lockobj = new object(); static int Count = 0; ManualResetEvent manualEvent; public ThreadPoolDemo(ManualResetEvent manualEvent) { this.manualEvent = manualEvent; } public void DisplayNumber(object a) { lock (lockobj) { Count++; Console.WriteLine("當前運算結果:{0},Count={1},當前子線程id:{2} 的狀態:{3}", a, Count, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.ThreadState); } //Console.WriteLine("當前運算結果:{0}", a); //Console.WriteLine("當前運算結果:{0},當前子線程id:{1} 的狀態:{2}", a,Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.ThreadState); //這裏是方法執行時間的模擬,如果註釋該行代碼,就能看出線程池的功能了 Thread.Sleep(2000); //Console.WriteLine("當前運算結果:{0},Count={1},當前子線程id:{2} 的狀態:{3}", a, Count, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.ThreadState); //這裏是釋放共享鎖,讓其他線程進入 manualEvent.Set(); } } }ThreadPoolDemo
using System; using System.Diagnostics; using System.Threading; namespace ConsoleApp1 { class Program { //設定任務數量 static int count = 10; static void Main(string[] args) { //讓線程池執行5個任務所以也為每個任務加上這個對象保持同步 ManualResetEvent[] events = new ManualResetEvent[count]; Console.WriteLine("當前主線程id:{0}", Thread.CurrentThread.ManagedThreadId); Stopwatch sw = new Stopwatch(); sw.Start(); NoThreadPool(count); sw.Stop(); Console.WriteLine("Execution time using threads: {0}", sw.ElapsedMilliseconds); sw.Reset(); sw.Start(); //循環每個任務 for (int i = 0; i < count; i++) { //實例化同步工具 events[i] = new ManualResetEvent(false); //Test在這裏就是任務類,將同步工具的引用傳入能保證共享區內每次只有一個線程進入 ThreadPoolDemo tst = new ThreadPoolDemo(events[i]); //Thread.Sleep(200); //將任務放入線程池中,讓線程池中的線程執行該任務 ThreadPool.QueueUserWorkItem(tst.DisplayNumber, i); } //註意這裏,設定WaitAll是為了阻塞調用線程(主線程),讓其余線程先執行完畢, //其中每個任務完成後調用其set()方法(收到信號),當所有 //的任務都收到信號後,執行完畢,將控制權再次交回調用線程(這裏的主線程) ManualResetEvent.WaitAll(events); sw.Stop(); Console.WriteLine("Execution time using threads: {0}", sw.ElapsedMilliseconds); //Console.WriteLine("所有任務做完!"); Console.ReadKey(); } static void NoThreadPool(int count) { for (int i = 0; i < count; i++) { Thread.Sleep(2000); Console.WriteLine("當前運算結果:{0},Count={1},當前子線程id:{2} 的狀態:{3}", i, i + 1, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.ThreadState); } } } }Program
線程池中的取消操作
using System; using System.Threading; namespace ConsoleApp1 { class Program { static void Main(string[] args) { ThreadPool.SetMaxThreads(1000, 1000); Console.WriteLine("Main thread run"); PrintMessage("Start"); Run(); Console.ReadKey(); } private static void Run() { CancellationTokenSource cts = new CancellationTokenSource(); // 這裏用Lambda表達式的方式和使用委托的效果一樣的,只是用了Lambda後可以少定義一個方法。 // 這在這裏就是讓大家明白怎麽lambda表達式如何由委托轉變的 ////ThreadPool.QueueUserWorkItem(o => Count(cts.Token, 1000)); ThreadPool.QueueUserWorkItem(callback, cts.Token); Console.WriteLine("Press Enter key to cancel the operation\n"); Console.ReadLine(); // 傳達取消請求 cts.Cancel(); Console.ReadLine(); } private static void callback(object state) { Thread.Sleep(1000); PrintMessage("Asynchoronous Method Start"); CancellationToken token = (CancellationToken)state; Count(token, 1000); } // 執行的操作,當受到取消請求時停止數數 private static void Count(CancellationToken token, int countto) { for (int i = 0; i < countto; i++) { if (token.IsCancellationRequested) { Console.WriteLine("Count is canceled"); break; } Console.WriteLine(i); Thread.Sleep(300); } Console.WriteLine("Cout has done"); } // 打印線程池信息 private static void PrintMessage(String data) { int workthreadnumber; int iothreadnumber; // 獲得線程池中可用的線程,把獲得的可用工作者線程數量賦給workthreadnumber變量 // 獲得的可用I/O線程數量給iothreadnumber變量 ThreadPool.GetAvailableThreads(out workthreadnumber, out iothreadnumber); Console.WriteLine("{0}\n CurrentThreadId is {1}\n CurrentThread is background :{2}\n WorkerThreadNumber is:{3}\n IOThreadNumbers is: {4}\n", data, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsBackground.ToString(), workthreadnumber.ToString(), iothreadnumber.ToString()); } } }Program
Thread與ThreadPool的一個性能比較
using System; using System.Diagnostics; using System.Threading; namespace ConsoleApp1 { class Program { static void Main(string[] args) { const int numberOfOperations = 300; var sw = new Stopwatch(); sw.Start(); UseThreads(numberOfOperations); sw.Stop(); Console.WriteLine("Execution time using threads: {0}", sw.ElapsedMilliseconds); sw.Reset(); sw.Start(); UseThreadPool(numberOfOperations); sw.Stop(); Console.WriteLine("Execution time using threadPool: {0}", sw.ElapsedMilliseconds); } static void UseThreads(int numberOfOperations) { using (var countdown = new CountdownEvent(numberOfOperations)) { Console.WriteLine("Scheduling work by creating threads"); for (int i = 0; i < numberOfOperations; i++) { var thread = new Thread(() => { Console.Write("{0},", Thread.CurrentThread.ManagedThreadId); Thread.Sleep(TimeSpan.FromSeconds(0.1)); countdown.Signal(); }); thread.Start(); } countdown.Wait(); Console.WriteLine(); } } static void UseThreadPool(int numberOfOperations) { using (var countdown = new CountdownEvent(numberOfOperations)) { Console.WriteLine("Starting work on a threadpool"); for (int i = 0; i < numberOfOperations; i++) { ThreadPool.QueueUserWorkItem(_ => { Console.Write("{0},", Thread.CurrentThread.ManagedThreadId); Thread.Sleep(TimeSpan.FromSeconds(0.1)); countdown.Signal(); }); } countdown.Wait(); Console.WriteLine(); } } } }Program
ThreadPool類(線程池)