
Reading Directory Files into HDFS in Real Time

1. Create the configuration file flume-dir-hdfs.conf

Create the file and open it:

[ck@hadoop102 job]$ touch flume-dir-hdfs.conf
[ck@hadoop102 job]$ vim flume-dir-hdfs.conf

Add the following content:

a3.sources = r3
a3.sinks = k3
a3.channels = c3

# Describe/configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/module/flume-1.9.0/upload
# Suffix appended to a file once it has been fully ingested
a3.sources.r3.fileSuffix = .COMPLETED
# Add a header carrying the absolute path of the source file
a3.sources.r3.fileHeader = true
# Skip files that are still being written (anything ending in .tmp)
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)

# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop102:9000/flume-1.9.0/upload/%Y%m%d/%H
# Prefix for the files created on HDFS
a3.sinks.k3.hdfs.filePrefix = upload-
# Round the timestamp down so a new directory is created every hour
a3.sinks.k3.hdfs.round = true
a3.sinks.k3.hdfs.roundValue = 1
a3.sinks.k3.hdfs.roundUnit = hour
a3.sinks.k3.hdfs.useLocalTimeStamp = true
# Number of events flushed to HDFS in one batch
a3.sinks.k3.hdfs.batchSize = 100
# Plain text output rather than SequenceFile
a3.sinks.k3.hdfs.fileType = DataStream
# Roll the current file every 60 s or at ~128 MB; never roll by event count
a3.sinks.k3.hdfs.rollInterval = 60
a3.sinks.k3.hdfs.rollSize = 134217700
a3.sinks.k3.hdfs.rollCount = 0
# Avoid premature rolls caused by HDFS block replication
a3.sinks.k3.hdfs.minBlockReplicas = 1

# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
 

2. Start the agent to monitor the folder

[ck@hadoop102 flume-1.9.0]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-dir-hdfs.conf
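While testing, it can help to see events and agent activity printed to the terminal; Flume's standard logger override does this (the same command with an optional flag appended):

[ck@hadoop102 flume-1.9.0]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-dir-hdfs.conf -Dflume.root.logger=INFO,console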

Notes on using the Spooling Directory Source:

1) Do not create and then keep modifying files inside the monitored directory.

2) Files that have been fully ingested are renamed with the .COMPLETED suffix.

3) The monitored folder is scanned for file changes every 500 ms.
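One further caveat: the Spooling Directory Source typically fails to start if the configured spoolDir does not exist yet. Step 3 below creates it; if the agent exits with an error about the missing directory, create it first and restart the agent:

[ck@hadoop102 flume-1.9.0]$ mkdir -p /opt/module/flume-1.9.0/upload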

3. Add files to the upload folder

[ck@hadoop102 flume-1.9.0]$ mkdir upload
[ck@hadoop102 flume-1.9.0]$ cd upload/
[ck@hadoop102 upload]$ touch ck.log
[ck@hadoop102 upload]$ touch ck.txt
[ck@hadoop102 upload]$ touch ck.tmp
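Note that touch creates empty files; they should still be renamed to .COMPLETED, but they produce no events, so nothing will actually appear on HDFS for them. To see data flow end to end, write some content into the non-.tmp files first (the text here is arbitrary):

[ck@hadoop102 upload]$ echo "hello flume" >> ck.log
[ck@hadoop102 upload]$ echo "hello hdfs" >> ck.txt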

4. Check the data on HDFS
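You can list everything the sink has written with a recursive listing of the path from the config; the hour-bucketed %Y%m%d/%H subdirectories will appear under it, named for whenever you ran the test:

[ck@hadoop102 flume-1.9.0]$ hdfs dfs -ls -R /flume-1.9.0/upload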

5. Wait about a second, then list the upload folder again: the processed files are renamed with the .COMPLETED suffix, while ck.tmp is left untouched because it matches ignorePattern.
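Assuming the agent picked everything up, the listing should look roughly like this (the names follow from the fileSuffix and ignorePattern settings above):

[ck@hadoop102 upload]$ ls
ck.log.COMPLETED  ck.tmp  ck.txt.COMPLETED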

This example comes from the atguigu video tutorial.