1. 程式人生 > 實用技巧 >時序資料庫DolphinDB與Druid的對比測試

時序資料庫DolphinDB與Druid的對比測試

https://zhuanlan.zhihu.com/p/56102593

DolphinDB和Druid都是分散式的分析型時序資料庫。儘管前者使用c++開發,後者使用java開發,兩者在架構、功能、應用場景等方面有很多共同點。本報告在SQL查詢、資料匯入、磁碟佔用空間等方面對兩者進行效能的對比測試。

測試資料集使用約300GB的美國股票市場交易與報價資料。通過測試我們發現:

  • DolphinDB的資料寫入速度大約是Druid的30倍。
  • DolphinDB的查詢速度是Druid的10倍左右。
  • DolphinDB資料庫的靜態空間佔用比Druid高80%,執行時使用的總磁碟空間略低於Druid。

1. 系統介紹

DolphinDB是一款分析型的分散式時序資料庫,由C++編寫,內建流資料處理引擎,平行計算引擎和分散式計算的功能。DolphinDB database 內建分散式檔案系統,支援叢集水平和垂直擴充套件。提供類SQL和Python的指令碼語言,不僅可以用SQL進行對資料進行操作,也可以完成更為複雜的記憶體計算。提供其它常用程式語言的API,方便與已有應用程式整合。DolphinDB能對萬億級資料快速處理,在金融領域中的歷史資料分析建模與實時流資料處理,以及物聯網領域中的海量感測器資料處理與實時分析等場景中均有非常出色的表現。

Druid是一個由Java語言實現的OLAP資料倉庫,適用於萬億級別資料量上的低延時查詢和插入以及實時流資料分析。Druid採用分散式、SN架構和列式儲存、倒排索引、點陣圖索引等關鍵技術,具有高可用性和高擴充套件性的特點。同時,Druid提供了多種語言介面,支援部分SQL。

2. 系統配置

2.1 硬體配置

本次測試的硬體配置如下:

裝置:DELL OptiPlex 7060

CPU:Inter(R) Core™ i7-8700 CPU @ 3.20GHz,6核心12執行緒

記憶體:32GB

硬碟:256GB SSD,1.8TB希捷ST2000DM008-2FR102機械硬碟

作業系統:Ubuntu 16.04 x64

2.2 環境配置

本次的測試環境為單伺服器下的多節點叢集。設定DolphinDB的資料節點的個數為4個,單個數據節點最大可用記憶體設定為4GB。設定Druid的節點個數為5個,分別為overload,broker,historical,coordinator和middleManager。Druid預設對查詢結果進行快取,影響測試時通過多次查詢求平均值這個方法的正確性,故關閉query cache的功能。為不影響Druid的寫入效能測試, 關閉了Druid的roll up功能。其他配置均服從預設配置。

原始csv檔案儲存在HDD上。資料庫儲存在SSD上。

3. 測試資料集

本次測試採用了2007年8月美國股票市場level1的TAQ資料集。TAQ資料集按日分為23個csv檔案,單個檔案大小在7.8G到19.1G不等,整個資料集大小約290G,共有6,561,693,704條資料。

測試資料集TAQ在DolphinDB和Druid中各個欄位的資料型別如下所示:

在Druid中,DATE欄位指定為timestamp列。其它欄位均用作dimension欄位。

4. 資料分割槽方案

在DolphinDB中,採用股票程式碼+日期組合分割槽,其中按照股票程式碼範圍分為128個分割槽,按照日期分為23個分割槽。

Druid僅支援時間範圍分割槽,因此我們把DATE列指定為timestamp型別,以日為單位,共劃分為23個分割槽。

5. 對比測試

我們從資料庫查詢效能、I/O效能以及磁碟佔用空間三方面對DolphinDB和Druid進行了對比測試。

5.1 資料庫查詢效能

DolphinDB指令碼語言支援SQL語法,同時針對時序資料進行了功能擴充套件。Druid提供了基於Json資料格式的語言進行查詢,同時也提供了dsql來進行SQL查詢。本次測試使用Druid自帶的dsql。

我們對TAQ資料集進行了若干種常用的SQL查詢。為了減少偶然因素對結果的影響,本次查詢效能測試對每種查詢操作均進行了10次,對總時間取平均值,時間以毫秒為單位。測試DolphinDB時,我們使用了timer語句來評估SQL語句在服務端的執行時間。由於Druid中沒有提供輸出查詢時間的工具或函式,採用了客戶端命令列工具dsql列印的執行時間。Druid返回的執行時間相比DolphinDB,多了查詢結果的傳輸和顯示時間。由於查詢返回的資料量都很小,dsql與Druid伺服器又在同一個節點上,影響的時間在1ms左右。1ms左右的時間不影響我們的測試結論,因此沒有做特殊處理。

7個查詢的SQL表示如下表所示。

測試結果如下表所示。

