談反應式程式設計在服務端中的應用,資料庫操作優化,萬條記錄從20秒到0.5秒
反應式程式設計在客戶端程式設計當中的應用相當廣泛,而當前在服務端中的應用相對被提及較少。本篇將介紹如何在服務端程式設計中應用響應時程式設計來改進資料庫操作的效能。
開篇就是結論
利用 System.Reactive 配合 TaskCompelteSource ,可以將分散的單次資料庫插入請求合併會一個批量插入的請求。在確保正確性的前提下,實現資料庫插入效能的優化。
如果讀者已經瞭解瞭如何操作,那麼剩下的內容就不需要再看了。
預設條件
現在,我們假設存在這樣一個 Repository 介面來表示一次資料庫的插入操作。
csharpnamespace Newbe.RxWorld.DatabaseRepository { public interface IDatabaseRepository { /// <summary> /// Insert one item and return total count of data in database /// </summary> /// <param name="item"></param> /// <returns></returns> Task<int> InsertData(int item); } }
接下來,我們在不改變該介面簽名的前提下,體驗一下不同的實現帶來的效能區別。
基礎版本
首先是基礎版本,採用的是最為常規的單次資料庫INSERT
操作來完成資料的插入。本示例採用的是SQLite
作為演示資料庫,方便讀者自行實驗。
namespace Newbe.RxWorld.DatabaseRepository.Impl { public class NormalDatabaseRepository : IDatabaseRepository { private readonly IDatabase _database; public NormalDatabaseRepository( IDatabase database) { _database = database; } public Task<int> InsertData(int item) { return _database.InsertOne(item); } } }
常規操作。其中_database.InsertOne(item)
的具體實現就是呼叫了一次INSERT
。
基礎版本在同時插入小於20次時基本上可以較快的完成。但是如果數量級增加,例如需要同時插入一萬條資料庫,將會花費約20秒鐘,存在很大的優化空間。
TaskCompelteSource
TaskCompelteSource 是 TPL 庫中一個可以生成一個可操作 Task 的型別。對於 TaskCompelteSource 不太熟悉的讀者可以通過該例項程式碼瞭解。
此處也簡單解釋一下該物件的作用,以便讀者可以繼續閱讀。
對於熟悉 javascript 的朋友,可以認為 TaskCompelteSource 相當於 Promise 物件。也可以相當於 jQuery 當中的 $.Deferred 。
如果都不瞭解的朋友,可以聽一下筆者吃麻辣燙時想到的生活化例子。
吃麻辣燙 | 技術解釋 |
---|---|
吃麻辣燙之前,需要先用盤子夾菜。 | 構造引數 |
夾好菜之後,拿到結賬處去結賬 | 呼叫方法 |
收銀員結賬完畢之後,會得到一個叫餐牌,會響鈴的那種 | 得到一個 Task 返回值 |
拿著菜牌找了一個位子坐下,玩手機等餐 | 正在 await 這個 Task ,CPU轉而處理其他事情 |
餐牌響了,去取餐,吃起來 | Task 完成,await 節數,繼續執行下一行程式碼 |
那麼 TaskCompelteSource 在哪兒呢?
首先,根據上面的例子,在餐牌響的時候,我們才會去取餐。那麼餐牌什麼時候才會響呢?當然是服務員手動按了一個在櫃檯的手動開關才觸發了這個響鈴。
那麼,櫃檯的這個開關,可以被技術解釋為 TaskCompelteSource 。
餐檯開關可以控制餐牌的響鈴。同樣, TaskCompelteSource 就是一種可以控制 Task 的狀態的物件。
解決思路
有了前面對 TaskCompelteSource 的瞭解,那麼接下來就可以解決文章開頭的問題了。思路如下:
當呼叫 InsertData 時,可以建立一個 TaskCompelteSource 以及 item 的元組。為了方便說明,我們將這個元組命名為BatchItem
。
將 BatchItem 的 TaskCompelteSource 對應的 Task 返回出去。
呼叫 InsertData 的程式碼會 await 返回的 Task,因此只要不操作 TaskCompelteSource ,呼叫者就一會一直等待。
然後,另外啟動一個執行緒,定時將 BatchItem 佇列消費掉。
這樣就完成了單次插入變為批量插入的操作。
筆者可能解釋的不太清楚,不過以下所有版本的程式碼均基於以上思路。讀者可以結合文字和程式碼進行理解。
ConcurrentQueue 版本
基於以上的思路,我們採用 ConcurrentQueue 作為 BatchItem 佇列進行實現,程式碼如下(程式碼很多,不必糾結,因為下面還有更簡單的):
csharpnamespace Newbe.RxWorld.DatabaseRepository.Impl
{
public class ConcurrentQueueDatabaseRepository : IDatabaseRepository
{
private readonly ITestOutputHelper _testOutputHelper;
private readonly IDatabase _database;
private readonly ConcurrentQueue<BatchItem> _queue;
// ReSharper disable once PrivateFieldCanBeConvertedToLocalVariable
private readonly Task _batchInsertDataTask;
public ConcurrentQueueDatabaseRepository(
ITestOutputHelper testOutputHelper,
IDatabase database)
{
_testOutputHelper = testOutputHelper;
_database = database;
_queue = new ConcurrentQueue<BatchItem>();
// 啟動一個 Task 消費佇列中的 BatchItem
_batchInsertDataTask = Task.Factory.StartNew(RunBatchInsert, TaskCreationOptions.LongRunning);
_batchInsertDataTask.ConfigureAwait(false);
}
public Task<int> InsertData(int item)
{
// 生成 BatchItem ,將物件放入佇列。返回 Task 出去
var taskCompletionSource = new TaskCompletionSource<int>();
_queue.Enqueue(new BatchItem
{
Item = item,
TaskCompletionSource = taskCompletionSource
});
return taskCompletionSource.Task;
}
// 從佇列中不斷獲取 BatchItem ,並且一批一批插入資料庫,更新 TaskCompletionSource 的狀態
private void RunBatchInsert()
{
foreach (var batchItems in GetBatches())
{
try
{
BatchInsertData(batchItems).Wait();
}
catch (Exception e)
{
_testOutputHelper.WriteLine($"there is an error : {e}");
}
}
IEnumerable<IList<BatchItem>> GetBatches()
{
var sleepTime = TimeSpan.FromMilliseconds(50);
while (true)
{
const int maxCount = 100;
var oneBatchItems = GetWaitingItems()
.Take(maxCount)
.ToList();
if (oneBatchItems.Any())
{
yield return oneBatchItems;
}
else
{
Thread.Sleep(sleepTime);
}
}
IEnumerable<BatchItem> GetWaitingItems()
{
while (_queue.TryDequeue(out var item))
{
yield return item;
}
}
}
}
private async Task BatchInsertData(IEnumerable<BatchItem> items)
{
var batchItems = items as BatchItem[] ?? items.ToArray();
try
{
// 呼叫資料庫的批量插入操作
var totalCount = await _database.InsertMany(batchItems.Select(x => x.Item));
foreach (var batchItem in batchItems)
{
batchItem.TaskCompletionSource.SetResult(totalCount);
}
}
catch (Exception e)
{
foreach (var batchItem in batchItems)
{
batchItem.TaskCompletionSource.SetException(e);
}
throw;
}
}
private struct BatchItem
{
public TaskCompletionSource<int> TaskCompletionSource { get; set; }
public int Item { get; set; }
}
}
}
以上程式碼中使用了較多的 Local Function 和 IEnumerable 的特性,不瞭解的讀者可以點選此處進行了解。
正片開始!
接下來我們使用 System.Reactive 來改造上面較為複雜的 ConcurrentQueue 版本。如下:
csharpnamespace Newbe.RxWorld.DatabaseRepository.Impl
{
public class AutoBatchDatabaseRepository : IDatabaseRepository
{
private readonly ITestOutputHelper _testOutputHelper;
private readonly IDatabase _database;
private readonly Subject<BatchItem> _subject;
public AutoBatchDatabaseRepository(
ITestOutputHelper testOutputHelper,
IDatabase database)
{
_testOutputHelper = testOutputHelper;
_database = database;
_subject = new Subject<BatchItem>();
// 將請求進行分組,每50毫秒一組或者每100個一組
_subject.Buffer(TimeSpan.FromMilliseconds(50), 100)
.Where(x => x.Count > 0)
// 將每組資料呼叫批量插入,寫入資料庫
.Select(list => Observable.FromAsync(() => BatchInsertData(list)))
.Concat()
.Subscribe();
}
// 這裡和前面對比沒有變化
public Task<int> InsertData(int item)
{
var taskCompletionSource = new TaskCompletionSource<int>();
_subject.OnNext(new BatchItem
{
Item = item,
TaskCompletionSource = taskCompletionSource
});
return taskCompletionSource.Task;
}
// 這段和前面也完全一樣,沒有變化
private async Task BatchInsertData(IEnumerable<BatchItem> items)
{
var batchItems = items as BatchItem[] ?? items.ToArray();
try
{
var totalCount = await _database.InsertMany(batchItems.Select(x => x.Item));
foreach (var batchItem in batchItems)
{
batchItem.TaskCompletionSource.SetResult(totalCount);
}
}
catch (Exception e)
{
foreach (var batchItem in batchItems)
{
batchItem.TaskCompletionSource.SetException(e);
}
throw;
}
}
private struct BatchItem
{
public TaskCompletionSource<int> TaskCompletionSource { get; set; }
public int Item { get; set; }
}
}
}
程式碼減少了 50 行,主要原因就是使用 System.Reactive 中提供的很強力的 Buffer 方法實現了 ConcurrentQueue 版本中的複雜的邏輯實現。
老師,可以更給力一點嗎?
我們,可以“稍微”優化一下程式碼,將 Buffer 以及相關的邏輯獨立於“資料庫插入”這個業務邏輯。那麼我們就會得到一個更加簡單的版本:
csharpnamespace Newbe.RxWorld.DatabaseRepository.Impl
{
public class FinalDatabaseRepository : IDatabaseRepository
{
private readonly IBatchOperator<int, int> _batchOperator;
public FinalDatabaseRepository(
IDatabase database)
{
var options = new BatchOperatorOptions<int, int>
{
BufferTime = TimeSpan.FromMilliseconds(50),
BufferCount = 100,
DoManyFunc = database.InsertMany,
};
_batchOperator = new BatchOperator<int, int>(options);
}
public Task<int> InsertData(int item)
{
return _batchOperator.CreateTask(item);
}
}
}
其中 IBatchOperator 等程式碼,讀者可以到程式碼庫中進行檢視,此處就不在陳列了。
效能測試
基本可以測定如下:
在 10 條資料併發操作時,原始版本和批量版本沒有多大區別。甚至批量版本在數量少時會更慢,畢竟其中存在一個最大 50 毫秒的等待時間。
但是,如果需要批量操作併發操作一萬條資料,那麼原始版本可能需要消耗20秒,而批量版本僅僅只需要0.5秒。
所有的示例程式碼均可以在程式碼庫中找到。如果 Github Clone 存在困難,也可以點選此處從 Gitee 進行 Clone
最後但是最重要!
最近作者正在構建以反應式
、Actor模式
和事件溯源
為理論基礎的一套服務端開發框架。希望為開發者提供能夠便於開發出“分散式”、“可水平擴充套件”、“可測試性高”的應用系統——Newbe.Claptrap
本篇文章是該框架的一篇技術選文,屬於技術構成的一部分。如果讀者對該內容感興趣,歡迎轉發、評論、收藏文章以及專案。您的支援是促進專案成功的關鍵。
當前專案已經快要釋出 0.1 alpha 版本,歡迎參與討論。
GitHub 專案地址:https://github.com/newbe36524/Newbe.Claptrap
Gitee 專案地址:https://gitee.com/yks/Newbe.Claptrap
文章作者: newbe36524
本文章著作權歸作者所有,任何形式的轉載都請註明出處。