1. 程式人生 > >flume的TaildirSource介紹及升級改造

flume的TaildirSource介紹及升級改造

flume 1.7.0推出了taildirSource元件。tail監控目錄下匹配上正則表示式的的所有檔案,實現斷點續傳。

但是我後來發現cdh版的flume-1.6.0也已經有這個元件了,而官方的Apache版的apache-flume-1.6.0-bin卻沒有這個元件。並且Apache版的flume1.7.0比cdh版的flume1.6.0的org.apache.flume.source.taildir包中多出TaildirMatcher.java,並且其他的程式碼也有所不同,應該Apache版的flume1.7.0比cdh版的flume1.6.0更加完善,兩者的差異在哪裡,有時間再研究研究吧。因為我的機器上已經裝有cdh版的apache-flume-1.6.0-cdh5.5.2-bin,所以就直接用這個而沒有重灌Apache版的flume1.7.0了,並且apache-flume-1.6.0-cdh5.5.2-bin目前已經夠用了。

[[email protected] conf]$ cat taildir.conf 

a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Describe/configure the source
#source的型別為TAILDIR,這裡的型別大小寫都可以
a1.sources.r1.type = taildir
a1.sources.r1.channels = c1
#儲存tial最後一個位置儲存位置
a1.sources.r1.positionFile = /home/hadoop/hui/taildir_position.json
#設定tiail的組, 使用空格隔開
a1.sources.r1.filegroups = f1 f2
#設定每個分組的絕對路徑
a1.sources.r1.filegroups.f1 = /home/hadoop/hui/test1/hehe.txt
a1.sources.r1.filegroups.f2 = /home/hadoop/hui/test2/.*
#.匹配除換行符 \n 之外的任何單字元。*匹配前面的子表示式零次或多次。這裡也可以用messages.*
a1.sources.r1.fileHeader = true

# Describe the sink
a1.sinks.k1.type = file_roll
a1.sinks.k1.sink.directory = /home/hadoop/hui
a1.sinks.k1.sink.rollInterval = 0
 
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
 
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
[[email protected] hui]$ tree
.
|-- messages.1
|-- qiang
|   `-- hui.txt
|-- test1
|   |-- hehe.txt
|   `-- messages.2
`-- test2
    |-- messages.3
    `-- messages.4

3 directories, 6 files
[[email protected] hui]$ cat test1/hehe.txt 
hello world hehe
[[email protected] hui]$ cat test1/messages.2 
hello world 2
[
[email protected]
hui]$ cat test2/messages.3 
hello world 3
[[email protected] hui]$ cat test2/messages.4
hello world 4

啟動flume程序:
[[email protected] apache-flume-1.6.0-cdh5.5.2-bin]$ bin/flume-ng agent -c . -f conf/taildir.conf -n a1 -Dflume.root.logger=INFO,console

[[email protected] hui]$ ls(在hui/目錄下生成了1489881718232-1和taildir_position.json檔案)
1489881718232-1  messages.1  qiang  taildir_position.json  test1  test2
[[email protected] hui]$ cat 1489881718232-1 
hello world hehe
hello world 3
hello world 4
[[email protected] hui]$ cat taildir_position.json 
[{"inode":1532721,"pos":17,"file":"/home/hadoop/hui/test1/hehe.txt"},{"inode":1532719,"pos":14,"file":"/home/hadoop/hui/test2/messages.3"},{"inode":1532720,"pos":14,"file":"/home/hadoop/hui/test2/messages.4"}]

往test1/hehe.txt中傳入資料:
[[email protected] hui]$ echo "ni hao world" >> test1/hehe.txt 

再觀察1489881718232-1和taildir_position.json檔案
[[email protected] hui]$ cat 1489881718232-1 
hello world hehe
hello world 3
hello world 4
ni hao world
[[email protected] hui]$ cat taildir_position.json 
[{"inode":1532721,"pos":30,"file":"/home/hadoop/hui/test1/hehe.txt"},{"inode":1532719,"pos":14,"file":"/home/hadoop/hui/test2/messages.3"},{"inode":1532720,"pos":14,"file":"/home/hadoop/hui/test2/messages.4"}]

限制:
1.可以配置一個監聽目錄,會監聽該目錄下所有的檔案,但是如果配置目錄下面嵌套了子目錄,則無法監聽。需要修改原始碼,我們可以遞迴地對配置目錄的所有子目錄的所有檔案進行監聽
2.taildirSource元件不支援檔案改名的。如果檔案改名會認為是新檔案,就會重新讀取,這就導致了日誌檔案重讀。(應用場景:Linux的系統日誌輪詢功能)

