1. 程式人生 > >如何快速地把HDFS中的資料匯入ClickHouse

如何快速地把HDFS中的資料匯入ClickHouse

如何快速地把HDFS中的資料匯入ClickHouse

3739fb5b1bad134c6fc2ebccb502178d.jpeg

ClickHouse是面向OLAP的分散式列式DBMS。我們部門目前已經把所有資料分析相關的日誌資料儲存至ClickHouse這個優秀的資料倉庫之中,當前日資料量達到了300億。

之前介紹的有關資料處理入庫的經驗都是基於實時資料流,資料儲存在Kafka中,我們使用Java或者Golang將資料從Kafka中讀取、解析、清洗之後寫入ClickHouse中,這樣可以實現資料的快速接入。然而在很多同學的使用場景中,資料都不是實時的,可能需要將HDFS或者是Hive中的資料匯入ClickHouse。有的同學通過編寫Spark程式來實現資料的匯入,那麼是否有更簡單、高效的方法呢。

目前開源社群上有一款工具Waterdrop,專案地址https://github.com/InterestingLab/waterdrop,可以快速地將HDFS中的資料匯入ClickHouse。

HDFS to ClickHouse

假設我們的日誌儲存在HDFS中,我們需要將日誌進行解析並篩選出我們關心的欄位,將對應的欄位寫入ClickHouse的表中。

Log Sample

我們在HDFS中儲存的日誌格式如下, 是很常見的Nginx日誌

10.41.1.28 github.com 114.250.140.241 0.001s "127.0.0.1:80" [26/Oct/2018:03:09:32 +0800] "GET /InterestingLab/waterdrop HTTP/1.1" 200 0 "-" - "Dalvik/2.1.0 (Linux; U; Android 7.1.1; OPPO R11 Build/NMF26X)" "196" "-" "mainpage" "443" "-" "172.16.181.129"

ClickHouse Schema

我們的ClickHouse建表語句如下,我們的表按日進行分割槽

CREATE TABLE cms.cms_msg
(
    date Date, 
    datetime DateTime, 
    url String, 
    request_time Float32, 
    status String, 
    hostname String, 
    domain String, 
    remote_addr String, 
    data_size Int32, 
    pool String
) ENGINE = MergeTree PARTITION BY date ORDER BY date SETTINGS index_granularity = 16384

Waterdrop with ClickHouse

接下來會給大家詳細介紹,我們如何通過Waterdrop滿足上述需求,將HDFS中的資料寫入ClickHouse中。

Waterdrop

Waterdrop是一個非常易用,高效能,能夠應對海量資料的實時資料處理產品,它構建在Spark之上。Waterdrop擁有著非常豐富的外掛,支援從Kafka、HDFS、Kudu中讀取資料,進行各種各樣的資料處理,並將結果寫入ClickHouse、Elasticsearch或者Kafka中。

Prerequisites

首先我們需要安裝Waterdrop,安裝十分簡單,無需配置系統環境變數

  1. 準備Spark環境
  2. 安裝Waterdrop
  3. 配置Waterdrop

以下是簡易步驟,具體安裝可以參照Quick Start

cd /usr/local
wget https://archive.apache.org/dist/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz
tar -xvf https://archive.apache.org/dist/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz
wget https://github.com/InterestingLab/waterdrop/releases/download/v1.1.1/waterdrop-1.1.1.zip
unzip waterdrop-1.1.1.zip
cd waterdrop-1.1.1

vim config/waterdrop-env.sh
# 指定Spark安裝路徑
SPARK_HOME=${SPARK_HOME:-/usr/local/spark-2.2.0-bin-hadoop2.7}

Waterdrop Pipeline

我們僅需要編寫一個Waterdrop Pipeline的配置檔案即可完成資料的匯入。

配置檔案包括四個部分,分別是Spark、Input、filter和Output。

Spark

這一部分是Spark的相關配置,主要配置Spark執行時所需的資源大小。

spark {
  spark.app.name = "Waterdrop"
  spark.executor.instances = 2
  spark.executor.cores = 1
  spark.executor.memory = "1g"
}

Input

這一部分定義資料來源,如下是從HDFS檔案中讀取text格式資料的配置案例。

input {
    hdfs {
        path = "hdfs://nomanode:8020/rowlog/accesslog"
        table_name = "access_log"
        format = "text"
    }
}

Filter

在Filter部分,這裡我們配置一系列的轉化,包括正則解析將日誌進行拆分、時間轉換將HTTPDATE轉化為ClickHouse支援的日期格式、對Number型別的欄位進行型別轉換以及通過SQL進行欄位篩減等

