[NewLife.XCode]實體佇列(多執行緒生產的大資料集中儲存)
NewLife.XCode是一個有15年曆史的開源資料中介軟體,支援netcore/net45/net40,由新生命團隊(2002~2020)開發完成並維護至今,以下簡稱XCode。
整個系列教程會大量結合示例程式碼和執行日誌來進行深入分析,蘊含多年開發經驗於其中,代表作有百億級大資料實時計算專案。
開源地址:https://github.com/NewLifeX/X (求star, 1067+)
在大資料分析處理中,需要對海量資料進行添刪改操作,常規單行操作難以滿足要求,批量操作勢在必行!
飛仙(http://feixian.newlifex.com/)有收藏各種資料庫批量插入資料的效能排行榜,其中MySql冠軍是60萬tps,SQLite冠軍是56.6萬tps
然而很多時候,資料來自多個渠道(多執行緒、多網路連線),單個渠道資料量不大,甚至只有一行,就難以使用批量添刪改操作了。例如物聯網資料採集、埋點日誌等,在多執行緒上有大量資料需要寫入。因此,XCode創造性設計了實體佇列技術!
!!閱讀本文之前,建議閱讀:https://www.yuque.com/smartstone/xcode/batch
什麼是實體佇列
要說實體佇列EntityDeferredQueue,就不得不提它的基類延遲佇列DeferredQueue。
延遲佇列DeferredQueue的核心思想就是“湊批”,把要處理的零散資料放入一個“佇列”,然後定時集中處理。
例如物聯網採集服務端從多個連線收到資料,需要寫入資料庫,為了提升吞吐,可以把實體資料放入延遲佇列,然後定時的落庫,此時,延遲佇列得到一批資料,可以使用批量插入技術。
實際上DeferredQueue內部並不是一個佇列,而是一個併發字典,因為有些業務場景,需要在“入佇列”時去重,例如統計資料,需要拿出某省份的統計資料,多次累加後集中儲存。
private static readonly DeferredQueue _statCache = new EntityDeferredQueue { Name = "Gun", Action = EntityActions.Save }; private static void SaveStat(DateTime date, Int32 provinceID, String kind, ScanKinds scanKind, String code) { var key = $"{date:yyMMdd}_{provinceID}_{kind}"; var stat = _statCache.GetOrAdd(key, k => GunProvinceStat.FindByKey(k, true) ?? new GunProvinceStat()); stat.StatDate = date; stat.Kind = kind; stat.ProvinceID = provinceID; stat.LastCode = code; stat.ProcessStat(scanKind); _statCache.Commit(key); }
主要流程
對於統計型資料來說,可以在記憶體裡面多次累加計算指標,然後一次性儲存,並且是批量儲存,極大減少了資料庫寫入次數。這是大資料分析必備利器!
延遲佇列主要屬性
/// <summary>跟蹤數。達到該值時輸出跟蹤日誌,預設1000</summary>
public Int32 TraceCount { get; set; } = 1000;
/// <summary>週期。預設10_000毫秒</summary>
public Int32 Period { get; set; } = 10_000;
/// <summary>最大個數。超過該個數時,進入佇列將產生堵塞。預設100_000</summary>
public Int32 MaxEntity { get; set; } = 100_000;
/// <summary>批大小。預設5_000</summary>
public Int32 BatchSize { get; set; } = 5_000;
/// <summary>等待借出物件確認修改的時間,預設3000ms</summary>
public Int32 WaitForBusy { get; set; } = 3_000;
/// <summary>儲存速度,每秒儲存多少個實體</summary>
public Int32 Speed { get; private set; }
/// <summary>是否非同步處理。預設true表示非同步處理,共用DQ定時排程;false表示同步處理,獨立執行緒</summary>
public Boolean Async { get; set; } = true;
回過頭來,實體佇列EntityDeferredQueue作為延遲佇列的擴充套件延伸,實際上是定義了“佇列資料”的處理行為。延遲佇列只負責收集資料和定時排程,實際處理行為Process需要擴充套件。
EntityDeferredQueue定義了 Save/Insert/Update/Upsert/Delete等行為供選擇。
如何使用實體佇列提升吞吐
再次深入分析前文的例子
private static readonly DeferredQueue _statCache = new EntityDeferredQueue { Name = "Gun", Action = EntityActions.Save };
private static void SaveStat(DateTime date, Int32 provinceID, String kind, ScanKinds scanKind, String code)
{
var key = $"{date:yyMMdd}_{provinceID}_{kind}";
var stat = _statCache.GetOrAdd(key, k => GunProvinceStat.FindByKey(k, true) ?? new GunProvinceStat());
stat.StatDate = date;
stat.Kind = kind;
stat.ProvinceID = provinceID;
stat.LastCode = code;
stat.ProcessStat(scanKind);
_statCache.Commit(key);
}
這是一個非常簡單的資料分析專案,統計每天各省每一種掃描型別的操作次數。日均分析處理5億行資料,每一行資料都要識別出日期、省份、類別等欄位,也就是SaveStat每天要呼叫5億次,結果資料分類存入統計表。共31省份27種類別,每日統計行數約800行(並非每個省都有全部類別)。通俗來講,5億行資料,分組聚合得到800行,實時計算,每5秒計算一次。
採用流式計算框架,逐行遍歷5億行實時資料,如果Insert/Update資料庫5億次,顯然很不現實!
平均每行寫入62.5萬次(5億/800),如果能夠在記憶體裡面“湊一湊”,每1000次更新,才寫入一次資料庫,那麼總寫入次數降低為50萬次,平均每行寫入625次。
實體佇列/延遲佇列,正是為了這類場景而設計!
首先,根據業務去構造一個唯一key,在這裡就是日期+省份+類別;
其次,GetOrAdd嘗試從佇列裡獲取該key對應的統計物件,99%時候記憶體命中,如果不存在,則查資料庫或者new一個;
再次,取得統計物件後,可以進行欄位累加,stat.ProcessStat(scanKind);
最後,Commit告訴佇列,該key對應的實體物件已經使用完成,可以提交;
在延遲佇列內部,定時(Period=10_000ms)執行一次儲存,把記憶體裡面的統計物件批量儲存到資料庫,並清空佇列。
這裡遇到的第一個問題就是,少量統計物件仍然使用怎麼辦?請放心,定時任務會等待一定時間(WaitForBusy=3000ms),如果使用方Commit則提前完成。因此,上面的Commit可以不要,效果會變差一些,同時,統計邏輯必須儘快完成(<3000ms)。
第二個問題很重要,定時間隔(Period=10_000ms)之內,記憶體資料是高危狀態,如果此時程序退出,則意味著統計資料丟失。標準架構應該是在資料落庫以後做Ack確認,但是原始資料實在太多(5億),很不現實。因此,實際工作中,我們是通過提升系統可靠性來規避該問題,採用螞蟻排程AntJob,結合分散式多節點部署,在實時計算中,記憶體保留資料並不多。每次需要更新程式時,先停止排程一分鐘,等待資料落庫和冷卻,才能推出應用程序。在資料分析領域,一般允許有一定的資料誤差(<0.01%),或者白天實時計算加夜晚離線重算的模式!
實際經驗表明,只要應用沒有非法退出,不存在資料丟失問題!
再來看看 ProcessStat內部,(這裡的GunProvinceStat是XCode實體類,一張統計表)
public void ProcessStat(ScanKinds kind)
{
//stat.Total++;
Interlocked.Increment(ref _Total);
switch (kind)
{
case ScanKinds.Receipt:
//stat.Receipts++;
Interlocked.Increment(ref _Receipts);
break;
case ScanKinds.SendBill:
case ScanKinds.SendAir:
//stat.Sends++;
Interlocked.Increment(ref _Sends);
break;
case ScanKinds.SendBag:
Interlocked.Increment(ref _SendBags);
break;
case ScanKinds.ComeBill:
case ScanKinds.ComeAir:
//stat.Comes++;
Interlocked.Increment(ref _Comes);
break;
case ScanKinds.ComeBag:
Interlocked.Increment(ref _ComeBags);
break;
case ScanKinds.SendCar:
case ScanKinds.ComeCar:
Interlocked.Increment(ref _Cars);
break;
case ScanKinds.Dispatch:
//stat.Dispatchs++;
Interlocked.Increment(ref _Dispatchs);
break;
case ScanKinds.Sign:
//stat.Signs++;
Interlocked.Increment(ref _Signs);
break;
case ScanKinds.Back:
Interlocked.Increment(ref _Backs);
break;
case ScanKinds.Problem:
Interlocked.Increment(ref _Problems);
break;
case ScanKinds.Stay:
case ScanKinds.Other:
case ScanKinds.Input:
case ScanKinds.Order:
case ScanKinds.Electronic:
default:
Interlocked.Increment(ref _Others);
break;
}
}
資料表結構
<Table Name="GunProvinceStat" Description="巴槍省份統計" IgnoreNameCase="False">
<Columns>
<Column Name="ID" DataType="Int32" Identity="True" PrimaryKey="True" Description="編號" />
<Column Name="StatDate" DataType="DateTime" Description="統計日期" />
<Column Name="ProvinceID" DataType="Int32" Description="省份。0表示全國" />
<Column Name="Kind" DataType="String" Description="類別。All表示所有型別" />
<Column Name="Total" DataType="Int64" Description="總次數" />
<Column Name="Receipts" DataType="Int64" Description="收件數" />
<Column Name="Sends" DataType="Int64" Description="發件數" />
<Column Name="Comes" DataType="Int64" Description="到件數" />
<Column Name="Dispatchs" DataType="Int64" Description="派件數" />
<Column Name="Signs" DataType="Int64" Description="簽收數" />
<Column Name="SendBags" DataType="Int64" Description="發包數" />
<Column Name="ComeBags" DataType="Int64" Description="到包數" />
<Column Name="Cars" DataType="Int64" Description="掃車數" />
<Column Name="Backs" DataType="Int64" Description="退件數" />
<Column Name="Problems" DataType="Int64" Description="問題件數" />
<Column Name="Others" DataType="Int64" Description="其它數" />
<Column Name="LastCode" DataType="String" Description="最後單號" />
<Column Name="CreateTime" DataType="DateTime" Description="建立時間" />
<Column Name="UpdateTime" DataType="DateTime" Description="更新時間" />
</Columns>
<Indexes>
<Index Columns="StatDate,ProvinceID,Kind" Unique="True" />
<Index Columns="Kind,ProvinceID" />
</Indexes>
</Table>
系列教程
NewLife.XCode教程系列[2019版]
- 增刪改查入門。快速展現用法,程式碼配置連線字串
- 資料模型檔案。建立表格欄位和索引,名字以及資料型別規範,推薦欄位(時間,使用者,IP)
- 實體類詳解。資料類業務類,泛型基類,介面
- 功能設定。連線字串,除錯開關,SQL日誌,慢日誌,引數化,執行超時。程式碼與配置檔案設定,連線字串區域性設定
- 反向工程。自動建立資料庫資料表
- 資料初始化。InitData寫入初始化資料
- 高階增刪改。過載攔截,自增欄位,Valid驗證,實體模型(時間,使用者,IP)
- 髒資料。如何產生,怎麼利用
- 增量累加。高併發統計
- 事務處理。單表和多表,不同連線,多種寫法
- 擴充套件屬性。多表關聯,Map對映
- 高階查詢。複雜條件,分頁,自定義擴充套件FieldItem,查總記錄數,查彙總統計
- 資料層快取。Sql快取,更新機制
- 實體快取。全表整理快取,更新機制
- 物件快取。字典快取,適用使用者等資料較多場景。
- 百億級效能。欄位精煉,索引完備,合理查詢,充分利用快取
- 實體工廠。元資料,通用處理程式
- 角色許可權。Membership
- 匯入匯出。Xml,Json,二進位制,網路或檔案
- 分表分庫。常見拆分邏輯
- 高階統計。聚合統計,分組統計
- 批量寫入。批量插入,批量Upsert,非同步儲存
- 實體佇列。寫入級快取,提升效能。
- 備份同步。備份資料,恢復資料,同步資料
- 資料服務。提供RPC介面服務,遠端執行查詢,例如SQLite網路版
- 大資料分析。ETL抽取,排程計算處理,結果持久化