1. 程式人生 > 實用技巧 >談反應式程式設計在服務端中的應用,資料庫操作優化,提速 Upsert

談反應式程式設計在服務端中的應用,資料庫操作優化,提速 Upsert

反應式程式設計在客戶端程式設計當中的應用相當廣泛,而當前在服務端中的應用相對被提及較少。本篇將介紹如何在服務端程式設計中應用響應時程式設計來改進資料庫操作的效能。

開篇就是結論

接續上一篇《談反應式程式設計在服務端中的應用,資料庫操作優化,從 20 秒到 0.5 秒》之後,這次,我們帶來了關於利用反應式程式設計進行 upsert 優化的案例說明。建議讀者可以先閱讀一下前一篇,這樣更容易理解本篇介紹的方法。

同樣還是利用批量化的思路,將單個 upsert 操作批量進行合併。已達到減少資料庫連結消耗從而大幅提升效能的目的。

業務場景

在最近的一篇文章《十萬同時線上使用者,需要多少記憶體?——Newbe.Claptrap 框架水平擴充套件實驗

》中。我們通過啟用多個常駐於記憶體當中的 Claptrap 來實現快速驗證 JWT 正確性的目的。

但,當時有一個技術問題沒有得到解決:

Newbe.Claptrap 框架設計了一個特性:當 Claptrap Deactive 時,可以選擇將快照立即儲存到資料庫。因此,當嘗試從叢集中關閉一個節點時,如果節點上存在大量的 Claptrap ,那麼將產生大量的資料庫 upsert 操作。瞬間推高資料庫消耗,甚至導致部分錯誤而儲存失敗。

一點點程式碼

有了前篇的IBatchOperator,那麼留給這篇的程式碼內容就非常少了。

首先,按照使用上一篇的 IBatchOperator 編寫一個支援操作的 Repository,形如以下程式碼:

public class BatchUpsert : IUpsertRepository
{
private readonly IDatabase _database;
private readonly IBatchOperator<(int, int), int> _batchOperator; public BatchUpsert(IDatabase database)
{
_database = database;
var options = new BatchOperatorOptions<(int, int), int>
{
BufferCount = 100,
BufferTime = TimeSpan.FromMilliseconds(50),
DoManyFunc = DoManyFunc
};
_batchOperator = new BatchOperator<(int, int), int>(options);
} private Task<int> DoManyFunc(IEnumerable<(int, int)> arg)
{
return _database.UpsertMany(arg.ToDictionary(x => x.Item1, x => x.Item2));
} public Task UpsertAsync(int key, int value)
{
return _batchOperator.CreateTask((key, value));
}
}

然後,只要實現對應資料庫的 UpsertMany 方法,便可以很好地完成這項優化。

各種資料庫的操作

結合 Newbe.Claptrap 現在專案的實際。目前,被支援的資料庫分別有 SQLite、PostgreSQL、MySql 和 MongoDB。以下,分別對不同型別的資料庫的批量 Upsert 操作進行說明。

由於在 Newbe.Claptrap 專案中的 Upsert 需求都是以主鍵作為對比鍵,因此以下也只討論這種情況。

SQLite

根據官方檔案,使用INSERT OR REPLACE INTO便可以實現主鍵衝突時替換資料的需求。

具體的語句格式形如以下:

INSERT OR REPLACE INTO TestTable (id, value)
VALUES
(@id0,@value0),
...
(@idn,@valuen);

因此只要直接拼接語句和引數呼叫即可。需要注意的是,SQLite 的可傳入引數預設為 999,因此拼接的變數也不應大於該數量。

官方檔案:INSERT

PostgreSQL

眾所周知,PostgreSQL 在進行批量寫入時,可以使用高效的COPY語句來完成資料的高速匯入,這遠遠快於INSERT語句。但可惜的是COPY並不能支援ON CONFLICT DO UPDATE子句。因此,無法使用COPY來完成 upsert 需求。

因此,我們還是迴歸使用INSERT配合ON CONFLICT DO UPDATE子句,以及unnest函式來完成批量 upsert 的需求。

具體的語句格式形如以下:

INSERT INTO TestTable (id, value)
VALUES (unnest(@ids), unnest(@values))
ON CONFLICT ON CONSTRAINT TestTable_pkey
DO UPDATE SET value=excluded.value;

其中的 ids 和 values 分別為兩個等長的陣列物件,unnest 函式可以將陣列物件轉換為行資料的形式。

注意,可能會出現 ON CONFLICT DO UPDATE command cannot affect row a second time 錯誤。

因此如果嘗試使用上述方案,需要在傳入資料庫之前,先在程式中去重一遍。而且,通常來說,在程式中進行一次去重可以減少向資料庫中傳入的資料,這本身也很有意義。

官方檔案:unnest 函式
官方檔案:Insert 語句

MySql

MySql 與 SQLite 類似,支援REPLACE語法。具體語句形式如下:

REPLACE INTO TestTable (id, value)
VALUES
(@id0,@value0),
...
(@idn,@valuen);

官方檔案:REPLACE 語句

MongoDB

MongoDB 原生支援 bulkWrite 的批量傳輸模式,也支援 replace 的 upsert 語法。因此操作非常簡單。

那麼這裡展示一下 C# 操作方法:

