1. 程式人生 > >6-druid原始碼分析之 Hadoop-index 過程

6-druid原始碼分析之 Hadoop-index 過程

druid 中資料批處理生成segment是通過 hadoop_index 方式來完成的.

hadoop_index 主要劃分成兩個過程,由兩個 job 來分別完成.

hadoop-index 過程有兩個 MR , 但是它們 Map 操作都是依賴於同一個抽象類HadoopDruidIndexerMapper.
這個類主要實現了 map 的前半段過程。

主要功能是 :

  • 解析每一行資料,得到 timestamp, demenssion, metric 列。
  • 將不在獲取時間範圍內的資料剔除掉

估算資料量

實現類

io.druid.indexer.DetermineHashedPartitionsJob

主要工作

DetermineHashedPartitionsJob 的工作主要是分析資料量併為資料轉segment過程分配合理的桶的數量,每個桶有一個 reduce 去執行,
這樣使得每個 reduce 的操作資料不至於過大。

  • map 的操作:

map 的數量通過檔案數量來決定, 每一個map對應一個檔案.

map 中主要有一個 hyperLogLogs 物件, 這個物件在 setup 中建立, 它使用segment(interval)作為key, 然後一個類似 list 的結構作為value,
list 中的每一個值都具有唯一性,hyperLogLogs 是一個基數估計器,用來計算大量資料中不同元素的個數, 我們將 map 的時間戳跟維度列雜湊計算後,放到相對應的segment 中。
最終,我們通過計算hyperLogLogs 中每個segment 的元素個數,就確定去大致有多少條資料了。

map 中主要的操作如下 :

  1. map 從檔案中讀取的每一行資料,將時間戳在時間範圍之外的資料排除掉, 然後將時間戳按照查詢粒度進行規整,與維度列一起構成groupKey,groupKey 作為元素的計數標準, 求出每一個檔案中每個segment中總共有的記錄數量.
  2. 解析每一條資料,根據時間戳確定資料所屬segment,然後將 groupKey 進行雜湊獲取一個位元組序列,將這個值add到 按照 segment 作為 key 的 hyperLogLogs 中。
  3. 當map的所有的資料解析完成後,遍歷這個 hyperLogLogs 的所有的 key(segment), 將每一個 key 對應的值轉位元組陣列後傳送到對應的 reduce 中去。

資料從 map 中出來後就確定 reduce , 在DetermineHashedPartitionsJob中的partition 過程是依據 segment(interval) 的,及每個 segment 對應一個reduce.

reduce 中主要操作如下:

  • reduce 端彙總指定的 segment 中的所有的hyperLogLogs 的輸出資料,分析出總的記錄條數寫入檔案。

當 job 執行完後, 我們便可以直到每個segment 中有多少條資料,然後我們可以根據配置,將每個 segment 劃分桶

桶的數量 = 總的數量/maxPartitionSize[配置屬性]

這樣我們就分配好了當真正執行 segment 轉換時需要多少個 reduce .