1. 程式人生 > >Flume+HDFS+Kafka+Hive例項搭建

Flume+HDFS+Kafka+Hive例項搭建

          摘要:本文要實現的是一個使用Flume來處理Kafka的資料,並將其儲存到HDFS中去,然後通過Hive外部表關聯查詢出來儲存的資料。

所以在建立一個maven工程,整個工程最終的目錄如下:

下面開始一步一步講解

1、定義自己的source

   之所以不用源生的,是因為要對得到的訊息要一定的處理後再儲存到hdfs中去,這裡主要就是將每一條訊息解析並組裝成以“|”做分隔的一條記錄

在這個類中定義Start方法來初始化連線kafka:

在這個類中定義處理消費的方法:

其實就是將訊息處理成一條以“|”符號隔開的一條資料並放入到channel中:

中間還有一部分處理:

筆者都一而過了,這是放到event,下一步就是放在channel中了。

上面的process方法會呼叫到這:

其實就是將訊息解析,並組裝成一條以“|”隔開的資料

2、配置檔案

接下來就是配置source/channel/sink了。

配置檔案部分內容:

注意,這裡的channle使用的type是SPILLABLEMEMORY,表示source來的event都會先儲存到記憶體中去,記憶體不夠了再儲存到硬碟中去。各項引數此處不再做解釋

3、啟動指令碼:

    如果flume程式 所在的叢集不在haddop叢集中,需要將haddop叢集的/hadoop/native資料夾複製到此flume執行的機器,並且將hdfs-site.xml也一起復制過來。如果還操作了hbse/hive,那麼hbse-site.xml.hive-site.xml也要複製過來。

然後編寫啟動指令碼:

啟動執行:

停止執行:

事先要將checkpoint目錄建立起來:

4、打包

最後打包成一個tar包,並將配置檔案是jar包分離

打包後目錄:

所有的jar包都在lib目錄,而不是打成一個大jar包

在上面的啟動指令碼中就可以指明依賴的jar包:

啟動時新增 這個引數:

5、執行

flume安裝很簡單,將flume安裝包解壓後就可以,同時將我們的應用從上面的tar包上傳到此臺機器,並解壓

並進入bdp-flume建立如下幾個目錄:

channel_date:channel的硬碟儲存目錄

log:列印的日誌儲存目錄

var:存放當前flume應用的pid,主要是啟動和停止會用到

啟動:

這是log目錄下列印的部分日誌:

看一下hdfs是否有資料:

注意,這裡是直接將檔案存放到hdfs的這個路徑,並每天有一個檔案價(為後面和hive表分割槽關聯方便)

儲存的檔案:.tmp結尾的表示當前正在寫入的,還沒有滾動

因為是儲存成.snappy檔案,所以直接查詢會亂碼。如果是儲存成textfile的話,就不會。但是snappy有壓縮,textfile沒,建議使用snappy。

6、建立Hive外部表

CREATE  TABLE IF NOT EXISTS ods_uba.kafka_appchnl_source_log 
(
   source_log           string,
   muid                 string, -- 'parse ck',
   dev_no               string, -- 'uu',
   user_id              string, -- 'ur',
   mbl_nbr              string, -- 'ud',
   chnl_code            string, -- 'ch',
   opt_type             string, -- 'ac',  
   req_no               string, -- 'rn',
   vt_time              string, -- 'tm',
   st_time              string -- 'st',
) 
PARTITIONED BY (opdt string) 
row format delimited
fields terminated by '^' lines terminated by '\n'
stored as textfile
LOCATION '/hive/warehouse/ods_uba.db/kafka_appchnl_source_log';

注意:這裡Location指的是hdfs檔案存放的目錄,可以不用完全是hive表預設的儲存路徑一樣。

新增分割槽資料:

alter table kafka_appchnl_source_log add if not exists partition(opdt='2017-01-06')


查詢:

可以用presto和dbveare使用來查詢,也可以使用hive命令查詢

這是查詢出來的部分欄位:

更多技術請關注筆者微信技術公眾號"單例模式"