微軟併發Key-Value儲存庫FASTER介紹
微軟支援併發的Key-Value 儲存庫有C++與C#兩個版本。號稱迄今為止最快的併發鍵值儲存。下面是C#版本翻譯:
FASTER C#可在.NET Framework和.NET Core中執行,並且可以在單執行緒和併發設定中使用。經過測試,可以在Windows和Linux上使用。它公開了一種API,該API可以執行讀取,盲更新(Upserts)和讀取-修改-寫入(RMW)操作的混合。它支援大於記憶體的資料,並接受IDevice將日誌儲存在檔案中的實現。提供了IDevice本地檔案系統的實現,也可以寫入遠端檔案系統。或者將遠端儲存對映到本地檔案系統中。FASTER可以用作傳統併發資料結構類似ConcurrentDictionary的高效能替代品,並且還支援大於記憶體的資料。它支援增量或非增量資料結構型別的檢查點。
FASTER支援三種基本操作:
- Read:從鍵值儲存中讀取資料
- Upsert:將值盲目向上插入到儲存中(不檢查先前的值)
- Read-Modify-Write:更新儲存區中的值,用於實現“求和”和“計數”之類的操作。
構建
在例項化FASTER之前,您需要建立FASTER將使用的儲存裝置。如果使用的是可移植型別(byte、int、double)型別,則僅需要混合日誌裝置。如果使用物件,則需要建立一個單獨的物件日誌裝置。
IDevice log = Devices.CreateLogDevice("C:\\Temp\\hybridlog_native.log");
然後,按如下方式建立一個FASTER例項:
fht = new FasterKV<Key, Value, Input, Output, Empty, Functions>
(1L << 20, new Functions(), new LogSettings { LogDevice = log });
建構函式的型別引數
有六個基本概念,在例項化FASTER時作為通用型別引數提供:
- Key:這是鍵的型別,例如long。
- Value:這是儲存在FASTER中的值的型別。
- Input:這是呼叫Read或RMW時提供給FASTER的輸入型別。它可以被視為讀取或RMW操作的引數。例如,對於RMW,可是增量累加到值。
- Output:這是讀操作的輸出型別,將值的相關部分複製到輸出。
- Context:操作的使用者定義上下文,如果沒有必要使用Empty。
- Functions:需要回調時,使用IFunctions<>呼叫。
回撥函式
使用者提供一個例項化IFunctions<>。此型別封裝了所有回撥,下面將對其進行介紹:
- SingleReader和併發讀ConcurrentReader:這些用於讀取儲存值並將它們複製到Output。單個讀取器可以假定沒有併發操作。
- SingleWriter和ConcurrentWriter:這些用於將值從源值寫入儲存。
- Completion callbacks完成回撥:各種操作完成時呼叫。
- RMWUpdaters:使用者指定了三個更新器,InitialUpdater,InPlaceUpdater和CopyUpdater。它們一起用於實現RMW操作。
- Hash Table Siz雜湊表大小:這是分配給FASTER的儲存行數,其中每個行為64位元組。
- LogSettings 日誌設定:這些設定與日誌的大小、裝置。
- Checkpoint設定:這些是與檢查相關的設定,例如檢查型別和資料夾。
- Serialization序列化設定:用於為鍵和值型別提供自定義序列化程式。序列化程式實現IObjectSerializer<Key>鍵和IObjectSerializer<Value>值。只有C#類物件非可移植型別才需要這些。
- Key比較器:用於為key提供更好的比較器IFasterEqualityComparer<Key>。
建構函式引數
FASTER的總記憶體佔用量由以下引數控制:
- 雜湊表大小:此引數(第一個建構函式引數)乘以64是記憶體中雜湊表的大小(以位元組為單位)。
- 日誌大小:logSettings.MemorySizeBits表示混合日誌的記憶體部分的大小(以位為單位)。換句話說對於引數設定B,日誌的大小為2 ^ B位元組。如果日誌指向類物件,則此大小不包括物件的大小,因為FASTER無法訪問此資訊。日誌的較舊部分溢位到儲存中。
Sessions (Threads)會話(執行緒)
例項化FASTER之後,執行緒可以使用Session來使用FASTER
fht.StartSession();
fht.StopSession();
當所有執行緒都在FASTER上完成操作後,您最終銷燬FASTER例項:
fht.Dispose();
示例
以下是一個簡單示例,其中所有資料都在記憶體中,因此我們不必擔心掛起的I / O操作。在此示例中也沒有檢查點。
public static void Test()
{
var log = Devices.CreateLogDevice("C:\\Temp\\hlog.log");
var fht = new FasterKV<long, long, long, long, Empty, Funcs>
(1L << 20, new Funcs(), new LogSettings { LogDevice = log });
fht.StartSession();
long key = 1, value = 1, input = 10, output = 0;
fht.Upsert(ref key, ref value, Empty.Default, 0);
fht.Read(ref key, ref input, ref output, Empty.Default, 0);
Debug.Assert(output == value);
fht.RMW(ref key, ref input, Empty.Default, 0);
fht.RMW(ref key, ref input, Empty.Default, 0);
fht.Read(ref key, ref input, ref output, Empty.Default, 0);
Debug.Assert(output == value + 20);
fht.StopSession();
fht.Dispose();
log.Close();
}
此示例的函式:
public class Funcs : IFunctions<long, long, long, long, Empty>
{
public void SingleReader(ref long key, ref long input, ref long value, ref long dst) => dst = value;
public void SingleWriter(ref long key, ref long src, ref long dst) => dst = src;
public void ConcurrentReader(ref long key, ref long input, ref long value, ref long dst) => dst = value;
public void ConcurrentWriter(ref long key, ref long src, ref long dst) => dst = src;
public void InitialUpdater(ref long key, ref long input, ref long value) => value = input;
public void CopyUpdater(ref long key, ref long input, ref long oldv, ref long newv) => newv = oldv + input;
public void InPlaceUpdater(ref long key, ref long input, ref long value) => value += input;
public void UpsertCompletionCallback(ref long key, ref long value, Empty ctx) { }
public void ReadCompletionCallback(ref long key, ref long input, ref long output, Empty ctx, Status s) { }
public void RMWCompletionCallback(ref long key, ref long input, Empty ctx, Status s) { }
public void CheckpointCompletionCallback(Guid sessionId, long serialNum) { }
}
更多例子
檢查點和恢復
FASTER支援基於檢查點的恢復。每個新的檢查點都會保留(或使之持久)其他使用者操作(讀取,更新或RMW)。FASTER允許客戶端執行緒跟蹤已持久的操作和未使用基於會話的API的操作。
回想一下,每個FASTER執行緒都會啟動一個與唯一的Guid相關聯的會話。所有FASTER執行緒操作(讀取,Upsert,RMW)都帶有單調序列號。在任何時間點,都可以呼叫Checkpoint以啟動FASTER的非同步檢查點。在呼叫之後Checkpoint,(最終)向每個FASTER執行緒通知一個序列號,這樣可以確保直到該序列號之前的所有操作以及在該序列號之後沒有任何操作被保留為該檢查點的一部分。FASTER執行緒可以使用此序列號來清除等待執行的操作的任何記憶體緩衝區。
在恢復期間,執行緒可以使用繼續使用相同的Guid進行會話ContinueSession。該函式返回執行緒本地序列號,直到恢復該會話雜湊為止。從那時起,新執行緒可以使用此資訊來重播所有未提交的操作。
下面一個單執行緒的簡單恢復示例。
public class PersistenceExample
{
private FasterKV<long, long, long, long, Empty, Funcs> fht;
private IDevice log;
public PersistenceExample()
{
log = Devices.CreateLogDevice("C:\\Temp\\hlog.log");
fht = new FasterKV<long, long, long, long, Empty, Funcs>
(1L << 20, new Funcs(), new LogSettings { LogDevice = log });
}
public void Run()
{
IssuePeriodicCheckpoints();
RunSession();
}
public void Continue()
{
fht.Recover();
IssuePeriodicCheckpoints();
ContinueSession();
}
/* Helper Functions */
private void RunSession()
{
Guid guid = fht.StartSession();
System.IO.File.WriteAllText(@"C:\\Temp\\session1.txt", guid.ToString());
long seq = 0; // sequence identifier
long key = 1, input = 10;
while(true)
{
key = (seq % 1L << 20);
fht.RMW(ref key, ref input, Empty.Default, seq);
seq++;
}
// fht.StopSession() - outside infinite loop
}
private void ContinueSession()
{
string guidText = System.IO.File.ReadAllText(@"C:\\Temp\session1.txt");
Guid sessionGuid = Guid.Parse(guidText);
long seq = fht.ContinueSession(sessionGuid); // recovered seq identifier
seq++;
long key = 1, input = 10;
while(true)
{
key = (seq % 1L << 20);
fht.RMW(ref key, ref input, Empty.Default, seq);
seq++;
}
}
private void IssuePeriodicCheckpoints()
{
var t = new Thread(() =>
{
while(true)
{
Thread.Sleep(10000);
fht.StartSession();
fht.TakeCheckpoint(out Guid token);
fht.CompleteCheckpoint(token, true);
fht.StopSession();
}
});
t.Start();
}
}
FASTER支援兩種檢查點概念:“快照”和“摺疊”。前者是將記憶體中的完整快照複製到一個單獨的快照檔案中,而後者是自上一個檢查點以來更改的增量檢查點。摺疊有效地將混合日誌的只讀標記移到尾部,因此所有資料都作為同一混合日誌的一部分保留(沒有單獨的快照檔案)。所有後續更新均寫入新的混合日誌尾部位置,這使Fold-Over具有增量性質。
專案路徑:
https://github.com/Microsoft/FASTER/tree/master/cs