從結果可以看出,對於幾乎所有查詢,DolphinDB的效能都優於Druid,速度大約是Druid的3到30倍。

由於Druid只允許根據時間戳進行segment的劃分,而DolphinDB允許從多個維度上對資料進行劃分,在TAQ分割槽時用了時間和股票程式碼兩個維度,因此在查詢中需要根據股票程式碼過濾或分組的測試(如第1、3、6、7項測試)中,DolphinDB的優勢更明顯。

5.2 I/O效能測試

我們測試了DolphinDB和Druid在匯入單個檔案(7.8G)和多個檔案(290.8G)時的效能。公平起見,我們關閉了Druid的Roll up功能。測試結果如下表所示,時間以秒為單位。

相同情況下匯入單個檔案,Druid的匯入時間是DolphinDB的16倍以上,匯入多個檔案時,由於DolphinDB支援並行匯入,速度相比Druid更快。資料匯入指令碼見附錄2。

5.3 磁碟佔用空間測試

資料匯入到DolphinDB和Druid後,我們比較了兩者的資料壓縮率。測試結果如下表所示。

DolphinDB採用LZ4壓縮演算法,對列式儲存的資料進行快速壓縮。DolphinDB的SYMBOL型別在壓縮之前,會使用字典編碼,將字串轉化成整型。Druid在資料儲存過程中,對timestamp和metrics採用LZ4演算法直接壓縮,對dimensions欄位使用字典編碼、點陣圖索引以及roaring bitmap進行壓縮。使用字典編碼可以減少字串儲存的空間,點陣圖索引可快速地進行按位邏輯操作,點陣圖索引壓縮排一步節約了儲存空間。

本次測試中,DolphinDB資料庫佔用的磁碟空間比Druid高出約80%。造成這個差異的主要因素是BID和OFR兩個浮點型欄位在DolphinDB和Druid上的壓縮比有很大的差異。在DolphinDB上,這個兩個欄位的壓縮比是20%,而在Druid上高達5%。原因是測試資料集是一個歷史資料集,資料已經按照日期和股票兩個欄位排序。一個股票在短時間內的報價變化很小,unique的報價個數非常有限,Druid使用點陣圖壓縮的效果非常好。

雖然Druid資料庫的壓縮比更高,靜態的磁碟空間佔用較小,但是Druid執行時會產生segment cache目錄,總的磁碟空間佔用達到65 GB。而DolphinDB執行時不需要額外的空間,總的磁碟空間反而比Druid略小。

6. 小結

DolphinDB對於Druid的效能優勢來自於多個方面,包括(1)儲存機制和分割槽機制上的差別,(2)開發語言(c++ vs java)上的差別,(3)記憶體管理上的差別,以及(4)演算法(如排序和雜湊)實現上的差別。

在分割槽上,Druid只支援時間型別的範圍分割槽,相對於支援值分割槽、範圍分割槽、雜湊分割槽和列表分割槽且每張表可根據多個欄位進行組合分割槽的DolphinDB而言缺乏靈活性。DolphinDB的分割槽粒度更細,不易出現數據或查詢集中到某個節點的情況,在查詢時DolphinDB所需要掃描的資料塊也更少,響應時間更短,效能也更加出色。

除去效能,DolphinDB在功能上比Druid也更為完善。在SQL的支援方面,DolphinDB支援非常強大的window function機制,對SQL join的支援也更為全面。對時序資料特有的sliding function,asof join, window join,DolphinDB都有很好的支援。DolphinDB集資料庫、程式語言和分散式計算於一體,除了常規的資料庫查詢功能,DolphinDB也支援更為複雜的記憶體計算,分散式計算以及流計算。

DolphinDB和Druid在執行方式上也略有區別。在Druid崩潰後或是將segment-cache清空後重啟時,需要花大量的時間重新載入資料,將每一個segment解壓到segment-cache中再進行查詢,效率較低,cache也會佔用較大的空間,因此Druid在重新啟動時需要等待較長的時間,並且要求更大的空間。

附錄

附錄1. 環境配置

(1) DolphinDB配置

controller.cfg

localSite=localhost:9919:ctl9919
localExecutors=3
maxConnections=128
maxMemSize=4
webWorkerNum=4
workerNum=4
dfsReplicationFactor=1
dfsReplicaReliabilityLevel=0
enableDFS=1
enableHTTPS=0

cluster.nodes

localSite,mode
localhost:9910:agent,agent
localhost:9921:DFS_NODE1,datanode
localhost:9922:DFS_NODE2,datanode
localhost:9923:DFS_NODE3,datanode
localhost:9924:DFS_NODE4,datanode

cluster.cfg

maxConnection=128
workerNum=8
localExecutors=7
webWorkerNum=2
maxMemSize=4

agent.cfg

workerNum=3
localExecutors=2
maxMemSize=4
localSite=localhost:9910:agent
controllerSite=localhost:9919:ctl9919

(2) Druid配置

_common

