1. 程式人生 > 其它 >[NewLife.XCode]實體佇列(多執行緒生產的大資料集中儲存)

[NewLife.XCode]實體佇列(多執行緒生產的大資料集中儲存)

NewLife.XCode是一個有15年曆史的開源資料中介軟體,支援netcore/net45/net40,由新生命團隊(2002~2020)開發完成並維護至今,以下簡稱XCode。

整個系列教程會大量結合示例程式碼和執行日誌來進行深入分析,蘊含多年開發經驗於其中,代表作有百億級大資料實時計算專案。

開源地址:https://github.com/NewLifeX/X (求star, 1067+)

在大資料分析處理中,需要對海量資料進行添刪改操作,常規單行操作難以滿足要求,批量操作勢在必行!

飛仙(http://feixian.newlifex.com/)有收藏各種資料庫批量插入資料的效能排行榜,其中MySql冠軍是60萬tps,SQLite冠軍是56.6萬tps

然而很多時候,資料來自多個渠道(多執行緒、多網路連線),單個渠道資料量不大,甚至只有一行,就難以使用批量添刪改操作了。例如物聯網資料採集、埋點日誌等,在多執行緒上有大量資料需要寫入。因此,XCode創造性設計了實體佇列技術

!!閱讀本文之前,建議閱讀https://www.yuque.com/smartstone/xcode/batch

什麼是實體佇列

要說實體佇列EntityDeferredQueue,就不得不提它的基類延遲佇列DeferredQueue。

延遲佇列DeferredQueue的核心思想就是“湊批”,把要處理的零散資料放入一個“佇列”,然後定時集中處理

例如物聯網採集服務端從多個連線收到資料,需要寫入資料庫,為了提升吞吐,可以把實體資料放入延遲佇列,然後定時的落庫,此時,延遲佇列得到一批資料,可以使用批量插入技術。

實際上DeferredQueue內部並不是一個佇列,而是一個併發字典,因為有些業務場景,需要在“入佇列”時去重,例如統計資料,需要拿出某省份的統計資料,多次累加後集中儲存。

private static readonly DeferredQueue _statCache = new EntityDeferredQueue { Name = "Gun", Action = EntityActions.Save };
private static void SaveStat(DateTime date, Int32 provinceID, String kind, ScanKinds scanKind, String code)
{
    var key = $"{date:yyMMdd}_{provinceID}_{kind}";
    var stat = _statCache.GetOrAdd(key, k => GunProvinceStat.FindByKey(k, true) ?? new GunProvinceStat());

    stat.StatDate = date;
    stat.Kind = kind;
    stat.ProvinceID = provinceID;
    stat.LastCode = code;

    stat.ProcessStat(scanKind);

    _statCache.Commit(key);
}

主要流程


對於統計型資料來說,可以在記憶體裡面多次累加計算指標,然後一次性儲存,並且是批量儲存,極大減少了資料庫寫入次數。這是大資料分析必備利器!

延遲佇列主要屬性

/// <summary>跟蹤數。達到該值時輸出跟蹤日誌,預設1000</summary>
public Int32 TraceCount { get; set; } = 1000;

/// <summary>週期。預設10_000毫秒</summary>
public Int32 Period { get; set; } = 10_000;

/// <summary>最大個數。超過該個數時,進入佇列將產生堵塞。預設100_000</summary>
public Int32 MaxEntity { get; set; } = 100_000;

/// <summary>批大小。預設5_000</summary>
public Int32 BatchSize { get; set; } = 5_000;

/// <summary>等待借出物件確認修改的時間,預設3000ms</summary>
public Int32 WaitForBusy { get; set; } = 3_000;

/// <summary>儲存速度,每秒儲存多少個實體</summary>
public Int32 Speed { get; private set; }

/// <summary>是否非同步處理。預設true表示非同步處理,共用DQ定時排程;false表示同步處理,獨立執行緒</summary>
public Boolean Async { get; set; } = true;

回過頭來,實體佇列EntityDeferredQueue作為延遲佇列的擴充套件延伸,實際上是定義了“佇列資料”的處理行為。延遲佇列只負責收集資料和定時排程,實際處理行為Process需要擴充套件。

EntityDeferredQueue定義了 Save/Insert/Update/Upsert/Delete等行為供選擇。

如何使用實體佇列提升吞吐

再次深入分析前文的例子

private static readonly DeferredQueue _statCache = new EntityDeferredQueue { Name = "Gun", Action = EntityActions.Save };
private static void SaveStat(DateTime date, Int32 provinceID, String kind, ScanKinds scanKind, String code)
{
    var key = $"{date:yyMMdd}_{provinceID}_{kind}";
    var stat = _statCache.GetOrAdd(key, k => GunProvinceStat.FindByKey(k, true) ?? new GunProvinceStat());

    stat.StatDate = date;
    stat.Kind = kind;
    stat.ProvinceID = provinceID;
    stat.LastCode = code;

    stat.ProcessStat(scanKind);

    _statCache.Commit(key);
}

這是一個非常簡單的資料分析專案,統計每天各省每一種掃描型別的操作次數。日均分析處理5億行資料,每一行資料都要識別出日期、省份、類別等欄位,也就是SaveStat每天要呼叫5億次,結果資料分類存入統計表。共31省份27種類別,每日統計行數約800行(並非每個省都有全部類別)。通俗來講,5億行資料,分組聚合得到800行,實時計算,每5秒計算一次。

採用流式計算框架,逐行遍歷5億行實時資料,如果Insert/Update資料庫5億次,顯然很不現實!

平均每行寫入62.5萬次(5億/800),如果能夠在記憶體裡面“湊一湊”,每1000次更新,才寫入一次資料庫,那麼總寫入次數降低為50萬次,平均每行寫入625次。

實體佇列/延遲佇列,正是為了這類場景而設計!

首先,根據業務去構造一個唯一key,在這裡就是日期+省份+類別;

其次,GetOrAdd嘗試從佇列裡獲取該key對應的統計物件,99%時候記憶體命中,如果不存在,則查資料庫或者new一個;

再次,取得統計物件後,可以進行欄位累加,stat.ProcessStat(scanKind);

最後,Commit告訴佇列,該key對應的實體物件已經使用完成,可以提交;

在延遲佇列內部,定時(Period=10_000ms)執行一次儲存,把記憶體裡面的統計物件批量儲存到資料庫,並清空佇列。

這裡遇到的第一個問題就是,少量統計物件仍然使用怎麼辦?請放心,定時任務會等待一定時間(WaitForBusy=3000ms),如果使用方Commit則提前完成。因此,上面的Commit可以不要,效果會變差一些,同時,統計邏輯必須儘快完成(<3000ms)。

第二個問題很重要,定時間隔(Period=10_000ms)之內,記憶體資料是高危狀態,如果此時程序退出,則意味著統計資料丟失。標準架構應該是在資料落庫以後做Ack確認,但是原始資料實在太多(5億),很不現實。因此,實際工作中,我們是通過提升系統可靠性來規避該問題,採用螞蟻排程AntJob,結合分散式多節點部署,在實時計算中,記憶體保留資料並不多。每次需要更新程式時,先停止排程一分鐘,等待資料落庫和冷卻,才能推出應用程序。在資料分析領域,一般允許有一定的資料誤差(<0.01%),或者白天實時計算加夜晚離線重算的模式!

實際經驗表明,只要應用沒有非法退出,不存在資料丟失問題!

再來看看 ProcessStat內部,(這裡的GunProvinceStat是XCode實體類,一張統計表)

public void ProcessStat(ScanKinds kind)
{
    //stat.Total++;
    Interlocked.Increment(ref _Total);

    switch (kind)
    {
        case ScanKinds.Receipt:
            //stat.Receipts++;
            Interlocked.Increment(ref _Receipts);
            break;
        case ScanKinds.SendBill:
        case ScanKinds.SendAir:
            //stat.Sends++;
            Interlocked.Increment(ref _Sends);
            break;
        case ScanKinds.SendBag:
            Interlocked.Increment(ref _SendBags);
            break;
        case ScanKinds.ComeBill:
        case ScanKinds.ComeAir:
            //stat.Comes++;
            Interlocked.Increment(ref _Comes);
            break;
        case ScanKinds.ComeBag:
            Interlocked.Increment(ref _ComeBags);
            break;
        case ScanKinds.SendCar:
        case ScanKinds.ComeCar:
            Interlocked.Increment(ref _Cars);
            break;
        case ScanKinds.Dispatch:
            //stat.Dispatchs++;
            Interlocked.Increment(ref _Dispatchs);
            break;
        case ScanKinds.Sign:
            //stat.Signs++;
            Interlocked.Increment(ref _Signs);
            break;
        case ScanKinds.Back:
            Interlocked.Increment(ref _Backs);
            break;
        case ScanKinds.Problem:
            Interlocked.Increment(ref _Problems);
            break;
        case ScanKinds.Stay:
        case ScanKinds.Other:
        case ScanKinds.Input:
        case ScanKinds.Order:
        case ScanKinds.Electronic:
        default:
            Interlocked.Increment(ref _Others);
            break;
    }
}

資料表結構

<Table Name="GunProvinceStat" Description="巴槍省份統計" IgnoreNameCase="False">
  <Columns>
    <Column Name="ID" DataType="Int32" Identity="True" PrimaryKey="True" Description="編號" />
    <Column Name="StatDate" DataType="DateTime" Description="統計日期" />
    <Column Name="ProvinceID" DataType="Int32" Description="省份。0表示全國" />
    <Column Name="Kind" DataType="String" Description="類別。All表示所有型別" />
    <Column Name="Total" DataType="Int64" Description="總次數" />
    <Column Name="Receipts" DataType="Int64" Description="收件數" />
    <Column Name="Sends" DataType="Int64" Description="發件數" />
    <Column Name="Comes" DataType="Int64" Description="到件數" />
    <Column Name="Dispatchs" DataType="Int64" Description="派件數" />
    <Column Name="Signs" DataType="Int64" Description="簽收數" />
    <Column Name="SendBags" DataType="Int64" Description="發包數" />
    <Column Name="ComeBags" DataType="Int64" Description="到包數" />
    <Column Name="Cars" DataType="Int64" Description="掃車數" />
    <Column Name="Backs" DataType="Int64" Description="退件數" />
    <Column Name="Problems" DataType="Int64" Description="問題件數" />
    <Column Name="Others" DataType="Int64" Description="其它數" />
    <Column Name="LastCode" DataType="String" Description="最後單號" />
    <Column Name="CreateTime" DataType="DateTime" Description="建立時間" />
    <Column Name="UpdateTime" DataType="DateTime" Description="更新時間" />
  </Columns>
  <Indexes>
    <Index Columns="StatDate,ProvinceID,Kind" Unique="True" />
    <Index Columns="Kind,ProvinceID" />
  </Indexes>
</Table>

系列教程

NewLife.XCode教程系列[2019版]

  1. 增刪改查入門。快速展現用法,程式碼配置連線字串
  2. 資料模型檔案。建立表格欄位和索引,名字以及資料型別規範,推薦欄位(時間,使用者,IP)
  3. 實體類詳解。資料類業務類,泛型基類,介面
  4. 功能設定。連線字串,除錯開關,SQL日誌,慢日誌,引數化,執行超時。程式碼與配置檔案設定,連線字串區域性設定
  5. 反向工程。自動建立資料庫資料表
  6. 資料初始化。InitData寫入初始化資料
  7. 高階增刪改。過載攔截,自增欄位,Valid驗證,實體模型(時間,使用者,IP)
  8. 髒資料。如何產生,怎麼利用
  9. 增量累加。高併發統計
  10. 事務處理。單表和多表,不同連線,多種寫法
  11. 擴充套件屬性。多表關聯,Map對映
  12. 高階查詢。複雜條件,分頁,自定義擴充套件FieldItem,查總記錄數,查彙總統計
  13. 資料層快取。Sql快取,更新機制
  14. 實體快取。全表整理快取,更新機制
  15. 物件快取。字典快取,適用使用者等資料較多場景。
  16. 百億級效能。欄位精煉,索引完備,合理查詢,充分利用快取
  17. 實體工廠。元資料,通用處理程式
  18. 角色許可權。Membership
  19. 匯入匯出。Xml,Json,二進位制,網路或檔案
  20. 分表分庫。常見拆分邏輯
  21. 高階統計。聚合統計,分組統計
  22. 批量寫入。批量插入,批量Upsert,非同步儲存
  23. 實體佇列。寫入級快取,提升效能。
  24. 備份同步。備份資料,恢復資料,同步資料
  25. 資料服務。提供RPC介面服務,遠端執行查詢,例如SQLite網路版
  26. 大資料分析。ETL抽取,排程計算處理,結果持久化
我不相信神話,我只相信汗水!我不相信命運,我只相信雙手!