多執行緒程式設計(3、多工併發處理)
阿新 • • 發佈:2019-02-16
例:假設我們有個方法要執行100次,得到100次返回的結果總和。
序列執行
static void Main(string[] args)
{
SerialExcute();
Console.ReadLine();
}
/// <summary>
/// 序列執行,要阻塞主執行緒,不推薦
/// </summary>
private static void SerialExcute()
{
int sum=0;
Stopwatch watch = new Stopwatch();
watch.Start();
for (int i = 0; i < 100; i++)
{
sum += Method(i);
}
watch.Stop();
Console.WriteLine("100次執行總共開銷{0}毫秒,結果總和為:{1}。", watch.ElapsedMilliseconds,sum);
Console.WriteLine("這是主執行緒." );
Console.WriteLine("100次執行結束.");
}
private static int Method(int i)
{
Console.WriteLine("這是第{0}次執行Method", i);
//睡眠100毫秒秒
Thread.Sleep(100);
return i;
}
100次執行總共開銷10001毫秒,結果總和為:4950。
執行緒池併發執行
class Program
{
static void Main(string[] args)
{
TaskExcute();
}
private static object lockobj = new object();
/// <summary>
/// 執行緒池並行執行,如果出異常捕獲不了,將導致整個程式崩潰。(不推薦使用)
/// </summary>
private static void TaskExcute()
{
int sum=0;
Stopwatch watch = new Stopwatch();
watch.Start();
try
{
//採用計數器來判斷執行緒池裡的執行緒全部執行完畢
CountdownEvent handler = new CountdownEvent(100);
//ThreadPool.SetMaxThreads(5, 5);
//設定併發數 5
ThreadPool.SetMinThreads(5, 5);
for (int i = 0; i < 100; i++)
{
var j = i;//這裡一定要定義一個新的變數
ThreadPool.QueueUserWorkItem((state) =>
{
int result = Method(j);
//加鎖
lock (lockobj)
{
sum += result;
}
handler.Signal();
});
}
///主執行緒等待
handler.Wait();
}
catch (AggregateException ex)
{
Console.WriteLine(ex.Message);
//.NET4 Task的統一異常處理機制
foreach (Exception inner in ex.InnerExceptions)
{
Console.WriteLine("Exception type {0} from {1}",
inner.GetType(), inner.Source);
}
}
watch.Stop();
Console.WriteLine("100次執行總共開銷{0}毫秒,結果總和為:{1}。", watch.ElapsedMilliseconds, sum);
Console.WriteLine("這是主執行緒.");
Console.WriteLine("100次執行結束.");
}
private static int Method(int i)
{
Console.WriteLine("這是第{0}次執行Method", i);
//睡眠100毫秒
Thread.Sleep(100);
// throw new NullReferenceException("測試異常");
return i;
}
}
100次執行總共開銷1826毫秒,結果總和為:4950。
Task並行執行
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApplication39
{
class Program
{
static void Main(string[] args)
{
TaskExcute();
}
private static object lockobj = new object();
public static CancellationTokenSource TokenSource = new CancellationTokenSource();
/// <summary>
/// Task併發執行,不能自定義併發數(不推薦)
/// </summary>
private static void TaskExcute()
{
int sum = 0;
Stopwatch watch = new Stopwatch();
watch.Start();
try
{
List<Task> tasks = new List<Task>();
List<int> results = new List<int>();
for (int i = 0; i < 100; i++)
{
var j = i;
tasks.Add(Task.Factory.StartNew(() =>
{
results.Add(Method(j));
}, TokenSource.Token));
}
Task.WaitAll(tasks.ToArray());
sum = results.Sum();
}
catch (AggregateException ex)
{
foreach (var inner in ex.InnerExceptions)
{
Console.WriteLine(inner.Message);
}
}
watch.Stop();
Console.WriteLine("100次執行總共開銷{0}毫秒,結果總和為:{1}。", watch.ElapsedMilliseconds, sum);
Console.WriteLine("這是主執行緒.");
Console.WriteLine("100次執行結束.");
}
private static int Method(int i)
{
Console.WriteLine("這是第{0}次執行Method", i);
//睡眠100毫秒,將此值調大將能開到併發數
Thread.Sleep(100);
try
{
// throw new NullReferenceException("測試異常");
}
catch (Exception ex)
{
TokenSource.Cancel();
throw ex;
}
return i;
}
}
}
100次執行總共開銷2218毫秒,結果總和為:4950。
重寫任務排程器
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApplication39
{
class Program
{
static void Main(string[] args)
{
TaskExcute();
}
private static object lockobj = new object();
/// <summary>
/// 重寫任務排程器實現併發執行,可以捕獲異常且可以自定義併發數,推薦使用
/// </summary>
private static void TaskExcute()
{
int sum = 0;
Stopwatch watch = new Stopwatch();
watch.Start();
try
{
NTaskScheduler scheduler = new NTaskScheduler(5);
List<Task> tasks = new List<Task>();
for (int i = 0; i < 100; i++)
{
var j = i;
tasks.Add(new Task(() =>
{
var result = Method(j, scheduler._tokenSource);
lock (lockobj)
{
sum += result;
}
}, scheduler._tokenSource.Token));
tasks[i].Start(scheduler);
}
Task.WaitAll(tasks.ToArray());
}
catch (AggregateException ex)
{
foreach (var item in ex.InnerExceptions)
{
Console.WriteLine(item.Message);
}
}
watch.Stop();
Console.WriteLine("100次執行總共開銷{0}毫秒,結果總和為:{1}。", watch.ElapsedMilliseconds, sum);
Console.WriteLine("這是主執行緒.");
Console.WriteLine("100次執行結束.");
}
private static int Method(int i, CancellationTokenSource TokenSource)
{
Console.WriteLine("這是第{0}次執行Method", i);
//睡眠100毫秒,當前時間放大可以檢視併發數
Thread.Sleep(100);
try
{
// throw new NullReferenceException("測試異常");
}
catch (Exception ex)
{
TokenSource.Cancel();
throw ex;
}
return i;
}
}
}
NTaskScheduler.cs
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApplication39
{
/// <summary>
/// 自定義併發任務排程
/// </summary>
public class NTaskScheduler : TaskScheduler, IDisposable
{
private CancellationTokenSource TokenSource =null;
public CancellationTokenSource _tokenSource { get {
if (TokenSource == null) TokenSource = new CancellationTokenSource();
return TokenSource;
}
}
private List<Thread> _threads = new List<Thread>();
private BlockingCollection<Task> _tasks = new BlockingCollection<Task>();
private int _concurrencylevel;
/// <summary>
/// 初始化併發排程器
/// </summary>
/// <param name="concurrencyLevel">併發數</param>
public NTaskScheduler(int concurrencylevel)
{
this._concurrencylevel = concurrencylevel;
for (int i = 0; i < concurrencylevel; i++)
{
_threads.Add(new Thread(() =>
{
foreach (Task task in _tasks.GetConsumingEnumerable())
this.TryExecuteTask(task);
}));
_threads[i].Start();
}
}
public void Dispose()
{
this._tasks.CompleteAdding();//不接受Task的新增
foreach (Thread t in _threads)
{
t.Join();
}
}
/// <summary>
/// 僅對於偵錯程式支援,生成當前排隊到計劃程式中等待執行的 System.Threading.Tasks.Task 例項的列舉
/// </summary>
/// <returns>一個允許偵錯程式遍歷當前排隊到此計劃程式中的任務的列舉</returns>
protected override IEnumerable<Task> GetScheduledTasks()
{
return _tasks.ToArray();
}
protected override void QueueTask(Task task)
{
_tasks.Add(task);
}
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
if (_threads.Contains(Thread.CurrentThread)) return TryExecuteTask(task);
return false;
}
}
}
100次執行總共開銷2021毫秒,結果總和為:4950。