時序資料庫DolphinDB外掛開發教程
技術標籤:使用教程DolphinDB時序資料庫資料庫量化金融工業物聯網
DolphinDB支援動態載入外部外掛,以擴充套件系統功能。外掛用C++編寫,需要編譯成".so"或".dll"共享庫檔案。本文著重介紹開發外掛的方法和注意事項,並詳細介紹以下幾個具體場景的外掛開發流程:
- 如何開發支援時間序列資料處理的外掛函式
- 如何開發用於處理分散式SQL的聚合函式
- 如何開發支援新的分散式演算法的外掛函式
- 如何開發支援流資料處理的外掛函式
- 如何開發支援外部資料來源的外掛函式
1. 如何開發外掛
1.1 基本概念
DolphinDB的外掛實現了能在指令碼中呼叫的函式。一個外掛函式可能是運算子函式(Operator function),也可能是系統函式(System function),它們的區別在於,前者接受的引數個數小於等於2,而後者的函式可以接受任意個引數,並支援會話的訪問操作。
開發一個運算子函式,需要編寫一個原型為ConstantSP (const ConstantSP& a, const ConstantSP& b)
的C++函式。當函式引數個數為2時,a
和b
分別為外掛函式的第一和第二個引數;當引數個數為1時,b
是一個佔位符,沒有實際用途;當沒有引數時,a
和b
均為佔位符。
開發一個系統函式,需要編寫一個原型為ConstantSP (Heap* heap, vector<ConstantSP>& args)
的C++函式。使用者在DolphinDB中呼叫外掛函式時傳入的引數,都按順序儲存在C++的向量args
中。heap
引數不需要使用者傳入。
函式原型中的ConstantSP
可以表示絕大多數DolphinDB物件(標量、向量、矩陣、表,等等)。其他常用的派生自它的變數型別有VectorSP
(向量)、TableSP
(表)等。
1.2 建立變數
建立標量,可以直接用new
語句建立標頭檔案ScalarImp.h
中宣告的型別物件,並將它賦值給一個ConstantSP
。ConstantSP
是一個經過封裝的智慧指標,會在變數的引用計數為0時自動釋放記憶體,因此,使用者不需要手動delete
已經建立的變數:
ConstantSP i = new Int(1); // 相當於1i ConstantSP d = new Date(2019, 3, 14); // 相當於2019.03.14 ConstantSP s = new String("DolphinDB"); // 相當於"DolphinDB" ConstantSP voidConstant = new Void(); // 建立一個void型別變數,常用於表示空的函式引數
標頭檔案Util.h
聲明瞭一系列函式,用於快速建立某個型別和格式的變數:
VectorSP v = Util::createVector(DT_INT, 10); // 建立一個初始長度為10的int型別向量
v->setInt(0, 60); // 相當於v[0] = 60
VectorSP t = Util::createVector(DT_ANY, 0); // 建立一個初始長度為0的any型別向量(元組)
t->append(new Int(3)); // 相當於t.append!(3)
t->get(0)->setInt(4); // 相當於t[0] = 4
// 這裡不能用t->setInt(0, 4),因為t是一個元組,setInt(0, 4)只對int型別的向量有效
ConstantSP seq = Util::createIndexVector(5, 10); // 相當於5..14
int seq0 = seq->getInt(0); // 相當於seq[0]
ConstantSP mat = Util::createDoubleMatrix(5, 10);// 建立一個10行5列的double型別矩陣
mat->setColumn(3, seq); // 相當於mat[3] = seq
1.3 異常處理和引數校驗
1.3.1 異常處理
外掛開發時的異常丟擲和處理,和一般C++開發中一樣,都通過throw
關鍵字丟擲異常,try
語句塊處理異常。DolphinDB在標頭檔案Exceptions.h
中聲明瞭異常型別。
外掛函式若遇到執行時錯誤,一般丟擲RuntimeException
。
在外掛開發時,通常會校驗函式引數,如果引數不符合要求,丟擲一個IllegalArgumentException
。常用的引數校驗函式有:
ConstantSP->getType()
:返回變數的型別(int, char, date等等),DolphinDB的型別定義在標頭檔案Types.h
中。ConstantSP->getCategory()
:返回變數的類別,常用的類別有INTEGRAL(整數型別,包括int, char, short, long等)、FLOATING(浮點數型別,包括float, double等)、TEMPORAL(時間型別,包括time, date, datetime等)、LITERAL(字串型別,包括string, symbol等),都定義在標頭檔案Types.h
中。ConstantSP->getForm()
:返回變數的格式(標量、向量、表等等),DolphinDB的格式定義在標頭檔案Types.h
中。ConstantSP->isVector()
:判斷變數是否為向量。ConstantSP->isScalar()
:判斷變數是否為標量。ConstantSP->isTable()
:判斷變數是否為表。ConstantSP->isNumber()
:判斷變數是否為數字型別。ConstantSP->isNull()
:判斷變數是否為空值。ConstantSP->getInt()
:獲得變數對應的整數值,常用於判斷邊界。ConstantSP->getString()
:獲得變數對應的字串。ConstantSP->size()
:獲得變數的長度。
更多引數校驗函式一般在標頭檔案CoreConcept.h
的Constant
類方法中。
1.3.2 引數校驗的範例
本節將開發一個外掛函式用於求非負整數的階乘,返回一個long型別變數。
DolphinDB中long型別的最大值為2^63 - 1
,能表示的階乘最大為25!
,因此只有0~25
範圍內的引數是合法的。
#include "CoreConcept.h"
#include "Exceptions.h"
#include "ScalarImp.h"
ConstantSP factorial(const ConstantSP &n, const ConstantSP &placeholder) {
string syntax = "Usage: factorial(n). ";
if (!n->isScalar() || n->getCategory() != INTEGRAL)
throw IllegalArgumentException("factorial", syntax + "n must be an integral scalar.");
int nValue = n->getInt();
if (nValue < 0 || nValue > 25)
throw IllegalArgumentException("factorial", syntax + "n must be a non-negative integer less than 26.");
long long fact = 1;
for (int i = nValue; i > 0; i--)
fact *= i;
return new Long(fact);
}
1.4 呼叫DolphinDB內建函式
有時會需要呼叫DolphinDB的內建函式對資料進行處理。有些類已經定義了一些常用的內建函式作為方法:
VectorSP v = Util::createIndexVector(1, 100);
ConstantSP avg = v->avg(); // 相當於avg(v)
ConstantSP sum2 = v->sum2(); // 相當於sum2(v)
v->sort(false); // 相當於sort(v, false)
如果需要呼叫其它內建函式,外掛函式的型別必須是系統函式。通過heap->currentSession()->getFunctionDef
函式獲得一個內建函式,然後用call
方法呼叫它。如果該內建函式是運算子函式,應呼叫原型call(Heap, const ConstantSP&, const ConstantSP&)
;如果是系統函式,應呼叫原型call(Heap, vector<ConstantSP>&)
。以下是呼叫內建函式cumsum
的一個例子:
ConstantSP v = Util::createIndexVector(1, 100);
v->setTemporary(false); // v的值可能在內建函式呼叫時被修改。如果不希望它被修改,應先呼叫setTemporary(false)
FunctionDefSP cumsum = heap->currentSession()->getFunctionDef("cumsum");
ConstantSP result = cumsum->call(heap, v, new Void()); // 相當於cumsum(v),這裡的new Void()是一個佔位符,沒有實際用途
2. 如何開發支援時間序列資料處理的外掛函式
DolphinDB的特色之一在於它對時間序列有良好支援。
本章以編寫一個msum函式的外掛為例,介紹如何開發外掛函式支援時間序列資料處理。
時間序列處理函式通常接受向量作為引數,並對向量中的每個元素進行計算處理。在本例中,msum
函式接受兩個引數:一個向量和一個視窗大小。它的原型是:
ConstantSP msum(const ConstantSP &X, const ConstantSP &window);
msum
函式的返回值是一個和輸入向量同樣長度的向量。本例為簡便起見,假定返回值是一個double
型別的向量。可以通過Util::createVector
函式預先為返回值分配空間:
int size = X->size();
int windowSize = window->getInt();
ConstantSP result = Util::createVector(DT_DOUBLE, size);
在DolphinDB外掛編寫時處理向量,可以迴圈使用getDoubleConst
,getIntConst
等函式,批量獲得一定長度的只讀資料,儲存在相應型別的緩衝區中,從緩衝區中取得資料進行計算。這樣做的效率比迴圈使用getDouble
,getInt
等函式要高。本例為簡便起見,統一使用getDoubleConst
,每次獲得長度為Util::BUF_SIZE
的資料。這個函式返回一個const double*
,指向緩衝區頭部:
double buf[Util::BUF_SIZE];
INDEX start = 0;
while (start < size) {
int len = std::min(Util::BUF_SIZE, size - start);
const double *p = X->getDoubleConst(start, len, buf);
for (int i = 0; i < len; i++) {
double val = p[i];
// ...
}
start += len;
}
在本例中,msum
將計算X
中長度為windowSize
的視窗中所有資料的和。可以用一個臨時變數tmpSum
記錄當前視窗的和,每當視窗移動時,只要給tmpSum
增加新視窗尾部的值,減去舊視窗頭部的值,就能計算得到當前視窗中資料的和。為了將計算值寫入result
,可以迴圈用result->getDoubleBuffer
獲取一個可讀寫的緩衝區,寫完後使用result->setDouble
函式將緩衝區寫回陣列。setDouble
函式會檢查給定的緩衝區地址和變數底層儲存的地址是否一致,如果一致就不會發生資料拷貝。在多數情況下,用getDoubleBuffer
獲得的緩衝區就是變數實際的儲存區域,這樣能減少資料拷貝,提高效能。
需要注意的是,DolphinDB用double
型別的最小值(已經定義為巨集DBL_NMIN
)表示double
型別的NULL
值,要專門判斷。
返回值的前windowSize - 1
個元素為NULL
。可以對X
中的前windowSize
個元素和之後的元素用兩個迴圈分別處理,前一個迴圈只計算累加,後一個迴圈執行加和減的操作。最終的實現如下:
ConstantSP msum(const ConstantSP &X, const ConstantSP &window) {
INDEX size = X->size();
int windowSize = window->getInt();
ConstantSP result = Util::createVector(DT_DOUBLE, size);
double buf[Util::BUF_SIZE];
double windowHeadBuf[Util::BUF_SIZE];
double resultBuf[Util::BUF_SIZE];
double tmpSum = 0.0;
INDEX start = 0;
while (start < windowSize) {
int len = std::min(Util::BUF_SIZE, windowSize - start);
const double *p = X->getDoubleConst(start, len, buf);
double *r = result->getDoubleBuffer(start, len, resultBuf);
for (int i = 0; i < len; i++) {
if (p[i] != DBL_NMIN) // p[i] is not NULL
tmpSum += p[i];
r[i] = DBL_NMIN;
}
result->setDouble(start, len, r);
start += len;
}
result->setDouble(windowSize - 1, tmpSum); // 上一個迴圈多設定了一個NULL,填充為tmpSum
while (start < size) {
int len = std::min(Util::BUF_SIZE, size - start);
const double *p = X->getDoubleConst(start, len, buf);
const double *q = X->getDoubleConst(start - windowSize, len, windowHeadBuf);
double *r = result->getDoubleBuffer(start, len, resultBuf);
for (int i = 0; i < len; i++) {
if (p[i] != DBL_NMIN)
tmpSum += p[i];
if (q[i] != DBL_NMIN)
tmpSum -= q[i];
r[i] = tmpSum;
}
result->setDouble(start, len, r);
start += len;
}
return result;
}
3. 如何開發用於處理分散式SQL的聚合函式
在DolphinDB中,SQL的聚合函式通常接受一個或多個向量作為引數,最終返回一個標量。在開發聚合函式的外掛時,需要了解如何訪問向量中的元素。
DolphinDB中的向量有兩種儲存方式。一種是常規陣列,資料在記憶體中連續儲存;另一種是大陣列,其中的資料分塊儲存。
本章將以編寫一個求幾何平均數的函式為例,介紹如何開發聚合函式,重點關注陣列中元素的訪問。
3.1 聚合函式範例
幾何平均數geometricMean
函式接受一個向量作為引數。為了防止溢位,一般採用其對數形式計算,即
geometricMean([x1, x2, ..., xn])
= exp((log(x1) + log(x2) + log(x3) + ... + log(xn))/n)
為了實現這個函式的分散式版本,可以先開發聚合函式外掛logSum
,用以計算某個分割槽上的資料的對數和,然後用defg
關鍵字定義一個Reduce函式,用mapr
關鍵字定義一個MapReduce函式。
在DolphinDB外掛開發中,對陣列的操作通常要考慮它是常規陣列還是大陣列。可以用isFastMode
函式判斷:
ConstantSP logSum(const ConstantSP &x, const ConstantSP &placeholder) {
if (((VectorSP) x)->isFastMode()) {
// ...
}
else {
// ...
}
}
如果陣列是常規陣列,它在記憶體中連續儲存。可以用getDataArray
函式獲得它資料的指標。假定資料是以double
型別儲存的:
if (((VectorSP) x)->isFastMode()) {
int size = x->size();
double *data = (double *) x->getDataArray();
double logSum = 0;
for (int i = 0; i < size; i++) {
if (data[i] != DBL_NMIN) // is not NULL
logSum += std::log(data[i]);
}
return new Double(logSum);
}
如果資料是大陣列,它在記憶體中分塊儲存。可以用getSegmentSize
獲得每個塊的大小,用getDataSegment
獲得首個塊的地址。它返回一個二級指標,指向一個指標陣列,這個陣列中的每個元素指向每個塊的資料陣列:
// ...
else {
int size = x->size();
int segmentSize = x->getSegmentSize();
double **segments = (double **) x->getDataSegment();
INDEX start = 0;
int segmentId = 0;
double logSum = 0;
while (start < size) {
double *block = segments[segmentId];
int blockSize = std::min(segmentSize, size - start);
for (int i = 0; i < blockSize; i++) {
if (block[i] != DBL_NMIN) // is not NULL
logSum += std::log(block[i]);
}
start += blockSize;
segmentId++;
}
return new Double(logSum);
}
在實際開發中,陣列的底層儲存不一定是double
型別。使用者需要考慮具體型別。本例採用了泛型程式設計統一處理不同型別,具體程式碼參見附件。
3.2 在DolphinDB中呼叫函式
通常需要實現一個聚合函式的非分散式版本和分散式版本,系統會基於哪個版本更高效來選擇呼叫這個版本。
在DolphinDB中定義非分散式的geometricMean函式:
def geometricMean(x) {
return exp(logSum::logSum(x) \ count(x))
}
然後通過定義Map和Reduce函式,最終用mapr
定義分散式的版本:
def geometricMeanMap(x) {
return logSum::logSum(x)
}
defg geometricMeanReduce(myLogSum, myCount) {
return exp(sum(myLogSum) \ sum(myCount))
}
mapr geometricMean(x) { geometricMeanMap(x), count(x) -> geometricMeanReduce }
這樣就實現了geometricMean
函式。
如果是在單機環境中執行這個函式,只需要在執行的節點上載入外掛。如果有資料位於遠端節點,需要在每一個遠端節點載入外掛。可以手動在每個節點執行loadPlugin
函式,也可以用以下指令碼快速在每個節點上載入外掛:
each(rpc{, loadPlugin, pathToPlugin}, getDataNodes())
通過以下指令碼建立一個分割槽表,驗證函式:
db = database("", VALUE, 1 2 3 4)
t = table(take(1..4, 100) as id, rand(1.0, 100) as val)
t0 = db.createPartitionedTable(t, `tb, `id)
t0.append!(t)
select geometricMean(val) from t0 group by id
3.3 隨機訪問大陣列
可以對大陣列進行隨機訪問,但要經過下標計算。用getSegmentSizeInBit
函式獲得塊大小的二進位制位數,通過位運算獲得塊的偏移量和塊內偏移量:
int segmentSizeInBit = x->getSegmentSizeInBit();
int segmentMask = (1 << segmentSizeInBit) - 1;
double **segments = (double **) x->getDataSegment();
int index = 3000000; // 想要訪問的下標
double result = segments[index >> segmentSizeInBit][index & segmentMask];
// ^ 塊的偏移量 ^ 塊內偏移量
3.4 應該選擇哪種方式訪問向量
上一章介紹了通過getDoubleConst
,getIntConst
等一族方法獲得只讀緩衝區,以及通過getDoubleBuffer
,getIntBuffer
等一族方法獲得可讀寫緩衝區,這兩種訪問向量的方法。本章介紹了通過getDataArray
和getDataSegment
方法直接訪問向量的底層儲存。在實際開發中,前一種方法更通用,一般應該選擇前一種方法。但在某些特別的場合(例如明確知道資料儲存在大陣列中,且知道資料的型別),可以採用第二種方法。
4. 如何開發支援新的分散式演算法的外掛函式
在DolphinDB中,Map-Reduce是執行分散式演算法的通用計算框架。DolphinDB提供了mr函式和imr函式,使使用者能通過指令碼實現分散式演算法。而在編寫分散式演算法的外掛時,使用的同樣是這兩個函式。本章主要介紹如何用C++語言編寫自定義的map, reduce等函式,並呼叫mr和imr兩個函式,最終實現分散式計算。
4.1 分散式演算法範例
本章將以mr
為例,實現一個函式,求分散式表中相應列名的所有列平均值,介紹編寫DolphinDB 分散式演算法外掛的整體流程,及需要注意的技術細節。
在外掛開發中,使用者自定義的map, reduce, final, term函式,可以是運算子函式,也可以是系統函式。
本例的map函式,對錶的一個分割槽內對應列名的列做計算,返回一個長度為2的元組,分別包含資料的和,及資料非空元素的個數。具體實現如下:
ConstantSP columnAvgMap(Heap *heap, vector<ConstantSP> &args) {
TableSP table = args[0];
ConstantSP colNames = args[1];
double sum = 0.0;
int count = 0;
for (int i = 0; i < colNames->size(); i++) {
string colName = colNames->getString(i);
VectorSP col = table->getColumn(colName);
sum += col->sum()->getDouble();
count += col->count();
}
ConstantSP result = Util::createVector(DT_ANY, 2);
result->set(0, new Double(sum));
result->set(1, new Int(count));
return result;
}
本例的reduce函式,是對map結果的相加。DolphinDB的內建函式add
就提供了這個功能,可以用heap->currentSession()->getFunctionDef("add")
獲得這個函式:
FunctionDefSP reduceFunc = heap->currentSession()->getFunctionDef("add");
本例的final函式,是對reduce結果中的資料總和sum
和非空元素個數count
做除法,求得所有分割槽中對應列的平均數。具體實現如下:
ConstantSP columnAvgFinal(const ConstantSP &result, const ConstantSP &placeholder) {
double sum = result->get(0)->getDouble();
int count = result->get(1)->getInt();
return new Double(sum / count);
}
定義了map, reduce, final等函式後,將它們匯出為外掛函式(在標頭檔案的函式宣告前加上extern "C"
,並在載入外掛的文字檔案中列出這些函式),然後通過heap->currentSession->getFunctionDef
獲取這些函式,就能以這些函式為引數呼叫mr
函式。如:
FunctionDefSP mapFunc = Heap->currentSession()->getFunctionDef("columnAvg::columnAvgMap");
在本例中,map函式接受兩個引數table
和colNames
,但mr
只允許map函式有一個引數,因此需要以部分應用的形式呼叫map函式,可以用Util::createPartialFunction
將它包裝為部分應用,實現如下:
vector<ConstantSP> mapWithColNamesArgs {new Void(), colNames};
FunctionDefSP mapWithColNames = Util::createPartitalFunction(mapFunc, mapWithColNamesArgs);
用heap->currentSession()->getFunctionDef("mr")
獲得系統內建函式mr
,呼叫mr->call
方法,就相當於在DolphinDB指令碼中呼叫mr
函式。最後實現的columnAvg函式定義如下:
ConstantSP columnAvg(Heap *heap, vector<ConstantSP> &args) {
ConstantSP ds = args[0];
ConstantSP colNames = args[1];
FunctionDefSP mapFunc = heap->currentSession()->getFunctionDef("columnAvg::columnAvgMap");
vector<ConstantSP> mapWithColNamesArgs = {new Void(), colNames};
FunctionDefSP mapWithColNames = Util::createPartialFunction(mapFunc, mapWithColNamesArgs); // columnAvgMap{, colNames}
FunctionDefSP reduceFunc = heap->currentSession()->getFunctionDef("add");
FunctionDefSP finalFunc = heap->currentSession()->getFunctionDef("columnAvg::columnAvgFinal");
FunctionDefSP mr = heap->currentSession()->getFunctionDef("mr"); // mr(ds, columnAvgMap{, colNames}, add, columnAvgFinal)
vector<ConstantSP> mrArgs = {ds, mapWithColNames, reduceFunc, finalFunc};
return mr->call(heap, mrArgs);
}
4.2 在DolphinDB中呼叫函式
如果是在單機環境中執行這個函式,只需要在執行的節點上載入外掛。但如果有資料位於遠端節點,需要在每一個遠端節點載入外掛。可以手動在每個節點執行loadPlugin
函式,也可以用以下指令碼快速在每個節點上載入外掛:
each(rpc{, loadPlugin, pathToPlugin}, getDataNodes())
載入外掛後,用sqlDS
函式生成資料來源,並呼叫函式:
n = 100
db = database("dfs://testColumnAvg", VALUE, 1..4)
t = db.createPartitionedTable(table(10:0, `id`v1`v2, [INT,DOUBLE,DOUBLE]), `t, `id)
t.append!(table(take(1..4, n) as id, rand(10.0, n) as v1, rand(100.0, n) as v2))
ds = sqlDS(<select * from t>)
columnAvg::columnAvg(ds, `v1`v2)
5.如何開發支援流資料處理的外掛函式
在DolphinDB中,流資料訂閱端可以通過一個handler函式處理收到的資料。訂閱資料可以是一個數據表,或一個元組,由subsrciebeTable
函式的msgAsTable
引數決定。通常可以用handler函式對流資料進行過濾、插入另一張表等操作。
本章將編寫一個handler函式。它接受的訊息型別是元組。另外接受兩個引數:一個是int型別的標量或向量indices
,表示元組中元素的下標,另一個是一個表table
。它將元組中對應下標的列插入到表中。
向表中新增資料的介面是bool append(vector<ConstantSP>& values, INDEX& insertedRows, string& errMsg)
,如果插入成功,返回true
,並向insertedRows
中寫入插入的行數。否則返回false
,並在errMsg
中寫入出錯資訊。外掛的實現如下:
ConstantSP handler(Heap *heap, vector<ConstantSP> &args) {
ConstantSP indices = args[0];
TableSP table = args[1];
ConstantSP msg = args[2];
vector<ConstantSP> msgToAppend;
for (int i = 0; i < indices->size(); i++) {
int index = indices->get(i);
msgToAppend.push_back(msg->get(index));
}
INDEX insertedRows;
string errMsg;
table->append(msgToAppend, insertedRows, errMsg);
return new Void();
}
在實際應用中,可能需要知道插入出錯時的原因。可以引入標頭檔案Logger.h
,將出錯資訊寫入日誌中。注意需要在編譯外掛時加上巨集定義-DLOGGING_LEVEL_2
:
// ...
bool success = table->append(msgToAppend, insertedRows, errMsg);
if (!success)
LOG_ERR("Failed to append to table: ", errMsg);
可以用以下指令碼模擬流資料寫入,驗證handler函式:
loadPlugin("/path/to/PluginHandler.txt")
share streamTable(10:0, `id`sym`timestamp, [INT,SYMBOL,TIMESTAMP]) as t0
t1 = table(10:0, `sym`timestamp, [SYMBOL,TIMESTAMP])
subscribeTable(, `t0, , , handler::handler{[1,2], t1})
t0.append!(table(1..100 as id, take(`a`b`c`d, 100) as symbol, now() + 1..100 as timestamp))
select * from t1
6.如何開發支援外部資料來源的外掛函式
在為第三方資料設計可擴充套件的介面外掛時,有幾個需要關注的問題:
- 資料來源(Data source)。資料來源是一個特殊的資料物件,包含了資料實體的元描述,執行一個數據源能獲得資料實體,可能是表、矩陣、向量等等。使用者可以提供資料來源呼叫
olsEx
,randomForestClassifier
等分散式計算函式,也可以呼叫mr
,imr
或ComputingModel.h
中定義的更底層的計算模型做平行計算。DolphinDB的內建函式sqlDS
就通過SQL表示式獲取資料來源。在設計第三方資料介面時,通常需要實現一個獲取資料來源的函式,它將大的檔案分成若干個部分,每部分都表示資料的一個子集,最後返回一個數據源的元組。資料來源一般用一個Code object表示,是一個函式呼叫,它的引數是元資料,返回一個表。 - 結構(Schema)。表的結構描述了表的列數,每一列的列名和資料型別。第三方介面通常需要實現一個函式,快速獲得資料的表結構,以便使用者在這個結構的基礎上調整列名和列的資料型別。
- IO問題。在多核多CPU的環境中,IO可能成為瓶頸。DolphinDB提供了抽象的IO介面,
DataInputStream
和DataOutputStream
,這些介面封裝了資料壓縮,Endianness,IO型別(網路,磁碟,buffer等)等細節,方便開發。此外還特別實現了針對多執行緒的IO實現,BlockFileInputStream
和BlockFileOutputStream
。這個實現有兩個優點:
- 實現計算和IO並行。A執行緒在處理資料的時候,後臺執行緒在非同步幫A執行緒預讀取後面需要的資料。
- 避免了多執行緒的磁碟競爭。當執行緒個數增加的時候,如果並行往同一個磁碟上讀寫,效能會急劇下降。這個實現,會對同一個磁碟的讀寫序列化,從而提高吞吐量。
本章將介紹通常需要實現的幾個函式,為設計第三方資料介面提供一個簡單的範例。
6.1 資料格式描述
假定本例中的資料儲存在平面檔案資料庫,以二進位制格式按行儲存,資料從檔案頭部直接開始儲存。每行有四列,分別為id(按有符號64位長整型格式儲存,8位元組),symbol(按C字串格式儲存,8位元組),date(按BCD碼格式儲存,8位元組),value(按IEEE 754標準的雙精度浮點數格式儲存,8位元組),每行共32位元組。以下是一行的例子:
這一行的十六進位制表示為:
0x 00 00 00 00 00 00 00 05
0x 49 42 4D 00 00 00 00 00
0x 02 00 01 09 00 03 01 03
0x 40 24 33 33 33 33 33 33
6.2 extractMyDataSchema函式
這個函式提取資料檔案的表結構。在本例中,表結構是確定的,不需要實際讀取檔案。該函式提供了一個如何生成表結構的範例。它通過Util::createTable
函式建立一張結構表:
ConstantSP extractMyDataSchema(const ConstantSP &placeholderA, const ConstantSP &placeholderB) {
ConstantSP colNames = Util::createVector(DT_STRING, 4);
ConstantSP colTypes = Util::createVector(DT_STRING, 4);
string names[] = {"id", "symbol", "date", "value"};
string types[] = {"LONG", "SYMBOL", "DATE", "DOUBLE"};
colNames->setString(0, 4, names);
colTypes->setString(0, 4, types);
vector<ConstantSP> schema = {colNames, colTypes};
vector<string> header = {"name", "type"};
return Util::createTable(header, schema);
}
在實際開發中,可能需要以讀取檔案頭等方式獲得表結構。如何讀檔案將在後面介紹。
6.3 loadMyData函式
loadMyData
函式讀取檔案,並輸出一張DolphinDB表。給定一個檔案的路徑,可以通過Util::createBlockFileInputStream
建立一個輸入流,此後,可對這個流呼叫readBytes
函式讀取給定長度的位元組,readBool
讀取下一個bool
值,readInt
讀取下一個int
值,等等。本例給loadMyData
函式設計的語法為:loadMyData(path, [start], [length])
。除了接受檔案路徑path
,還接受兩個int
型別的引數start
和length
,分別表示開始讀取的行數和需要讀取的總行數。createBlockFileInputStream
函式可以通過引數決定開始讀取的位元組數和需要讀取的總位元組數:
ConstantSP loadMyData(Heap *heap, vector<ConstantSP> &args) {
ConstantSP path = args[0];
long long fileLength = Util::getFileLength(path->getString());
size_t bytesPerRow = 32;
int start = args.size() >= 2 ? args[1]->getInt() : 0;
int length = args.size() >= 3 ? args[2]->getInt() : fileLength / bytesPerRow - start;
DataInputStreamSP inputStream = Util::createBlockFileInputStream(path->getString(), 0, fileLength, Util::BUF_SIZE, start * bytesPerRow, length * bytesPerRow);
char buf[Util::BUF_SIZE];
size_t actualLength;
while (true) {
inputStream->readBytes(buf, Util::BUF_SIZE, actualLength);
if (actualLength <= 0)
break;
// ...
}
}
在讀取資料時,通常將資料快取到陣列中,等待緩衝區滿後批量插入。例如,假定要讀取一個內容全為char
型別位元組的二進位制檔案,將它寫入一個char
型別的DolphinDB向量vec
。最後返回只由vec
一列組成的表:
char buf[Util::BUF_SIZE];
VectorSP vec = Util::createVector(DT_CHAR, 0);
size_t actualLength;
while (true) {
inputStream->readBytes(buf, Util::BUF_SIZE, actualLength);
if (actualLength <= 0)
break;
vec->appendChar(buf, actualLength);
}
vector<ConstantSP> cols = {vec};
vector<string> colNames = {"col0"};
return Util::createTable(colNames, cols);
本節的完整程式碼請參考附件中的程式碼。在實際開發中,載入資料的函式可能還會接受表結構引數schema
,按實際需要改變讀取的資料型別。
6.4loadMyDataEx
函式
loadMyData
函式總是將資料載入到記憶體,當資料檔案非常龐大時,工作機的記憶體很容易成為瓶頸。所以設計loadMyDataEx
函式解決這個問題。它通過邊匯入邊儲存的方式,把靜態的二進位制檔案以較為平緩的資料流的方式儲存為DolphinDB的分散式表,而不是採用全部匯入記憶體再存為分割槽表的方式,從而降低記憶體的使用需求。
loadMyDataEx
函式的引數可以參考DolphinDB內建函式loadTextEx
。它的語法是:loadMyDataEx(dbHandle, tableName, partitionColumns, path, [start], [length])
。如果資料庫中的表存在,則將匯入的資料新增到已有的表result
中。如果表不存在,則建立一張表result
,然後新增資料。最後返回這張表:
string dbPath = ((SystemHandleSP) db)->getDatabaseDir();
vector<ConstantSP> existsTableArgs = {new String(dbPath), tableName};
bool existsTable = heap->currentSession()->getFunctionDef("existsTable")->call(heap, existsTableArgs)->getBool(); // 相當於existsTable(dbPath, tableName)
ConstantSP result;
if (existsTable) { // 表存在,直接載入表
vector<ConstantSP> loadTableArgs = {db, tableName};
result = heap->currentSession()->getFunctionDef("loadTable")->call(heap, loadTableArgs); // 相當於loadTable(db, tableName)
}
else { // 表不存在,建立表
TableSP schema = extractMyDataSchema(new Void(), new Void());
ConstantSP dummyTable = DBFileIO::createEmptyTableFromSchema(schema);
vector<ConstantSP> createTableArgs = {db, dummyTable, tableName, partitionColumns};
result = heap->currentSession()->getFunctionDef("createPartitionedTable")->call(heap, createTableArgs); // 相當於createPartitionedTable(db, dummyTable, tableName, partitionColumns)
}
讀取資料並新增到表中的程式碼實現採用了Pipeline框架。它的初始任務是一系列具有不同start
引數的loadMyData
函式呼叫,pipeline的follower
函式是一個部分應用append!{result}
,相當於把整個讀取資料的任務分成若干份執行,呼叫loadMyData
分塊讀取後,將相應的資料通過append!
插入表中。核心部分的程式碼如下:
int sizePerPartition = 16 * 1024 * 1024;
int partitionNum = fileLength / sizePerPartition;
vector<DistributedCallSP> tasks;
FunctionDefSP func = Util::createSystemFunction("loadMyData", loadMyData, 1, 3, false);
int partitionStart = start;
int partitionLength = length / partitionNum;
for (int i = 0; i < partitionNum; i++) {
if (i == partitionNum - 1)
partitionLength = length - partitionLength * i;
vector<ConstantSP> partitionArgs = {path, new Int(partitionStart), new Int(partitionLength)};
ObjectSP call = Util::createRegularFunctionCall(func, partitionArgs); // 將會呼叫loadMyData(path, partitionStart, partitionLength)
tasks.push_back(new DistributedCall(call, true));
partitionStart += partitionLength;
}
vector<ConstantSP> appendToResultArgs = {result};
FunctionDefSP appendToResult = Util::createPartialFunction(heap->currentSession()->getFunctionDef("append!"), appendToResultArgs); // 相當於append!{result}
vector<FunctionDefSP> functors = {appendToResult};
PipelineStageExecutor executor(functors, false);
executor.execute(heap, tasks);
本節的完整程式碼請參考附件中的程式碼。用Pipeline框架實現資料的分塊匯入,只是一種思路。在具體開發時,可以採用ComputingModel.h
中宣告的StaticStageExecutor
,也可以使用Concurrent.h
中宣告的執行緒模型Thread
。實現方法有很多種,需要根據實際場景選擇。
6.5myDataDS
函式
myDataDS
函式返回一個數據源的元組。每個資料來源都是一個表示函式呼叫的Code object,可以通過Util::createRegularFunctionCall
生成。執行這個物件可以取得對應的資料。以下是基於loadMyData
函式產生資料來源的一個例子:
ConstantSP myDataDS(Heap *heap, vector<ConstantSP> &args) {
ConstantSP path = args[0];
long long fileLength = Util::getFileLength(path->getString());
size_t bytesPerRow = 32;
int start = args.size() >= 2 ? args[1]->getInt() : 0;
int length = args.size() >= 3 ? args[2]->getInt() : fileLength / bytesPerRow - start;
int sizePerPartition = 16 * 1024 * 1024;
int partitionNum = fileLength / sizePerPartition;
int partitionStart = start;
int partitionLength = length / partitionNum;
FunctionDefSP func = Util::createSystemFunction("loadMyData", loadMyData, 1, 3, false);
ConstantSP dataSources = Util::createVector(DT_ANY, partitionNum);
for (int i = 0; i < partitionNum; i++) {
if (i == partitionNum - 1)
partitionLength = length - partitionLength * i;
vector<ConstantSP> partitionArgs = {path, new Int(partitionStart), new Int(partitionLength)};
ObjectSP code = Util::createRegularFunctionCall(func, partitionArgs); // 將會呼叫loadMyData(path, partitionStart, partitionLength)
dataSources->set(i, new DataSource(code));
}
return dataSources;
}
教程中的完整程式碼見https://github.com/dolphindb/Tu