private async Task SaveManyCoreMany(
IDbFactory dbFactory,
IEnumerable<StateEntity> entities)
{
var array = entities as StateEntity[] ?? entities.ToArray();
var items = array
.Select(x => new MongoStateEntity
{
claptrap_id = x.ClaptrapId,
claptrap_type_code = x.ClaptrapTypeCode,
version = x.Version,
state_data = x.StateData,
updated_time = x.UpdatedTime,
})
.ToArray(); var client = dbFactory.GetConnection(_connectionName);
var db = client.GetDatabase(_databaseName);
var collection = db.GetCollection<MongoStateEntity>(_stateCollectionName); var upsertModels = items.Select(x =>
{
var filter = new ExpressionFilterDefinition<MongoStateEntity>(entity =>
entity.claptrap_id == x.claptrap_id && entity.claptrap_type_code == x.claptrap_type_code);
return new ReplaceOneModel<MongoStateEntity>(filter, x)
{
IsUpsert = true
};
});
await collection.BulkWriteAsync(upsertModels);
}

這是從 Newbe.Claptrap 專案業務場景中給出的程式碼,讀者可以結合自身需求進行修改。

官方檔案:db.collection.bulkWrite ()

通用型解法

優化的本質是減少資料庫連結的使用,儘可能在一個連結內完成更多的工作。因此如果特定的資料庫不支援以上資料庫類似的操作。那麼還是存在一種通用型的解法:

  1. 以儘可能快地方式將資料寫入一臨時表
  2. 將臨時表的資料已連表 update 的方式更新的目標表
  3. 刪除臨時表

UPDATE with a join

效能測試

以 SQLite 為例,嘗試對 12345 條資料進行 2 次 upsert 操作。

單條併發:1 分 6 秒

批量處理:2.9 秒

可以在該連結找到測試的程式碼。

樣例中不包含有 MySql、PostgreSQL 和 MongoDB 的樣例,因為沒有優化之前,在不提高連線池的情況下,一併發基本就爆炸了。所有優化的結果是直接解決了可用性的問題。

所有的示例程式碼均可以在程式碼庫中找到。如果 Github Clone 存在困難,也可以點選此處從 Gitee 進行 Clone

常見問題解答

此處對一些常見的問題進行解答。

客戶端是等待批量操作的結果嗎?

這是一個很多網友提出的問題。答案是:是的。

假設我們公開了一個 WebApi 作為介面,由瀏覽器呼叫。如果同時有 100 個瀏覽器同時發出請求。

那麼這 100 個請求會被合併,然後寫入資料庫。而在寫入資料庫之前,這些客戶端都不會得到服務端的響應,會一直等待。

這也是該合併方案區別於普通的 “寫佇列,後寫庫” 方案的地方。

原理上講,這種和 bulkcopy 有啥不一樣?

兩者是不相關,必須同時才有作用的功能。
首先,程式碼中的 database.InsertMany 就是你提到的 bulkcopy。

這個程式碼的關鍵不是 InsertMany ,而是如何將單次的插入請求合併。
試想一下,你可以在 webapi 上公開一個 bulkcopy 的 API。
但是,你無法將來自不同客戶端的請求合併在同一個 API 裡面來呼叫 bulkcopy。
例如,有一萬個客戶端都在呼叫你的 API,那怎麼合併這些 API 請求呢?

如果如果通過上面這種方式,雖然你只是對外公開了一個單次插入的 API。你卻實現了來自不同客戶端請求的合併,變得可以使用 bulkcopy 了。這在高併發下很有意義。

另外,這符合開閉的原理,因為你沒有修改 Repository 的 InsertOne 介面,卻實現了 bulkcopy
的效果。

如果批量操作中一個操作異常失敗是否會導致被合併的其他操作全部失敗?

如果業務場景是合併會有影響,那當然不應該合併。

批量操作一個失敗,當然是一起失敗,因為底層的資料庫事務肯定也是一起失敗。

除非批量介面也支援對每個傳入的 ID 做區別對待。典型的,比如 mongodb 的 bulkcopy 可以返回哪些成功哪些失敗,那麼我們就有能力設定不同的 Tcs 狀態。

哪些該合併,哪些不該合併,完全取決於業務。樣例給出的是如果要合併,應該怎麼合併。不會要求所有都要合併。

Insert 和 Upsert 都說了,那 Delete 和 Select 呢?

筆者籠統地將該模式稱為 “反應式批量處理”。要確認業務場景是否應用該模式,需要具備以下這兩個基本的要求:

  • 業務下游的批量處理是否會比累積的單條處理要快,如果會,那可以用
  • 業務上游是否會出現短時間的突增頻率的請求,如果會,那可以用

當然,還需要考量,比如:下游的批量操作能否卻分每個請求的結果等等問題。但以上兩點是一定需要考量的。

那麼以 Delete 為例:

  • Delete Where In 的速度會比 Delete = 的速度快嗎?試一下
  • 會有突增的 Delete 需求嗎?想一下

小小工具 Zeal

筆者是一個完整儲存過程都寫不出來的人。能夠查閱到這些資料庫的檔案,全靠一款名為 Zeal 的離線檔案檢視免費軟體。推薦給您,您也值得擁有。

Zeal 官網地址:https://zealdocs.org/

最後但是最重要!

最近作者正在構建以反應式Actor模式事件溯源為理論基礎的一套服務端開發框架。希望為開發者提供能夠便於開發出 “分散式”、“可水平擴充套件”、“可測試性高” 的應用系統 ——Newbe.Claptrap

本篇文章是該框架的一篇技術選文,屬於技術構成的一部分。如果讀者對該內容感興趣,歡迎轉發、評論、收藏文章以及專案。您的支援是促進專案成功的關鍵。

如果你對該專案感興趣,你可以通過 github issues 提交您的看法。

如果您無法正常訪問 github issue,您也可以傳送郵件到[email protected]來參與我們的討論。

點選連結 QQ 交流【Newbe.Claptrap】:https://jq.qq.com/?_wv=1027&k=5uJGXf5

您還可以查閱本系列的其他選文:

GitHub 專案地址:https://github.com/newbe36524/Newbe.Claptrap

Gitee 專案地址:https://gitee.com/yks/Newbe.Claptrap