解決限制一:
網上已經有修改後的程式碼了,下載地址:https://github.com/qwurey/flume-source-taildir-recursive


將程式碼下載下來並匯入myeclipse中


解決限制二:
修改ReliableTaildirEventReader 類的 updateTailFiles 方法。
將其中的 tf.getPath().equals(f.getAbsolutePath()) 判斷條件去除。只用判斷檔案不為空即可,不用判斷檔案的名字。

//        if (tf == null || !tf.getPath().equals(f.getAbsolutePath())) {
                if (tf == null) {//檔案不存在 position 中則全讀。
修改TailFile 類的 updatePos 方法
此處同樣的原因,inode 已經能夠確定唯一的 檔案了,所以不用加 path 作為判定條件了。所以去掉該條件就支援了檔案重新命名情況。
//     if (this.inode == inode && this.path.equals(path)) {
          if (this.inode == inode) {
修改這兩個地方就支援了檔案重新命名的問題,實現了目錄下多檔案監控,斷點續傳。

新增自定義source入口,也就是將原始碼拷貝過來,然後將修改過的程式碼打包為自定義taildir的jar包執行flume。將taidir.jar匯入到flume的lib目錄下。

[[email protected] conf]$ cat taildir.conf

a1.sources = r1
a1.channels = c1
a1.sinks = k1
  
# Describe/configure the source
a1.sources.r1.type = com.urey.flume.source.taildir.TaildirSource
a1.sources.r1.channels = c1
a1.sources.r1.positionFile = /home/hadoop/q1/taildir_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /home/hadoop/hui/.*
a1.sources.r1.batchSize = 100
a1.sources.r1.backoffSleepIncrement  = 1000
a1.sources.r1.maxBackoffSleep  = 5000

# Describe the sink
a1.sinks.k1.type = file_roll
a1.sinks.k1.sink.directory = /home/hadoop/q1
a1.sinks.k1.sink.rollInterval = 0

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
[[email protected] ~]$ mkdir q1
[[email protected] hui]$ tree
.
|-- messages.1
`-- test2
    |-- messages.3
    |-- messages.4
    `-- test1
        |-- hehe.txt
        `-- messages.2

2 directories, 5 files
[[email protected] hui]$ cat messages.1 
hello world 1
[[email protected] hui]$ cat test2/messages.3
hello world 3
[[email protected] hui]$ cat test2/messages.4 
hello world 4
[[email protected] hui]$ cat test2/test1/hehe.txt 
hello world hehe
[[email protected] hui]$ cat test2/test1/messages.2 
hello world 2

啟動flume:
[[email protected] apache-flume-1.6.0-cdh5.5.2-bin]$ bin/flume-ng agent -c . -f conf/taildir.conf -n a1 -Dflume.root.logger=INFO,console

[[email protected] q1]$ ls
1489910670584-1  taildir_position.json
[[email protected] q1]$ cat 1489910670584-1 
hello world 2
hello world 4
hello world 1
hello world hehe
hello world 3
[[email protected] q1]$ cat taildir_position.json 
[{"inode":1532727,"pos":14,"file":"/home/hadoop/hui/test2/test1/messages.2"},{"inode":1532720,"pos":14,"file":"/home/hadoop/hui/test2/messages.4"},{"inode":1532722,"pos":14,"file":"/home/hadoop/hui/messages.1"},{"inode":1532718,"pos":17,"file":"/home/hadoop/hui/test2/test1/hehe.txt"},{"inode":1532712,"pos":14,"file":"/home/hadoop/hui/test2/messages.3"}]

[[email protected] hui]$ mv test2/test1/hehe.txt test2/haha.txt
[[email protected] hui]$ cat ../q1/1489910670584-1 
hello world 2
hello world 4
hello world 1
hello world hehe
hello world 3
[[email protected] hui]$ cat ../q1/taildir_position.json 
[{"inode":1532727,"pos":14,"file":"/home/hadoop/hui/test2/test1/messages.2"},{"inode":1532720,"pos":14,"file":"/home/hadoop/hui/test2/messages.4"},{"inode":1532722,"pos":14,"file":"/home/hadoop/hui/messages.1"},{"inode":1532718,"pos":17,"file":"/home/hadoop/hui/test2/test1/hehe.txt"},{"inode":1532712,"pos":14,"file":"/home/hadoop/hui/test2/messages.3"}]

[[email protected] hui]$ echo "hello world haha" >> test2/haha.txt 
[[email protected] hui]$ cat ../q1/1489910670584-1 
hello world 2
hello world 4
hello world 1
hello world hehe
hello world 3
hello world haha
[[email protected] hui]$ cat ../q1/taildir_position.json 
[{"inode":1532727,"pos":14,"file":"/home/hadoop/hui/test2/test1/messages.2"},{"inode":1532720,"pos":14,"file":"/home/hadoop/hui/test2/messages.4"},{"inode":1532722,"pos":14,"file":"/home/hadoop/hui/messages.1"},{"inode":1532712,"pos":14,"file":"/home/hadoop/hui/test2/messages.3"},{"inode":1532718,"pos":34,"file":"/home/hadoop/hui/test2/haha.txt"}]

[[email protected] hui]$ echo "hello china" >> test2/test1/hehe.txt
[[email protected] hui]$ cat ../q1/1489910670584-1 
hello world 2
hello world 4
hello world 1
hello world hehe
hello world 3
hello world haha
hello china
[[email protected] hui]$ cat ../q1/taildir_position.json 
[{"inode":1532727,"pos":14,"file":"/home/hadoop/hui/test2/test1/messages.2"},{"inode":1532720,"pos":14,"file":"/home/hadoop/hui/test2/messages.4"},{"inode":1532722,"pos":14,"file":"/home/hadoop/hui/messages.1"},{"inode":1532712,"pos":14,"file":"/home/hadoop/hui/test2/messages.3"},{"inode":1532718,"pos":34,"file":"/home/hadoop/hui/test2/haha.txt"},{"inode":1532714,"pos":12,"file":"/home/hadoop/hui/test2/test1/hehe.txt"}]

參考:
http://blog.csdn.net/u012373815/article/details/62241528
http://www.2cto.com/net/201703/616085.html
http://blog.csdn.net/yeruby/article/details/51812759

相關推薦

flume的TaildirSource介紹升級改造

flume 1.7.0推出了taildirSource元件。tail監控目錄下匹配上正則表示式的的所有檔案,實現斷點續傳。但是我後來發現cdh版的flume-1.6.0也已經有這個元件了,而官方的Apache版的apache-flume-1.6.0-bin卻沒有這個元件。並

Windows 活動目錄(AD)服務器系統升級到2012之環境介紹準備(一)

AD 升級 activedirectory DHCP 1. AD服務器相關配置信息1 2 1.1 系統版本角色名稱操作系統版本語言數量ADWindows server 2003 sp2CN2ADWindows server 2012 R2CN21.2 主機名和IP地址服務器主機名IP地

JavaWeb網上圖書商城完整項目--day03-1.圖書模塊功能介紹相關類創建

class default package ren 書籍 logs main java getc 1 前兩天我們學習了user用戶模塊和圖書的分類模塊,接下來我們學習圖書模塊 圖書模塊的功能主要是下面的功能: 2 接下來我們創建對應的包 我們來看看對應的數據庫表t_bo

C#數據緩存介紹Caching通用幫助類整理

能夠 eric article for generic arr stat ati cti C#緩存主要是為了提高數據的讀取速度。由於server和應用client之間存在著流量的瓶頸,所以讀取大容量數據時,使用緩存來直接為client服務,能夠降低client與serv

ssh介紹scp,sftp應用

network 服務器 數據安全 linux 數據包 一、ssh介紹 SSH是 secure Shell Protocol的簡寫,由IETF網絡工作小組(Network Working Group)制定;在進行數據傳輸之前 ,SSH先對聯機數據包通過加密技術進行加密處理,加密後再進行數據

自動化運維之saltstack(二)states介紹使用

配置文件 如何 states master 根目錄 一、什麽是Salt States?Salt States是Salt模塊的擴展,主系統使用的狀態系統叫SLS系統,SLS代表Saltstack State,Salt是一些狀態文件,其中包含有關如何配置Salt子節點的信息,這些狀態被存放在一

fusion-io工具--更換fusion-ioSSD無法識別升級指引

6.5 status 普通 拷貝 無法識別 其它 擦除 ase ios centos6.5系統,(含fusion-io共7塊盤)庫存的fusion-io卡需要升級固件才能識別 現在只認到6個盤 fdisk -l 2>/dev/null | grep ‘Disk /d

mongoDB簡單介紹安裝

疑問 每次 data- .org 存儲 cmd 針對 安裝包 目錄 近期一段時間對mongoDB進行了簡單的學習,從它是什麽?幹什麽?怎麽用?優缺點?這一系列的疑問到如今可以簡單運用。我想須要對其進行簡單的總結和概述。那麽這一篇就從最基礎的開始,對其

展示C代碼覆蓋率的gcovr工具簡單介紹相關命令使用演示樣例

文件夾 mes repo 例如 oid else if dir total down (本人正在參加2015博客之星評選,誠邀你來投票,謝謝:username=zhouzxi">http://vote.blog.csdn.net/blogstar2015

Nessus離線安裝升級插件

家庭 clas 服務 軟件 update 專業 ddb 內網 用戶賬號 最近做客戶的內網主機漏洞掃描,申請了一臺內網主機做掃描服務器,安裝Nessus。由於客戶嚴格限制內網主機不能開通外網訪問權限,折騰了一下Nessus離線激活和離線更新漏洞插件,詳細過程截圖記錄。 一、安

架構師之路--搜索業務和技術介紹容錯機制

朋友 單節點 adb 一致性 公司 一個 memcache 消息通知 包括  今天和搜索部門一起做了一下MQ的遷移,順便交流一下業務和技術。發現現在90後小夥都挺不錯。我是指能力和探究心。我家男孩,不招女婿。   在前面的文章中也提到,我們有媒資庫(樂視視頻音頻本身內容)

UltraEdit正則表達式介紹實例

官網 文檔 ltr 能夠 本科 min 組合 表達 使用方法 前幾天,有個將Excel中的數據導入到數據庫中的需求。原本想到用程序讀取Excel中的數據並存儲到數據庫中,但經一哥們的提醒,說用 EditPlus或UltraEdit這種工具直接將數據拼湊成SQL插

Opencv介紹opencv3.0在 vs2010上的配置

opencv介紹、opencv3.0在 vs2010上如何的配置 OpenCV是一個基於BSD許可(開源)發行的跨平臺計算機視覺庫,可以運行在Linux、Windows、Android和Mac OS操作系統上。它輕量級而且高效——由一系列 C 函數和少量 C++ 類構成,同時提供了Python、R

C#操作Word Aspose.Words組件介紹使用—基本介紹與DOM概述

控制 包含 枚舉類 讀取 標記 服務器端 方法 python level 1.基本介紹 Aspose.Words是一個商業.NET類庫,可以使得應用程序處理大量的文件任務。Aspose.Words支持Doc,Docx,RTF,HTML,OpenDocument,PDF,XP

Golang學習-第一篇 Golang的簡單介紹Windows環境下安裝、部署

需要 簡單 電腦 pan 生成文件 多核 -- pear () 序言 這是本人博客園第一篇文章,寫的不到位之處,希望各位看客們諒解。 本人一直從事.NET的開發工作,最近在學習Golang,所以想著之前學習的過程中都沒怎麽好好的將學習過程記錄下來。深感惋惜! 現在將Gola

樸素貝葉斯分類算法介紹python代碼實現案例

urn bus 人的 元素 1.2 -s index 代碼 步驟 樸素貝葉斯分類算法 1、樸素貝葉斯分類算法原理 1.1、概述 貝葉斯分類算法是一大類分類算法的總稱 貝葉斯分類算法以樣本可能屬於某類的概率來作為分類依據 樸素貝葉斯分類算法是貝葉斯分類算法中最簡單的一種 註:

RabbitMQ介紹安裝部署

lan 行數 安裝部署 原理圖 tro 快的 它的 主題 通配符 本節內容: RabbitMQ介紹 RabbitMQ運行原理 RabbitMQ重要術語 三種ExchangeType RabbitMQ集群種類 集群基本概念 鏡像模式部署集群 一、RabbitMQ介紹

Storm介紹安裝部署

節點和 yam 實時計算系統 如果 int 端口 bili usr then 本節內容: Apache Storm是什麽 Apache Storm核心概念 Storm原理架構 Storm集群安裝部署 啟動storm ui、Nimbus和Supervisor 一、Ap

Python零基礎學習系列之二--Python介紹環境搭建

url 軟件包 三方庫 簡單的 lin 文件的 span 高級程序設計 擴展 1-1、Python簡介:  Python是一種解釋型、面向對象、動態數據類型的高級程序設計語言。Python由Guido van Rossum於1989年底發明,第一個公開發行版發行於1991年

Charles界面介紹使用方法

ade 當前 lin rewrite ip地址 鼠標滑過 列表 exp 自動 本隨筆主要內容: 一、Charles界面介紹 二、Charles使用 1.會話(Repeat、Focus、Compare、黑白名單等) 2.模擬請求做mock,使用斷點、Map或Rewrite