Druid 0.17 Getting Started (3): A Guide to Data Ingestion
![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085651906-571819466.jpg)
In the quickstart we demonstrated ingesting the bundled sample data from a local file, but Druid supports a much richer set of ingestion methods, covering both batch and streaming data:
- **File ingestion**: load batch data from files
- **Kafka stream ingestion**: load streaming data from Kafka
- **Hadoop ingestion**: load batch data from Hadoop
- **Writing your own ingestion spec**: define a custom ingestion spec
This post covers the first two, which are the most common.
## 1. Loading a file
Druid offers several ways to load file data:
- via the web console's data loader
- via the console (submitting a JSON task)
- via the command line
- via a curl call
### 1.1 Data loader
Druid ships with a sample data file containing Wikipedia edit events from September 12, 2015.
The sample data is located at `quickstart/tutorial/wikiticker-2015-09-12-sampled.json.gz`.
A sample record looks like this:
```
{
  "time": "2015-09-12T20:03:45.018Z",
  "channel": "#en.wikipedia",
  "namespace": "Main",
  "page": "Spider-Man's powers and equipment",
  "user": "foobar",
  "comment": "/* Artificial web-shooters */",
  "cityName": "New York",
  "regionName": "New York",
  "regionIsoCode": "NY",
  "countryName": "United States",
  "countryIsoCode": "US",
  "isAnonymous": false,
  "isNew": false,
  "isMinor": false,
  "isRobot": false,
  "isUnpatrolled": false,
  "added": 99,
  "delta": 99,
  "deleted": 0
}
```
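If you want to peek at the raw file yourself, a quick shell one-liner works (run it from the Druid installation directory; `gunzip` and `head` are standard tools, nothing Druid-specific):
```
# Print the first record of the gzipped sample file without unpacking it
gunzip -c quickstart/tutorial/wikiticker-2015-09-12-sampled.json.gz | head -n 1
```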
Let's walk through loading the sample file with the data loader.
##### 1.1.1 Open localhost:8888 and click Load data
![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085652292-1477787964.jpg)
##### 1.1.2 Select Local disk
![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085652704-1762187705.jpg)
##### 1.1.3 Click Connect data
![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085653061-634765944.jpg)
##### 1.1.4 Preview the data
Enter `quickstart/tutorial/` as the Base directory and `wikiticker-2015-09-12-sampled.json.gz` as the File filter.
Click Apply to preview the data, then click Next: Parse data.
![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085653652-590886946.jpg)
##### 1.1.5 Parse the data
You can see that the JSON records have been parsed. Next, parse the timestamp.
![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085654068-1185738456.jpg)
##### 1.1.6 Parse the timestamp
Once the timestamp parses successfully, the next two steps are Transform and Filter; we skip both here and click Next.
![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085654460-1302068690.jpg)
##### 1.1.7 Confirm the schema
This step lets us confirm the schema and make adjustments if needed.
Since the data set is small, we turn off Rollup and move on to the next step.
![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085654856-423400439.jpg)
##### 1.1.8 Configure segment granularity
Here we choose how the data is partitioned into time-based segments; select hour and click Next.
![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085655207-1910689495.jpg)
##### 1.1.9 Confirm and publish
![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085655533-1019324676.jpg)
![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085655856-446442045.jpg)
##### 1.1.10 Published successfully; ingestion begins
![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085656178-822464076.jpg)
Wait for the task to succeed.
![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085656509-1753060725.jpg)
##### 1.1.11 View the data
Select Datasources to see the data we just loaded.
You can see the datasource name, its availability (Fully available), its size, and other details.
![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085656848-1762524416.jpg)
##### 1.1.12 Query the data
Click the Query button.
We can now query the data with SQL, and the results can be downloaded as well.
![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085657184-897146130.jpg)
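Queries can also be issued over HTTP instead of through the console. Below is a minimal sketch using Druid's SQL endpoint on the router (`/druid/v2/sql` on port 8888 in the quickstart layout); replace `wikipedia` with whatever datasource name you chose in the publish step:
```
# Top 5 most-edited pages, via the Druid SQL HTTP API
curl -X POST -H 'Content-Type: application/json' \
  http://localhost:8888/druid/v2/sql \
  -d '{"query": "SELECT page, COUNT(*) AS edits FROM wikipedia GROUP BY page ORDER BY edits DESC LIMIT 5"}'
```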
### 1.2 Console
In the task view, click Submit JSON task.
![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085657825-842617736.jpg)
This opens the spec submission dialog; paste in the following spec:
```
{
  "type" : "index_parallel",
  "spec" : {
    "dataSchema" : {
      "dataSource" : "wikipedia",
      "dimensionsSpec" : {
        "dimensions" : [
          "channel",
          "cityName",
          "comment",
          "countryIsoCode",
          "countryName",
          "isAnonymous",
          "isMinor",
          "isNew",
          "isRobot",
          "isUnpatrolled",
          "metroCode",
          "namespace",
          "page",
          "regionIsoCode",
          "regionName",
          "user",
          { "name": "added", "type": "long" },
          { "name": "deleted", "type": "long" },
          { "name": "delta", "type": "long" }
        ]
      },
      "timestampSpec": {
        "column": "time",
        "format": "iso"
      },
      "metricsSpec" : [],
      "granularitySpec" : {
        "type" : "uniform",
        "segmentGranularity" : "day",
        "queryGranularity" : "none",
        "intervals" : ["2015-09-12/2015-09-13"],
        "rollup" : false
      }
    },
    "ioConfig" : {
      "type" : "index_parallel",
      "inputSource" : {
        "type" : "local",
        "baseDir" : "quickstart/tutorial/",
        "filter" : "wikiticker-2015-09-12-sampled.json.gz"
      },
      "inputFormat" : {
        "type": "json"
      },
      "appendToExisting" : false
    },
    "tuningConfig" : {
      "type" : "index_parallel",
      "maxRowsPerSegment" : 5000000,
      "maxRowsInMemory" : 25000
    }
  }
}
```
![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085658126-438187606.jpg)
Then just watch the load task until it completes.
### 1.3 Command line
For convenience, Druid provides a script for submitting ingestion tasks:
```
bin/post-index-task
```
We can run:
```
bin/post-index-task --file quickstart/tutorial/wikipedia-index.json --url http://localhost:8081
```
You should see output like the following:
```
Beginning indexing data for wikipedia
Task started: index_wikipedia_2018-07-27T06:37:44.323Z
Task log: http://localhost:8081/druid/indexer/v1/task/index_wikipedia_2018-07-27T06:37:44.323Z/log
Task status: http://localhost:8081/druid/indexer/v1/task/index_wikipedia_2018-07-27T06:37:44.323Z/status
Task index_wikipedia_2018-07-27T06:37:44.323Z still running...
Task index_wikipedia_2018-07-27T06:37:44.323Z still running...
Task finished with status: SUCCESS
Completed indexing data for wikipedia. Now loading indexed data onto the cluster...
wikipedia loading complete! You may now query your data
```
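The task log and status URLs printed above can also be polled by hand; for example (substitute the task id from your own output):
```
# Ask the Overlord for the current status of the indexing task
curl http://localhost:8081/druid/indexer/v1/task/index_wikipedia_2018-07-27T06:37:44.323Z/status
```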
Then just watch the load task until it completes.
### 1.4 curl
We can also load data by calling curl directly:
```
curl -X 'POST' -H 'Content-Type:application/json' -d @quickstart/tutorial/wikipedia-index.json http://localhost:8081/druid/indexer/v1/task
```
A successful submission returns the task id:
```
{"task":"index_wikipedia_2018-06-09T21:30:32.802Z"}
```
## 2. Loading streaming data from Apache Kafka
Apache Kafka is a high-performance messaging system written in Scala, developed as an open-source project under the Apache Software Foundation.
Kafka was originally built at LinkedIn and open-sourced in early 2011; it graduated from the Apache Incubator in October 2012. The project aims to provide a unified, high-throughput, low-latency platform for handling real-time data.
For more on Kafka, see [Kafka Primer (detailed, with screenshots)](https://mp.weixin.qq.com/s/oFEv5c5zO7NAMA3YYB3CrQ).
### 2.1 Install Kafka
Download and unpack a recent Kafka release:
```
curl -O https://archive.apache.org/dist/kafka/2.1.0/kafka_2.12-2.1.0.tgz
tar -xzf kafka_2.12-2.1.0.tgz
cd kafka_2.12-2.1.0
```
Start Kafka (this reuses the ZooKeeper instance that the Druid quickstart already runs on localhost:2181):
```
./bin/kafka-server-start.sh config/server.properties
```
Create a topic:
```
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic wikipedia
```
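To confirm the topic was created, you can list the topics registered in ZooKeeper (standard Kafka tooling, nothing Druid-specific):
```
# List all topics; "wikipedia" should appear in the output
./bin/kafka-topics.sh --list --zookeeper localhost:2181
```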
### 2.2 Write data to Kafka
To write the sample events to the Kafka topic wikipedia, first extract the sample file in the Druid directory:
```
cd quickstart/tutorial
gunzip -c wikiticker-2015-09-12-sampled.json.gz > wikiticker-2015-09-12-sampled.json
```
Then, in the Kafka directory, run the following command, replacing {PATH_TO_DRUID} with the path to your Druid directory:
```
export KAFKA_OPTS="-Dfile.encoding=UTF-8"
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic wikipedia < {PATH_TO_DRUID}/quickstart/tutorial/wikiticker-2015-09-12-sampled.json
```
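To verify the events actually landed in Kafka, a console consumer can read a few of them back (standard Kafka tooling; --max-messages just limits the output):
```
# Read the first 3 events back from the wikipedia topic
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic wikipedia --from-beginning --max-messages 3
```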
### 2.3 Load the Kafka data into Druid
Druid likewise offers several ways to load data from Kafka:
- the data loader
- the console
- curl
#### 2.3.1 Data loader
##### 2.3.1.1 Open localhost:8888 and click Load data
Select `Apache Kafka` and click `Connect data`.
![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085658508-13515230.jpg)
##### 2.3.1.2 Enter the Kafka broker `localhost:9092` and the topic `wikipedia`
You can preview the data here, then move on to the next step.
![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085659086-1877246991.jpg)
##### 2.3.1.3 Parse the data
![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085659516-1969246996.jpg)
##### 2.3.1.4 Parse the timestamp, then configure transforms and filters
![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085659953-1741556967.jpg)
![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085700451-371940198.jpg)
![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085700822-141714061.jpg)
##### 2.3.1.5 This step is important: it determines the range of data that will be consumed
![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085701138-1897062580.jpg)
![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085701530-1741799766.jpg)
##### 2.3.1.6 Publish
![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085702014-626016517.jpg)
##### 2.3.1.7 Wait for the task to complete
![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085702366-1043860547.jpg)
![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085702760-1551772401.jpg)
##### 2.3.1.8 Check the results on the query page
![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085703163-391760406.jpg)
#### 2.3.2 Console
In the task view, click `Submit JSON supervisor` to open the dialog.
![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085703477-940123858.jpg)
Paste in the following supervisor spec:
```
{
  "type": "kafka",
  "spec" : {
    "dataSchema": {
      "dataSource": "wikipedia",
      "timestampSpec": {
        "column": "time",
        "format": "auto"
      },
      "dimensionsSpec": {
        "dimensions": [
          "channel",
          "cityName",
          "comment",
          "countryIsoCode",
          "countryName",
          "isAnonymous",
          "isMinor",
          "isNew",
          "isRobot",
          "isUnpatrolled",
          "metroCode",
          "namespace",
          "page",
          "regionIsoCode",
          "regionName",
          "user",
          { "name": "added", "type": "long" },
          { "name": "deleted", "type": "long" },
          { "name": "delta", "type": "long" }
        ]
      },
      "metricsSpec" : [],
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "DAY",
        "queryGranularity": "NONE",
        "rollup": false
      }
    },
    "tuningConfig": {
      "type": "kafka",
      "reportParseExceptions": false
    },
    "ioConfig": {
      "topic": "wikipedia",
      "inputFormat": {
        "type": "json"
      },
      "replicas": 2,
      "taskDuration": "PT10M",
      "completionTimeout": "PT20M",
      "consumerProperties": {
        "bootstrap.servers": "localhost:9092"
      }
    }
  }
}
```
#### 2.3.3 curl
We can also submit the Kafka supervisor by calling curl directly:
```
curl -XPOST -H'Content-Type: application/json' -d @quickstart/tutorial/wikipedia-kafka-supervisor.json http://localhost:8081/druid/indexer/v1/supervisor
```
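Whichever way it was submitted, the supervisor's state can be checked through the same indexer API; a minimal sketch (in this tutorial the supervisor id equals the datasource name, `wikipedia`):
```
# List running supervisors, then fetch the status of the wikipedia supervisor
curl http://localhost:8081/druid/indexer/v1/supervisor
curl http://localhost:8081/druid/indexer/v1/supervisor/wikipedia/status
```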
**Settle your mind and keep improving yourself; that is never a mistake. For more posts on real-time computing, follow 實時流式計算 (Real-Time Stream Computing).**