有關對耗時很大迴圈進行並行化優化的探討之二:多重迴圈任務的併發處理
【引】
在上一篇有關並行化的《有關對耗時很大迴圈進行並行化優化的探討 之一:併發搜尋的處理》博文中,我們看到,不使用並行語言,而使用基本的程式語言配合多執行緒可以進行並行化處理,本文進一步將並行化操作進行一個簡單封裝,並開始完成一個並行化的資料操作,以供大家參考。
【多重迴圈的操作】
有很多方法可以實現並行查詢,必須並行化Linq就是很不錯的途徑,相比linq to entity,真正實現了效能的有效提升,但如果考慮在迴圈中進行復雜的業務處理,進行獨立的並行化程式設計會更有效的幫助我們實現系統的優化。
下面我們以一個簡單的操作為例,看看如何自己做一個並行化的程式:
在如下的例程中,我們自定義了一個繼承於datatable的TLoopTable物件,並向該表格新增500×5條Tdata記錄,很不幸的是,每條tdata記錄需要10毫秒才能生成。使用該TLoopTable物件填充到datagridview的時間是39秒左右,
我們就以此應用開始進行我們的並行化優化。
class TLoopTable : DataTable
{
public const int CON = 500;
public TLoopTable()
{
this.Columns.Add("C1");
this.Columns.Add("D1");
this.Columns.Add("E1");
this.Columns.Add("F1");
this.Columns.Add("G1");
}
public void AddRows()
{
List<LoopD> plist = new List<LoopD>();
plist.Add(new LoopD("C", 1));
plist.Add(new LoopD("D", 2));
plist.Add(new LoopD("E", 3));
plist.Add(new LoopD("F", 4));
plist.Add(new LoopD("G", 5));
for (int i = 0; i < CON; i++)
{
TData [] tdl = new TData [5];
foreach (LoopD p in plist)
{
tdl[p.Col -1] = new TData(p.Col, i, p.Str) ;
}
this.Rows.Add(tdl);
}
}
class LoopD
{
public int Col;
public string Str;
public LoopD(string s, int c)
{
Col = c ;
Str = s ;
}
}
class TData
{
public int Row;
public int Col;
public string Data;
public TData(int col, int row,string fStr)
{
Thread.Sleep(10);
Row = row;
Col = col;
Data = fStr + row.ToString();
}
public override string ToString()
{
return Data;
}
}
}
首先,我們將上一篇文章所用的並行化方法進行一下封裝如下:
1. 通過定義一個委託,在指定我們在並行化操作內的具體輸入輸出,我這樣定義委託,目的是達到的效果是:如果特定的操作需要迴圈強行退出(如發生異常),則通過taskRst=false實現,同時返回一個object型別值,以便外部程式瞭解中途退出的原因。
public delegate object MyParaLoopTaskHandler(object[] sender, int i, out bool taskRst);
class ParaLoop
{
private ManualResetEvent _ParaEvent;
private int _ExecedParaCount;
private object SynParaObj = new object();
private int ExecutedParaCount
{
get
{
lock (SynParaObj)
{
return _ExecedParaCount;
}
}
set
{
lock (SynParaObj)
{
_ExecedParaCount = value;
}
}
}
bool _IsParaFin;
private bool IsParaFin
{
get
{
lock (SynParaObj)
{
return _IsParaFin;
}
}
set
{
lock (SynParaObj)
{
_IsParaFin = value;
}
}
}
private int _TotalQty;
private object _ParaLoop_Return;
public object ParaCompute(MyParaLoopTaskHandler loopTask, object[] inArg, int loopQty)
{
_ParaEvent = new ManualResetEvent(false);
_IsParaFin = false;
_ExecedParaCount = 0;
_TotalQty = loopQty;
for (int i = 0; i < _TotalQty; i++)
{
MyParaThread u = new MyParaThread();
u.EndTaskCallBack +=new EndTaskHandler(u_EndTaskCallBack);
u.BeginInvoke(loopTask, inArg, i);
}
//使用waitone進行阻塞,僅僅是保證外部呼叫時使用效果上與順序執行相同,但如果希望使用非同步處理,這裡可以完全不用阻塞,而通過_EndTaskCallBack返回一個事件即可
_ParaEvent.WaitOne();
return _ParaLoop_Return;
}
void u_EndTaskCallBack(bool taskRst, object retVal)
{
if (IsParaFin)
return;
if (!taskRst)
{
_ParaLoop_Return = retVal;
_ParaEvent.Set();
return;
}
lock (SynParaObj)
{
ExecutedParaCount++;
if (ExecutedParaCount >= _TotalQty)
{
IsParaFin = true;
_ParaEvent.Set();
}
}
}
}
internal delegate void EndTaskHandler(bool taskRst,object retVal);
class MyParaThread
{
internal event EndTaskHandler EndTaskCallBack;
public void BeginInvoke(MyParaLoopTaskHandler thdTask, object[] sender,int i)
{
Thread p = new Thread(new ParameterizedThreadStart(DoThreadTask), 262144);
ParaTaskContent t = new ParaTaskContent(thdTask, sender,i);
p.Start(t);
}
private void DoThreadTask(object args)
{
ParaTaskContent c = args as ParaTaskContent;
bool rs = true;
object retval= c.Invoke( out rs );
if (EndTaskCallBack != null)
{
EndTaskCallBack(rs, retval);
}
}
class ParaTaskContent
{
public MyParaLoopTaskHandler _Task;
object[] _Sender;
int _Loop_i;
public ParaTaskContent(MyParaLoopTaskHandler thdTask, object[] sender,int loop_i)
{
_Task = thdTask;
_Sender = sender;
_Loop_i = loop_i;
}
public object Invoke(out bool rst)
{
return _Task.Invoke(_Sender, _Loop_i, out rst);
}
}
}
經過封裝以後,外部程式只需要如此呼叫即可實現並行化操作:
1.定義一個要處理的委託:
MyParaLoopTaskHandler k = new MyParaLoopTaskHandler(PerformParaCompute);
2. 建立一個並行化物件
ParaLoop t = new ParaLoop();
3. 指定需要輸入的內容
object[] inargs = new object[2];
inargs[0] = plist;
inargs[1] = rlist;
4. 執行並行化即可
t.ParaCompute(k, inargs,TLoopTable .CON );
而並行化的處理結果可以通過委託方法中定義的操作實現。在本例中,我們每次並行化操作是生成一行(5項TData記錄)記錄,但我們需要一個途徑使並行化生成的行資料與自定義表格容器的行向對應,否則結果可能是顯示順序不是0,1,2,3,...,而是0,3,2,4,1...
為了實現此目的,我們就先生成空白的列表資料,同時建立一個有行標記的資料字典,生成完資料後,再一次填充入table中。以下就是實現的例程:
public void AddRows()
{
List<LoopD> plist = new List<LoopD>();
plist.Add(new LoopD("C", 1));
plist.Add(new LoopD("D", 2));
plist.Add(new LoopD("E", 3));
plist.Add(new LoopD("F", 4));
plist.Add(new LoopD("G", 5));
//生成空白表
for (int i=0;i<CON ;i++)
{
this.Rows.Add(new object [5]);
}
ParaLoop t = new ParaLoop();
Dictionary<int, TData[]> rlist = new Dictionary<int, TData[]>(); // 用於保證並行化資料的順序與表格位置一致的資料字典
object[] inargs = new object[2];
inargs[0] = plist; //傳遞給並行處理程式 生成沒行資料對應位置資訊所需要的子迴圈體
inargs[1] = rlist; // 傳遞給並行處理程式位置資料字典
MyParaLoopTaskHandler k = new MyParaLoopTaskHandler(PerformParaCompute);
t.ParaCompute(k, inargs,TLoopTable .CON );
//並行化完成後,將資料字典填充到表格中。
foreach (int row in rlist.Keys)
{
foreach (TData d in rlist[row])
{
this.Rows[row][d.Col-1] = d;
}
}
}
//並行處理資料的執行方法
private object PerformParaCompute(object[] sender, int i, out bool taskRst)
{
object[] k = sender as object[];
//提取到輸入資料,並進行轉換
List<LoopD> loopsrc = k[0] as List<LoopD>;
Dictionary<int, TData[]> rlist = k[1] as Dictionary<int, TData[]>;
TData[] r = new TData[5];
foreach (LoopD l in loopsrc)
{
r[l.Col - 1] = new TData(l.Col, i, l.Str);
}
lock (rlist)
{
rlist.Add(i, r);
}
taskRst = true;
return null;
}
經過並行化改造前後的執行效率如下表
序列執行 | 並行化執行 | |
執行時間 | 39.072秒 | 2.172秒 |
CPU佔用率 | <1% | 68% |
可以看到,通過並行化改造,系統效率提升了20倍。哇噢。