Flume-將資料寫入動態分割槽表
一、 場景描述
實時監控檔案目錄,將目錄中的實時產生的資料檔案(檔案內容非動態)寫入動態分割槽,分割槽為3級(裝置ID/檔案產生日期/檔案產生的時間(h)).檔名格式如下(日期+時間+產品ID.txt)
二、 主要存在的難點
由於flume只支援傳入一些簡單的引數變數(時間/日期/檔名等),所以這裡我們如果想動態的識別我們的檔名並直接生成sink的路徑及相應檔名有困難。
三、 解決方法
這裡我們依然選用hdfs sink來作資料消費,但要稍微改變一部分原始碼,來達到我們通過識別檔名來確定輸出路徑的目的,步驟如下:
(1) 下載apache-flume-1.7.0-src原始碼 (官網直接下載)
(2) 開啟cmd編譯原始碼(這裡用的
編譯小提示:
maven確實是一個好東西,但是在國內下載官方倉庫的jar卻是個大問題,速度不敢恭維,現在oschina的國內maven映象服務已關閉,無奈之下只能另尋門路。
今天突然發現了阿里雲maven國內映象,修改完以後速度飛一般的感覺。
修改方法:在~/.m2目錄下的settings.xml檔案中,(如果該檔案不存在,則需要從maven/conf目錄下拷貝一份),找到<mirrors>標籤,新增如下子標籤:
<mirror>
<id>alimaven</id>
<name>aliyun maven</name>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
<mirrorOf>central</mirrorOf>
</mirror>
編譯成功後如下:
(3) 把專案匯入eclipse做修改
下圖為修改後程式碼並且在環境上執行通過並寫入後的程式碼,主要修改flume-hdfs-sink下的HDFSEentSink.java檔案,主要修改process()
修改之後直接重新將子專案flume-hdfs-sink打包為flume-hdfs-sink-1.7.0.jar,最後直接去環境下替換掉原來的jar包(路徑如下,記得將原來的jar包做備份):
到這裡我們對flume的一些小改動基本結束。
四、 測試
(1)建立flume配置檔案,簡單的flume配置檔案如下:
#agent1表示代理名稱
agent1.sources=source1
agent1.sinks=sink1
agent1.channels=channel1
#Spooling Directory是監控指定資料夾中新檔案的變化,一旦新檔案出現,就解析該檔案內容,然後寫入到channle。寫入完成後,標記該檔案已完成或者刪除該檔案。
#配置source1
agent1.sources.source1.type=spooldir
agent1.sources.source1.spoolDir=/tracy/flume_test
agent1.sources.source1.basenameHeader=true
agent1.sources.source1.basenameHeaderKey=fileName
agent1.sources.source1.deletePolicy=immediate
#batchSize是針對Source和Sink提出的一個概念,它用來限制source和sink對event批量處理的
agent1.sources.source1.batchSize=1000
#channel存event的最大數量
a1.channels.c1.capacity = 1000
#每次從source到channel或從channel到sink的event最大的吞吐量
a1.channels.c1.transactionCapacity = 100
#channle滿載的情況下30s後丟擲異常
agent1.channels.ch1.keep-alive = 30
agent1.sources.source1.request-timeout=2000
agent1.sources.source1.connect-timeout=3000
agent1.sources.source1.channels=channel1
#加攔截器
#agent1.sources.source1.interceptors = i1
#時間戳攔截器
#agent1.sources.source1.interceptors.i1.type = timestamp
#配置channel1
agent1.channels.channel1.type=file
agent1.channels.channel1.checkpointDir=/tracy/flume_test1
agent1.channels.channel1.dataDirs=/tracy/flume_test2
#配置sink1
agent1.sinks.sink1.type=hdfs
agent1.sinks.sink1.hdfs.path=hdfs://hadoop.nameNode1:9000/user/hive/warehouse/test.db/info_flume_data_dt1/
#DataStream類似於textfile
agent1.sinks.sink1.hdfs.fileType=DataStream
#只寫入event的body部分
agent1.sinks.sink1.hdfs.writeFormat=TEXT
agent1.sinks.sink1.hdfs.batchSize = 1
agent1.sinks.sink1.hdfs.rollInterval = 1
agent1.sinks.sink1.hdfs.rollcount = 1
agent1.sinks.sink1.hdfs.rollsize = 0
agent1.sinks.sink1.hdfs.inUsePrefix=t
agent1.sinks.sink1.hdfs.inUseSuffix=.temp
agent1.sinks.sink1.channel=channel1
agent1.sinks.sink1.hdfs.filePrefix=%{fileName}
(2)事先在Hive建最終要落地的分割槽表如下:
DROP TABLE IF EXISTS INFO_FLUME_DATA_DT1;
CREATE TABLE IF NOT EXISTS INFO_FLUME_DATA_DT1(
t_date string comment '時間',
detail string comment '引數'
) PARTITIONED BY (product string,l_date date,houra string)
--clustered by (t_date) sorted by(detail) into 4 buckets
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
LINES TERMINATED BY '\n'
STORED AS TEXTFILE;
(4) 啟動我們的單節點flume進行測試:
./flume-ng agent -n agent1 -c ../conf -f ../conf/flume-conf-hdfs.properties -Dflume.root.logger=DEBUG,console
(5) 往flume監控的目錄放入檔案:
Flume控制臺輸出如下:
這裡可以看到我們程式碼中打樁的位置,路徑檔名沒什麼問題,再去hdfs路徑下看下檔案是否寫入:
沒問題
(6) 檢視hive表資料情況:
這裡我們要注意,要想我們在hdfs的資料在hive表可見,我們首先要alter一下分割槽:
ALTER TABLE INFO_FLUME_DATA_DT1
ADD PARTITION (product='02TE04-3',l_date='2017-03-13',houra='22');
最後去hive表看資料:
沒問題。
實際情境下我們不可能每天手動去新增分割槽,每天跑個指令碼把分割槽新增好就ok,如下(舉例):
# !/bin/sh
for((i=0;i<24;i++))
do
beeline -u jdbc:hive2:// --verbose=true -e "ALTER TABLE test.INFO_FLUME_DATA_DT1 ADD PARTITION (product='02TE04-4',l_date='`date +"%Y-%m-%d"`',houra='$i')"
echo $i
done