[[email protected] conf]$ cat taildir.conf
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# Describe/configure the source
# The source type is TAILDIR; the type name is not case-sensitive here
a1.sources.r1.type = taildir
a1.sources.r1.channels = c1
# File in which the last tailed position of each file is saved
a1.sources.r1.positionFile = /home/hadoop/hui/taildir_position.json
# File groups to tail, separated by spaces
a1.sources.r1.filegroups = f1 f2
# Absolute path for each file group
a1.sources.r1.filegroups.f1 = /home/hadoop/hui/test1/hehe.txt
a1.sources.r1.filegroups.f2 = /home/hadoop/hui/test2/.*
# In the f2 pattern, "." matches any single character except the newline \n and "*" matches the preceding subexpression zero or more times. messages.* would also work here
a1.sources.r1.fileHeader = true
# Describe the sink
a1.sinks.k1.type = file_roll
a1.sinks.k1.sink.directory = /home/hadoop/hui
a1.sinks.k1.sink.rollInterval = 0
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
[[email protected] hui]$ tree
|-- messages.1
|-- qiang
|   `-- hui.txt
|-- test1
|   |-- hehe.txt
|   `-- messages.2
`-- test2
    |-- messages.3
    `-- messages.4
3 directories, 6 files
[[email protected] hui]$ cat test1/hehe.txt
hello world hehe
[[email protected] hui]$ cat test1/messages.2
hello world 2
[[email protected] hui]$ cat test2/messages.3
hello world 3
[[email protected] hui]$ cat test2/messages.4
hello world 4
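To preview which file names a filegroup pattern such as messages.* would pick up, the regex can be tried against a directory listing first. This is only a rough sketch: it assumes grep -E treats this simple pattern the same way Flume's Java regex does and that the pattern is applied to the file name alone. The scratch directory and other.log are made up for illustration.

```shell
# Scratch directory with two matching names and one that should be skipped.
dir=$(mktemp -d)
touch "$dir/messages.3" "$dir/messages.4" "$dir/other.log"

# -x requires the pattern to match the whole file name, not just a part of it.
matched=$(ls "$dir" | grep -E -x 'messages.*')
echo "$matched"
```

Swapping the pattern for .* selects every name in the directory, which is what filegroup f2 does in the config above.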
[[email protected] apache-flume-1.6.0-cdh5.5.2-bin]$ bin/flume-ng agent -c . -f conf/taildir.conf -n a1 -Dflume.root.logger=INFO,console
[[email protected] hui]$ ls    # 1489881718232-1 and taildir_position.json have been generated under hui/
1489881718232-1 messages.1 qiang taildir_position.json test1 test2
[[email protected] hui]$ cat 1489881718232-1
hello world hehe
hello world 3
hello world 4
[[email protected] hui]$ cat taildir_position.json
[[email protected] hui]$ echo "ni hao world" >> test1/hehe.txt
[[email protected] hui]$ cat 1489881718232-1
hello world hehe
hello world 3
hello world 4
ni hao world
[[email protected] hui]$ cat taildir_position.json
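The append test above delivers only the new line because the source remembers how far it has already read. The idea can be sketched in shell. This is a conceptual illustration, not Flume's actual implementation: pos is a hypothetical stand-in for the offset kept in taildir_position.json, and the log file is a temporary one.

```shell
# Temporary log file standing in for test1/hehe.txt.
log=$(mktemp)
echo "hello world hehe" > "$log"

# Remember how many bytes have been consumed so far.
pos=$(wc -c < "$log")

# New data arrives after the position was recorded.
echo "ni hao world" >> "$log"

# Resume from the first unread byte instead of re-reading the whole file.
new=$(tail -c +"$((pos + 1))" "$log")
echo "$new"
```

Because the offset survives between reads, a restart can continue from pos rather than from the beginning of the file.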
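Modify the updateTailFiles method of the ReliableTaildirEventReader class.
Remove the tf.getPath().equals(f.getAbsolutePath()) condition from it. Checking whether tf is null is enough; the file name no longer needs to be compared.
// if (tf == null || !tf.getPath().equals(f.getAbsolutePath())) {
if (tf == null) { // file is not in the position record: read it in full
Modify the updatePos method of the TailFile class for the same reason: the inode already identifies a file uniquely, so the path is not needed as an extra condition. Dropping that condition adds support for renamed files.
// if (this.inode == inode && this.path.equals(path)) {
if (this.inode == inode) {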
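The change relies on a rename keeping the inode. A minimal shell sketch of that behaviour, using a scratch directory from mktemp rather than the paths in this post (the variable names are illustrative):

```shell
# Scratch directory so the demo does not touch real log files.
tmp=$(mktemp -d)
echo "hello world hehe" > "$tmp/hehe.txt"

# ls -i prints the inode number in the first column.
before=$(ls -i "$tmp/hehe.txt" | awk '{print $1}')
mv "$tmp/hehe.txt" "$tmp/haha.txt"
after=$(ls -i "$tmp/haha.txt" | awk '{print $1}')

# A file created later under the old name is a different file with its own inode.
echo "hello china" > "$tmp/hehe.txt"
recreated=$(ls -i "$tmp/hehe.txt" | awk '{print $1}')

echo "before=$before after=$after recreated=$recreated"
```

The renamed file keeps its inode, so a reader keyed on inode alone continues from the saved position, while the newly created hehe.txt gets a different inode and is read from the start.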
[[email protected] conf]$ cat taildir.conf
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# Describe/configure the source
a1.sources.r1.type = com.urey.flume.source.taildir.TaildirSource
a1.sources.r1.channels = c1
a1.sources.r1.positionFile = /home/hadoop/q1/taildir_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /home/hadoop/hui/.*
a1.sources.r1.batchSize = 100
a1.sources.r1.backoffSleepIncrement = 1000
a1.sources.r1.maxBackoffSleep = 5000
# Describe the sink
a1.sinks.k1.type = file_roll
a1.sinks.k1.sink.directory = /home/hadoop/q1
a1.sinks.k1.sink.rollInterval = 0
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
[[email protected] ~]$ mkdir q1
[[email protected] hui]$ tree
|-- messages.1
`-- test2
    |-- messages.3
    |-- messages.4
    `-- test1
        |-- hehe.txt
        `-- messages.2
2 directories, 5 files
[[email protected] hui]$ cat messages.1
hello world 1
[[email protected] hui]$ cat test2/messages.3
hello world 3
[[email protected] hui]$ cat test2/messages.4
hello world 4
[[email protected] hui]$ cat test2/test1/hehe.txt
hello world hehe
[[email protected] hui]$ cat test2/test1/messages.2
hello world 2
[[email protected] apache-flume-1.6.0-cdh5.5.2-bin]$ bin/flume-ng agent -c . -f conf/taildir.conf -n a1 -Dflume.root.logger=INFO,console
[[email protected] q1]$ ls
1489910670584-1 taildir_position.json
[[email protected] q1]$ cat 1489910670584-1
hello world 2
hello world 4
hello world 1
hello world hehe
hello world 3
[[email protected] q1]$ cat taildir_position.json
[[email protected] hui]$ mv test2/test1/hehe.txt test2/haha.txt
[[email protected] hui]$ cat ../q1/1489910670584-1
hello world 2
hello world 4
hello world 1
hello world hehe
hello world 3
[[email protected] hui]$ cat ../q1/taildir_position.json
[[email protected] hui]$ echo "hello world haha" >> test2/haha.txt
[[email protected] hui]$ cat ../q1/1489910670584-1
hello world 2
hello world 4
hello world 1
hello world hehe
hello world 3
hello world haha
[[email protected] hui]$ cat ../q1/taildir_position.json
[[email protected] hui]$ echo "hello china" >> test2/test1/hehe.txt
[[email protected] hui]$ cat ../q1/1489910670584-1
hello world 2
hello world 4
hello world 1
hello world hehe
hello world 3
hello world haha
hello china
[[email protected] hui]$ cat ../q1/taildir_position.json
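Flume 1.7.0 introduced the taildirSource component. It tails every file in a directory whose name matches a regular expression and can resume from the last recorded position. I later found that the CDH build of flume-1.6.0 already ships this component, while the official Apache release apache-flume-1.6.0-bin does not.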