filter {
    # 使用正則解析原始日誌
    grok {
        source_field = "raw_message"
        pattern = '%{IP:ha_ip}\\s%{NOTSPACE:domain}\\s%{IP:remote_addr}\\s%{NUMBER:request_time}s\\s\"%{DATA:upstream_ip}\"\\s\\[%{HTTPDATE:timestamp}\\]\\s\"%{NOTSPACE:method}\\s%{DATA:url}\\s%{NOTSPACE:http_ver}\"\\s%{NUMBER:status}\\s%{NUMBER:body_bytes_send}\\s%{DATA:referer}\\s%{NOTSPACE:cookie_info}\\s\"%{DATA:user_agent}\"\\s%{DATA:uid}\\s%{DATA:session_id}\\s\"%{DATA:pool}\"\\s\"%{DATA:tag2}\"\\s%{DATA:tag3}\\s%{DATA:tag4}'
    }
    # 將"dd/MMM/yyyy:HH:mm:ss Z"格式的資料轉換為
    # "yyyy/MM/dd HH:mm:ss"格式的資料
    date {
        source_field = "timestamp"
        target_field = "datetime"
        source_time_format = "dd/MMM/yyyy:HH:mm:ss Z"
        target_time_format = "yyyy/MM/dd HH:mm:ss"
    }
    # 使用SQL篩選關注的欄位,並對欄位進行處理
    # 甚至可以通過過濾條件過濾掉不關心的資料
    sql {
        table_name = "access"
        sql = "select substring(date, 1, 10) as date, datetime, hostname, url, http_code, float(request_time), int(data_size), domain from access"
    }
}

Output

最後我們將處理好的結構化資料寫入ClickHouse

output {
    clickhouse {
        host = "your.clickhouse.host:8123"
        database = "waterdrop"
        table = "access_log"
        fields = ["date", "datetime", "hostname", "uri", "http_code", "request_time", "data_size", "domain"]
        username = "username"
        password = "password"
    }
}

Running Waterdrop

我們將上述四部分配置組合成為我們的配置檔案config/batch.conf

vim config/batch.conf
spark {
  spark.app.name = "Waterdrop"
  spark.executor.instances = 2
  spark.executor.cores = 1
  spark.executor.memory = "1g"
}
input {
    hdfs {
        path = "hdfs://nomanode:8020/rowlog/accesslog"
        table_name = "access_log"
        format = "text"
    }
}
filter {
    # 使用正則解析原始日誌
    grok {
        source_field = "raw_message"
        pattern = '%{IP:ha_ip}\\s%{NOTSPACE:domain}\\s%{IP:remote_addr}\\s%{NUMBER:request_time}s\\s\"%{DATA:upstream_ip}\"\\s\\[%{HTTPDATE:timestamp}\\]\\s\"%{NOTSPACE:method}\\s%{DATA:url}\\s%{NOTSPACE:http_ver}\"\\s%{NUMBER:status}\\s%{NUMBER:body_bytes_send}\\s%{DATA:referer}\\s%{NOTSPACE:cookie_info}\\s\"%{DATA:user_agent}\"\\s%{DATA:uid}\\s%{DATA:session_id}\\s\"%{DATA:pool}\"\\s\"%{DATA:tag2}\"\\s%{DATA:tag3}\\s%{DATA:tag4}'
    }
    # 將"dd/MMM/yyyy:HH:mm:ss Z"格式的資料轉換為
    # "yyyy/MM/dd HH:mm:ss"格式的資料
    date {
        source_field = "timestamp"
        target_field = "datetime"
        source_time_format = "dd/MMM/yyyy:HH:mm:ss Z"
        target_time_format = "yyyy/MM/dd HH:mm:ss"
    }
    # 使用SQL篩選關注的欄位,並對欄位進行處理
    # 甚至可以通過過濾條件過濾掉不關心的資料
    sql {
        table_name = "access"
        sql = "select substring(date, 1, 10) as date, datetime, hostname, url, http_code, float(request_time), int(data_size), domain from access"
    }
}
output {
    clickhouse {
        host = "your.clickhouse.host:8123"
        database = "waterdrop"
        table = "access_log"
        fields = ["date", "datetime", "hostname", "uri", "http_code", "request_time", "data_size", "domain"]
        username = "username"
        password = "password"
    }
}

執行命令,指定配置檔案,執行Waterdrop,即可將資料寫入ClickHouse。這裡我們以本地模式為例。

./bin/start-waterdrop.sh --config config/batch.conf -e client -m 'local[2]'

Conclusion

在這篇文章中,我們介紹瞭如何使用Waterdrop將HDFS中的Nginx日誌檔案匯入ClickHouse中。僅通過一個配置檔案便可快速完成資料的匯入,無需編寫任何程式碼。除了支援HDFS資料來源之外,Waterdrop同樣支援將資料從Kafka中實時讀取處理寫入ClickHouse中。我們的下一篇文章將會介紹,如何將Hive中的資料快速匯入ClickHouse中。

當然,Waterdrop不僅僅是ClickHouse資料寫入的工具,在Elasticsearch以及Kafka等資料來源的寫入上同樣可以扮演相當重要的角色。

希望瞭解Waterdrop和ClickHouse、Elasticsearch、Kafka結合使用的更多功能和案例,可以直接進入專案主頁https://github.com/InterestingLab/waterdrop

– Power by InterestingLab