Flume+HDFS+Kafka+Hive例項搭建
摘要:本文要實現的是一個使用Flume來處理Kafka的資料,並將其儲存到HDFS中去,然後通過Hive外部表關聯查詢出來儲存的資料。
所以在建立一個maven工程,整個工程最終的目錄如下:
下面開始一步一步講解
1、定義自己的source
之所以不用源生的,是因為要對得到的訊息要一定的處理後再儲存到hdfs中去,這裡主要就是將每一條訊息解析並組裝成以“|”做分隔的一條記錄
在這個類中定義Start方法來初始化連線kafka:
在這個類中定義處理消費的方法:
其實就是將訊息處理成一條以“|”符號隔開的一條資料並放入到channel中:
中間還有一部分處理:
筆者都一而過了,這是放到event,下一步就是放在channel中了。
上面的process方法會呼叫到這:
其實就是將訊息解析,並組裝成一條以“|”隔開的資料
2、配置檔案
接下來就是配置source/channel/sink了。
配置檔案部分內容:
注意,這裡的channle使用的type是SPILLABLEMEMORY,表示source來的event都會先儲存到記憶體中去,記憶體不夠了再儲存到硬碟中去。各項引數此處不再做解釋
3、啟動指令碼:
如果flume程式 所在的叢集不在haddop叢集中,需要將haddop叢集的/hadoop/native資料夾複製到此flume執行的機器,並且將hdfs-site.xml也一起復制過來。如果還操作了hbse/hive,那麼hbse-site.xml.hive-site.xml也要複製過來。
然後編寫啟動指令碼:
啟動執行:
停止執行:
事先要將checkpoint目錄建立起來:
4、打包
最後打包成一個tar包,並將配置檔案是jar包分離
打包後目錄:
所有的jar包都在lib目錄,而不是打成一個大jar包
在上面的啟動指令碼中就可以指明依賴的jar包:
啟動時新增 這個引數:
5、執行
flume安裝很簡單,將flume安裝包解壓後就可以,同時將我們的應用從上面的tar包上傳到此臺機器,並解壓
並進入bdp-flume建立如下幾個目錄:
channel_date:channel的硬碟儲存目錄
log:列印的日誌儲存目錄
var:存放當前flume應用的pid,主要是啟動和停止會用到
啟動:
這是log目錄下列印的部分日誌:
看一下hdfs是否有資料:
注意,這裡是直接將檔案存放到hdfs的這個路徑,並每天有一個檔案價(為後面和hive表分割槽關聯方便)
儲存的檔案:.tmp結尾的表示當前正在寫入的,還沒有滾動
因為是儲存成.snappy檔案,所以直接查詢會亂碼。如果是儲存成textfile的話,就不會。但是snappy有壓縮,textfile沒,建議使用snappy。
6、建立Hive外部表
CREATE TABLE IF NOT EXISTS ods_uba.kafka_appchnl_source_log
(
source_log string,
muid string, -- 'parse ck',
dev_no string, -- 'uu',
user_id string, -- 'ur',
mbl_nbr string, -- 'ud',
chnl_code string, -- 'ch',
opt_type string, -- 'ac',
req_no string, -- 'rn',
vt_time string, -- 'tm',
st_time string -- 'st',
)
PARTITIONED BY (opdt string)
row format delimited
fields terminated by '^' lines terminated by '\n'
stored as textfile
LOCATION '/hive/warehouse/ods_uba.db/kafka_appchnl_source_log';
注意:這裡Location指的是hdfs檔案存放的目錄,可以不用完全是hive表預設的儲存路徑一樣。
新增分割槽資料:
alter table kafka_appchnl_source_log add if not exists partition(opdt='2017-01-06')
查詢:
可以用presto和dbveare使用來查詢,也可以使用hive命令查詢
這是查詢出來的部分欄位:
更多技術請關注筆者微信技術公眾號"單例模式"