# Zookeeper
druid.zk.service.host=zk.host.ip
druid.zk.paths.base=/druid
# Metadata storage
druid.metadata.storage.type=mysql
druid.metadata.storage.connector.connectURI=jdbc:mysql://db.example.com:3306/druid
# Deep storage
druid.storage.type=local
druid.storage.storageDirectory=var/druid/segments
# Indexing service logs
druid.indexer.logs.type=file
druid.indexer.logs.directory=var/druid/indexing-logs

broker:

Xms24g
Xmx24g
XX:MaxDirectMemorySize=4096m

# HTTP server threads
druid.broker.http.numConnections=5
druid.server.http.numThreads=25

# Processing threads and buffers
druid.processing.buffer.sizeBytes=2147483648
druid.processing.numThreads=7

# Query cache
druid.broker.cache.useCache=false
druid.broker.cache.populateCache=false

coordinator:
Xms3g
Xmx3g

historical:
Xms8g
Xmx8g

# HTTP server threads
druid.server.http.numThreads=25

# Processing threads and buffers
druid.processing.buffer.sizeBytes=2147483648
druid.processing.numThreads=7

# Segment storage
druid.segmentCache.locations=[{"path":"var/druid/segment-cache","maxSize":0}]
druid.server.maxSize=130000000000

druid.historical.cache.useCache=false
druid.historical.cache.populateCache=false

middleManager:
Xms64m
Xmx64m

# Number of tasks per middleManager
druid.worker.capacity=3

# HTTP server threads
druid.server.http.numThreads=25

# Processing threads and buffers on Peons
druid.indexer.fork.property.druid.processing.buffer.sizeBytes=4147483648
druid.indexer.fork.property.druid.processing.numThreads=2

overload:

Xms3g
Xmx3g

附錄2. 資料匯入指令碼

DolphinDB database 指令碼:

if (existsDatabase("dfs://TAQ"))
dropDatabase("dfs://TAQ")

db = database("/Druid/table", SEQ, 4)
t=loadTextEx(db, 'table', ,"/data/data/TAQ/TAQ20070801.csv")
t=select count(*) as ct from t group by symbol
buckets = cutPoints(exec symbol from t, 128)
buckets[size(buckets)-1]=`ZZZZZ
t1=table(buckets as bucket)
t1.saveText("/data/data/TAQ/buckets.txt")

db1 = database("", VALUE, 2007.08.01..2007.09.01)
partition = loadText("/data/data/buckets.txt")
partitions = exec * from partition
db2 = database("", RANGE, partitions)
db = database("dfs://TAQ", HIER, [db1, db2])
db.createPartitionedTable(table(100:0, `symbol`date`time`bid`ofr`bidsiz`ofrsiz`mode`ex`mmid, [SYMBOL, DATE, SECOND, DOUBLE, DOUBLE, INT, INT, INT, CHAR, SYMBOL]), `quotes, `date`symbol)

def loadJob() {
filenames = exec filename from files('/data/data/TAQ')
db = database("dfs://TAQ")
filedir = '/data/data/TAQ'
for(fname in filenames){
jobId = fname.strReplace(".csv", "")
jobName = jobId 
submitJob(jobId,jobName, loadTextEx{db, "quotes", `date`symbol,filedir+'/'+fname})
}
}
loadJob()
select * from getRecentJobs()
TAQ = loadTable("dfs://TAQ","quotes");

Druid指令碼:

{
"type" : "index",
"spec" : {
"dataSchema" : {
"dataSource" : "TAQ",
"parser" : {
"type" : "string",
"parseSpec" : {
"format" : "csv",
"dimensionsSpec" : {
"dimensions" : [
"TIME",
"SYMBOL",
{"name":"BID", "type" : "double"},
{"name":"OFR", "type" : "double"},
{"name":"BIDSIZ", "type" : "int"},
{"name":"OFRSIZ", "type" : "int"},
"MODE",
"EX",
"MMID"
]
},
"timestampSpec": {
"column": "DATE",
"format": "yyyyMMdd"
},
"columns" : ["SYMBOL",
"DATE",
"TIME",
"BID",
"OFR",
"BIDSIZ",
"OFRSIZ",
"MODE",
"EX",
"MMID"]
}
},
"metricsSpec" : [],
"granularitySpec" : {
"type" : "uniform",
"segmentGranularity" : "day",
"queryGranularity" : "none",
"intervals" : ["2007-08-01/2007-09-01"],
"rollup" : false
}
},
"ioConfig" : {
"type" : "index",
"firehose" : {
"type" : "local",
"baseDir" : "/data/data/",
"filter" : "TAQ.csv"
},
"appendToExisting" : false
},
"tuningConfig" : {
"type" : "index",
"targetPartitionSize" : 5000000,
"maxRowsInMemory" : 25000,
"forceExtendableShardSpecs" : true
}
}
}

編輯於 2019-11-08 Druid Database Software 時序資料庫