druid 的基礎架構與應用
本文介紹了druid的基礎架構以及工作過程,通過一個應用案例加深了解。
durid簡介
druid是一種高性能、列式存儲、分布式數據存儲的時序數據分析引擎。能支持“PB”級數據的秒級查詢。類似的產品有kylin/clickhouse。druid典型的應用就是OLAP場景下的cube組合查詢分析。如數據鉆取(Drill-down)、上卷(Roll-up)、切片(Slice)、切塊(Dice)以及旋轉(Pivot)。後面的應用示例章節再詳細闡述。
durid基礎架構
先來了解一下durid主要節點:
1、broker node(代理節點)
Broker節點扮演著歷史節點和實時節點的查詢路由的角色。主要負責接收外部查詢,轉發查詢至各個segment數據所在的節點,並聚合結果返回。
2、historical node(歷史節點)
historical主要負責歷史數據存儲和查詢,接收協調節點數據加載與刪除指令,從deepstoage中下載segment,完成數據加載或者刪除後在zk中進行通告。歷史節點遵循shared-nothing的架構,因此節點間沒有單點問題。節點間是相互獨立的並且提供的服務也是簡單的,它們只需要知道如何加載、刪除和處理不可變的segment。historical節點也可以進行分組,組合成不同的historical tier。這會在集群規模較大的時候體現出優勢。如做數據的冷熱分離,按不同業務的數據分離(一定程度的資源隔離)。當然,historical 節點是整個集群查詢性能的核心所在,因為historical會承擔絕大部分的segment查詢。
3、coordinator node(協調節點)
主要負責數據的管理和在歷史節點上的分布。協調節點告訴歷史節點加載新數據、卸載過期數據、復制數據、和為了負載均衡移動數據。可以配置load數據及drop數據規則。
4、overlord node(index service 可以理解為任務管理節點)
功能描述:負責接收任務,管理任務。接收外部http請求(新建任務、查詢任務狀態、kill任務等),分配管理任務(當有新的任務請求,overload node會將任務分配給middleManager node去執行)。
5、middleManager node(可以理解為overlord節點的工作節點)
功能描述:可以啟動n(可配置)個peon,接收overlord分配的task,再交給自己peon去執行。
圖片描述(最多50字)
查詢過程 見上圖藍色箭頭,Broker節點接收到查詢(Q1),再將查詢發送給歷史節點與實時節點(Q2,Q3),在上圖的模式中,實時節點是MM節點上啟動的task。該task會負責數據的攝入以及提供實時數據的查詢。 數據攝入過程 見上圖紅色箭頭,D1是client生產數據最終寫入kafka(這個過程可能在client與kafka的中間,還包含了多個環節,如數據傳輸與數據清洗),D2和D3過程是部署tranquility-kafka服務,消費kafka數據寫入對應的task,tranquility-kakfa啟動的時候會跟overlord節點通信,由overlord節點分配任務給middleManager執行。D4是task 負責的segment段正常結束,然後將segment數據寫入deepstorage過程。(實時task運行時間是segmentGranularity+windowPeriod+intermediatePersistPeriod)。D5則是historical節點從deepstorage下載segment並在zk中聲明負責該segment段查詢的過程。 目前druid數據攝入過程還有一種更推薦的方式就是kafka index service(簡稱kis),有興趣的同學可以參考官方文檔,kis對kafka的版本有強要求。 druid整體架構雖然略為復雜,但是整體穩定性非常不錯,幾乎很少出現集群故障。拋開集群硬件故障和數據本身問題,SLA基本能到4個9。coordinator,overlord兩個節點是主從模式,保證每個角色起兩個實例即可。broker節點無狀態,可以起多個實例,前面掛個域名即可(為了保證緩存命中,最好配置ip hash)。historical節點無狀態,有一定冗余即可。middleManager用作數據攝入節點,若task沒有配置副本,則節點宕機會引發丟數據的風險。當然,kis可以避免該問題。 durid數據聚合、存儲核心思想 druid 數據存儲分為三部分timestamp、dimensions、metrics。其中,timestamp、metrics部分是采用lz4直接壓縮。 但是dimensions部分需要支持過濾查詢以及分組查詢。所以dimensions部分的每個維度都采用了以下三種數據結構做轉碼、存儲:
A dictionary that maps values (which are always treated as strings) to integer IDs,
For each distinct value in the column,a bitmap that indicates which rows contain that value,and
A list of the column’s values,encoded using the dictionary in 1
舉個例子,源數據如下:
圖片描述(最多50字)
name列來說 1. Dictionary that encodes column values
字典表的key都是唯一的,所以Map的key是unique的column value,Map的value從0開始不斷增加。 示例數據的name列只有兩個不同的值。所以張三編號為0,李四編號為1:
{
"張三": 0
"李四": 1
}
- Column data
要保存的是每一行中這一列的值,值是ID而不是原始的值。因為有了上面的Map字典,所以有下面的對應關系:
[0,
1,
1,
0]
- Bitmaps - one for each unique value of the column
BitMap的key是第一步Map的key(原始值), value則是真假的一個標識(是|否?等於|不等於?),取值只有0、1,如下:
value="張三": [1,0,0,1]
value=“李四": [0,1,1,0]
所以由上可知最壞的情況可能是隨著數據量的增加,bitmap的個數也成線性增長,為數據量大小*列的個數。那麽在什麽情況下會導致這種線性增長?這裏我們引入了一個基數(cardinality)的概念。基數=unique(dim1,dim2.....),如若dim取值均為各種爆炸性id或者隨機數,則druid的預聚合將完全失去意義。所以在druid的應用場景中,基數約小,聚合效率越高。
講了dimensions怎麽存儲,那麽metrics又是怎麽聚合(roll-up)呢?這就要引入druid數據schema定義了。下一章結合應用一塊看一個示例。
應用示例與實踐經驗
假設有這樣一份數據,典型的商品銷售數據。
圖片描述(最多50字)
我們構造成druid中的數據schema如下:
{
"dataSources" : [ {
"spec" : {
"dataSchema" : {
"dataSource" : "test_datasource",
"granularitySpec" : {
"segmentGranularity" : "hour",
"queryGranularity" : "minute",
"type" : "uniform"
},
"parser" : {
"type" : "string",
"parseSpec" : {
"format" : "json",
"timestampSpec" : {
"column" : "time",
"format" : "auto"
},
"dimensionsSpec" : {
"dimensions" : [ "productName", "city", "channel", “action"]
}
}
},
"metricsSpec" : [ {
"name" : "count",
"type" : "count"
}, {
"type" : "doubleSum",
"fieldName" : "price",
"name" : “sale"
} ]
},
"tuningConfig" : {
"type" : "realtime",
"windowPeriod" : "P×××0M",
"intermediatePersistPeriod" : "P×××0M",
"maxRowsInMemory" : "100000"
}
},
"properties" : {
"topicPattern" : "test_datasource",
"task.partitions" : "2",
"task.replicants" : "1"
}
} ],
"properties" : {
...
}
}
前面重點說了dimensions,我們再來看下metrics。在上面的例子中我們只定義count和針對price的doubleSum,那麽這些指標就已經固定了後期的分析需求。我們看到上面table中的一二行標紅部分,所有dim取值完全相同,queryGranularity為一分鐘。那麽在這2018-06-11 12:23:00這個點,這兩行數據就被聚合成一行,count=2,sale=0。以此類推。
然後我們再來看看具體的分析需求,一個鉆取的例子。我們首先查看商品A昨天的點擊量,select sum(count) from table where productName=‘A’ and action=‘click‘,再想看看地區=北京,渠道=web呢?是不是再加幾個where就搞定了?select sum(count) from table where productName=‘A’ and city=‘北京’ and channel=‘web‘ and action=‘click’; 然後就是切片和切塊,也很簡單,就是幾個group by。這些在druid中都能非常輕松的支持。
具體使用上的經驗總結:
1. reindex思想。一般我們實時數據查詢粒度配置的會比較小,秒級或者分鐘級。那麽對於一天前,三天前,一個月前的數據呢?這時候一般關註的粒度將不再那麽細,所以我們一般會采取redinx的策略進行再聚合
2. 針對歷史數據,可能對於某些維度將不在關心,這時候我們也可以在reindex時,將無用的維度剔除掉,可能大大減少整體數據的基數。
3. 一般數據壓縮比例。這裏提供一個大概的參考值。數據總基數在10W以下,每天數據量約百億左右,druid中聚合後的索引數據與原始數據大小之比可以到1:100,甚至1:1000。
4. druid適用於常規的olap場景,能非常輕松的支撐每天百億甚至千億級別的數據寫入。
5. 爆炸性維度數據,以及頻繁update數據的需求,不適用於druid的場景。
總結
本文主要對druid做了入門級的基礎介紹,可以給大家做olap引擎技術選型時做一個參考。以及對druid的初學者做一個大致介紹。druid是一款非常優秀的olap引擎,從性能、穩定性上來說,都是非常不錯的。
druid 的基礎架構與應用