flume的TaildirSource介紹及升級改造
但是我後來發現cdh版的flume-1.6.0也已經有這個元件了,而官方的Apache版的apache-flume-1.6.0-bin卻沒有這個元件。並且Apache版的flume1.7.0比cdh版的flume1.6.0的org.apache.flume.source.taildir包中多出TaildirMatcher.java,並且其他的程式碼也有所不同,應該Apache版的flume1.7.0比cdh版的flume1.6.0更加完善,兩者的差異在哪裡,有時間再研究研究吧。因為我的機器上已經裝有cdh版的apache-flume-1.6.0-cdh5.5.2-bin,所以就直接用這個而沒有重灌Apache版的flume1.7.0了,並且apache-flume-1.6.0-cdh5.5.2-bin目前已經夠用了。
[[email protected] conf]$ cat taildir.conf
[[email protected] hui]$ treea1.sources = r1 a1.channels = c1 a1.sinks = k1 # Describe/configure the source #source的型別為TAILDIR,這裡的型別大小寫都可以 a1.sources.r1.type = taildir a1.sources.r1.channels = c1 #儲存tial最後一個位置儲存位置 a1.sources.r1.positionFile = /home/hadoop/hui/taildir_position.json #設定tiail的組, 使用空格隔開 a1.sources.r1.filegroups = f1 f2 #設定每個分組的絕對路徑 a1.sources.r1.filegroups.f1 = /home/hadoop/hui/test1/hehe.txt a1.sources.r1.filegroups.f2 = /home/hadoop/hui/test2/.* #.匹配除換行符 \n 之外的任何單字元。*匹配前面的子表示式零次或多次。這裡也可以用messages.* a1.sources.r1.fileHeader = true # Describe the sink a1.sinks.k1.type = file_roll a1.sinks.k1.sink.directory = /home/hadoop/hui a1.sinks.k1.sink.rollInterval = 0 # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
.
|-- messages.1
|-- qiang
| `-- hui.txt
|-- test1
| |-- hehe.txt
| `-- messages.2
`-- test2
|-- messages.3
`-- messages.4
3 directories, 6 files
[[email protected] hui]$ cat test1/hehe.txt hello world hehe
[[email protected] hui]$ cat test1/messages.2
hello world 2
[
hello world 3
[[email protected] hui]$ cat test2/messages.4
hello world 4
啟動flume程序:
[[email protected] apache-flume-1.6.0-cdh5.5.2-bin]$ bin/flume-ng agent -c . -f conf/taildir.conf -n a1 -Dflume.root.logger=INFO,console
[[email protected] hui]$ ls(在hui/目錄下生成了1489881718232-1和taildir_position.json檔案)
1489881718232-1 messages.1 qiang taildir_position.json test1 test2
[[email protected] hui]$ cat 1489881718232-1
hello world hehe
hello world 3
hello world 4
[[email protected] hui]$ cat taildir_position.json
[{"inode":1532721,"pos":17,"file":"/home/hadoop/hui/test1/hehe.txt"},{"inode":1532719,"pos":14,"file":"/home/hadoop/hui/test2/messages.3"},{"inode":1532720,"pos":14,"file":"/home/hadoop/hui/test2/messages.4"}]
往test1/hehe.txt中傳入資料:
[[email protected] hui]$ echo "ni hao world" >> test1/hehe.txt
再觀察1489881718232-1和taildir_position.json檔案
[[email protected] hui]$ cat 1489881718232-1
hello world hehe
hello world 3
hello world 4
ni hao world
[[email protected] hui]$ cat taildir_position.json
[{"inode":1532721,"pos":30,"file":"/home/hadoop/hui/test1/hehe.txt"},{"inode":1532719,"pos":14,"file":"/home/hadoop/hui/test2/messages.3"},{"inode":1532720,"pos":14,"file":"/home/hadoop/hui/test2/messages.4"}]
限制:
1.可以配置一個監聽目錄,會監聽該目錄下所有的檔案,但是如果配置目錄下面嵌套了子目錄,則無法監聽。需要修改原始碼,我們可以遞迴地對配置目錄的所有子目錄的所有檔案進行監聽
2.taildirSource元件不支援檔案改名的。如果檔案改名會認為是新檔案,就會重新讀取,這就導致了日誌檔案重讀。(應用場景:Linux的系統日誌輪詢功能)
解決限制一:
網上已經有修改後的程式碼了,下載地址:https://github.com/qwurey/flume-source-taildir-recursive
將程式碼下載下來並匯入myeclipse中
解決限制二:
修改ReliableTaildirEventReader 類的 updateTailFiles 方法。
將其中的 tf.getPath().equals(f.getAbsolutePath()) 判斷條件去除。只用判斷檔案不為空即可,不用判斷檔案的名字。
// if (tf == null || !tf.getPath().equals(f.getAbsolutePath())) {
if (tf == null) {//檔案不存在 position 中則全讀。
修改TailFile 類的 updatePos 方法此處同樣的原因,inode 已經能夠確定唯一的 檔案了,所以不用加 path 作為判定條件了。所以去掉該條件就支援了檔案重新命名情況。
// if (this.inode == inode && this.path.equals(path)) {
if (this.inode == inode) {
修改這兩個地方就支援了檔案重新命名的問題,實現了目錄下多檔案監控,斷點續傳。新增自定義source入口,也就是將原始碼拷貝過來,然後將修改過的程式碼打包為自定義taildir的jar包執行flume。將taidir.jar匯入到flume的lib目錄下。
[[email protected] conf]$ cat taildir.conf
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# Describe/configure the source
a1.sources.r1.type = com.urey.flume.source.taildir.TaildirSource
a1.sources.r1.channels = c1
a1.sources.r1.positionFile = /home/hadoop/q1/taildir_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /home/hadoop/hui/.*
a1.sources.r1.batchSize = 100
a1.sources.r1.backoffSleepIncrement = 1000
a1.sources.r1.maxBackoffSleep = 5000
# Describe the sink
a1.sinks.k1.type = file_roll
a1.sinks.k1.sink.directory = /home/hadoop/q1
a1.sinks.k1.sink.rollInterval = 0
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
[[email protected] ~]$ mkdir q1[[email protected] hui]$ tree
.
|-- messages.1
`-- test2
|-- messages.3
|-- messages.4
`-- test1
|-- hehe.txt
`-- messages.2
2 directories, 5 files
[[email protected] hui]$ cat messages.1 hello world 1
[[email protected] hui]$ cat test2/messages.3
hello world 3
[[email protected] hui]$ cat test2/messages.4
hello world 4
[[email protected] hui]$ cat test2/test1/hehe.txt
hello world hehe
[[email protected] hui]$ cat test2/test1/messages.2
hello world 2
啟動flume:
[[email protected] apache-flume-1.6.0-cdh5.5.2-bin]$ bin/flume-ng agent -c . -f conf/taildir.conf -n a1 -Dflume.root.logger=INFO,console
[[email protected] q1]$ ls
1489910670584-1 taildir_position.json
[[email protected] q1]$ cat 1489910670584-1
hello world 2
hello world 4
hello world 1
hello world hehe
hello world 3
[[email protected] q1]$ cat taildir_position.json
[{"inode":1532727,"pos":14,"file":"/home/hadoop/hui/test2/test1/messages.2"},{"inode":1532720,"pos":14,"file":"/home/hadoop/hui/test2/messages.4"},{"inode":1532722,"pos":14,"file":"/home/hadoop/hui/messages.1"},{"inode":1532718,"pos":17,"file":"/home/hadoop/hui/test2/test1/hehe.txt"},{"inode":1532712,"pos":14,"file":"/home/hadoop/hui/test2/messages.3"}]
[[email protected] hui]$ mv test2/test1/hehe.txt test2/haha.txt
[[email protected] hui]$ cat ../q1/1489910670584-1
hello world 2
hello world 4
hello world 1
hello world hehe
hello world 3
[[email protected] hui]$ cat ../q1/taildir_position.json
[{"inode":1532727,"pos":14,"file":"/home/hadoop/hui/test2/test1/messages.2"},{"inode":1532720,"pos":14,"file":"/home/hadoop/hui/test2/messages.4"},{"inode":1532722,"pos":14,"file":"/home/hadoop/hui/messages.1"},{"inode":1532718,"pos":17,"file":"/home/hadoop/hui/test2/test1/hehe.txt"},{"inode":1532712,"pos":14,"file":"/home/hadoop/hui/test2/messages.3"}]
[[email protected] hui]$ echo "hello world haha" >> test2/haha.txt
[[email protected] hui]$ cat ../q1/1489910670584-1
hello world 2
hello world 4
hello world 1
hello world hehe
hello world 3
hello world haha
[[email protected] hui]$ cat ../q1/taildir_position.json
[{"inode":1532727,"pos":14,"file":"/home/hadoop/hui/test2/test1/messages.2"},{"inode":1532720,"pos":14,"file":"/home/hadoop/hui/test2/messages.4"},{"inode":1532722,"pos":14,"file":"/home/hadoop/hui/messages.1"},{"inode":1532712,"pos":14,"file":"/home/hadoop/hui/test2/messages.3"},{"inode":1532718,"pos":34,"file":"/home/hadoop/hui/test2/haha.txt"}]
[[email protected] hui]$ echo "hello china" >> test2/test1/hehe.txt
[[email protected] hui]$ cat ../q1/1489910670584-1
hello world 2
hello world 4
hello world 1
hello world hehe
hello world 3
hello world haha
hello china
[[email protected] hui]$ cat ../q1/taildir_position.json
[{"inode":1532727,"pos":14,"file":"/home/hadoop/hui/test2/test1/messages.2"},{"inode":1532720,"pos":14,"file":"/home/hadoop/hui/test2/messages.4"},{"inode":1532722,"pos":14,"file":"/home/hadoop/hui/messages.1"},{"inode":1532712,"pos":14,"file":"/home/hadoop/hui/test2/messages.3"},{"inode":1532718,"pos":34,"file":"/home/hadoop/hui/test2/haha.txt"},{"inode":1532714,"pos":12,"file":"/home/hadoop/hui/test2/test1/hehe.txt"}]
參考:
http://blog.csdn.net/u012373815/article/details/62241528
http://www.2cto.com/net/201703/616085.html
http://blog.csdn.net/yeruby/article/details/51812759
相關推薦
flume的TaildirSource介紹及升級改造
flume 1.7.0推出了taildirSource元件。tail監控目錄下匹配上正則表示式的的所有檔案,實現斷點續傳。但是我後來發現cdh版的flume-1.6.0也已經有這個元件了,而官方的Apache版的apache-flume-1.6.0-bin卻沒有這個元件。並
Windows 活動目錄(AD)服務器系統升級到2012之環境介紹及準備(一)
AD 升級 activedirectory DHCP 1. AD服務器相關配置信息1 2 1.1 系統版本角色名稱操作系統版本語言數量ADWindows server 2003 sp2CN2ADWindows server 2012 R2CN21.2 主機名和IP地址服務器主機名IP地
JavaWeb網上圖書商城完整項目--day03-1.圖書模塊功能介紹及相關類創建
class default package ren 書籍 logs main java getc 1 前兩天我們學習了user用戶模塊和圖書的分類模塊,接下來我們學習圖書模塊 圖書模塊的功能主要是下面的功能: 2 接下來我們創建對應的包 我們來看看對應的數據庫表t_bo
C#數據緩存介紹及Caching通用幫助類整理
能夠 eric article for generic arr stat ati cti C#緩存主要是為了提高數據的讀取速度。由於server和應用client之間存在著流量的瓶頸,所以讀取大容量數據時,使用緩存來直接為client服務,能夠降低client與serv
ssh介紹及scp,sftp應用
network 服務器 數據安全 linux 數據包 一、ssh介紹 SSH是 secure Shell Protocol的簡寫,由IETF網絡工作小組(Network Working Group)制定;在進行數據傳輸之前 ,SSH先對聯機數據包通過加密技術進行加密處理,加密後再進行數據
自動化運維之saltstack(二)states介紹及使用
配置文件 如何 states master 根目錄 一、什麽是Salt States?Salt States是Salt模塊的擴展,主系統使用的狀態系統叫SLS系統,SLS代表Saltstack State,Salt是一些狀態文件,其中包含有關如何配置Salt子節點的信息,這些狀態被存放在一
fusion-io工具--更換fusion-ioSSD無法識別及升級指引
6.5 status 普通 拷貝 無法識別 其它 擦除 ase ios centos6.5系統,(含fusion-io共7塊盤)庫存的fusion-io卡需要升級固件才能識別 現在只認到6個盤 fdisk -l 2>/dev/null | grep ‘Disk /d
mongoDB簡單介紹及安裝
疑問 每次 data- .org 存儲 cmd 針對 安裝包 目錄 近期一段時間對mongoDB進行了簡單的學習,從它是什麽?幹什麽?怎麽用?優缺點?這一系列的疑問到如今可以簡單運用。我想須要對其進行簡單的總結和概述。那麽這一篇就從最基礎的開始,對其
展示C代碼覆蓋率的gcovr工具簡單介紹及相關命令使用演示樣例
文件夾 mes repo 例如 oid else if dir total down (本人正在參加2015博客之星評選,誠邀你來投票,謝謝:username=zhouzxi">http://vote.blog.csdn.net/blogstar2015
Nessus離線安裝及升級插件
家庭 clas 服務 軟件 update 專業 ddb 內網 用戶賬號 最近做客戶的內網主機漏洞掃描,申請了一臺內網主機做掃描服務器,安裝Nessus。由於客戶嚴格限制內網主機不能開通外網訪問權限,折騰了一下Nessus離線激活和離線更新漏洞插件,詳細過程截圖記錄。 一、安
架構師之路--搜索業務和技術介紹及容錯機制
朋友 單節點 adb 一致性 公司 一個 memcache 消息通知 包括 今天和搜索部門一起做了一下MQ的遷移,順便交流一下業務和技術。發現現在90後小夥都挺不錯。我是指能力和探究心。我家男孩,不招女婿。 在前面的文章中也提到,我們有媒資庫(樂視視頻音頻本身內容)
UltraEdit正則表達式介紹及實例
官網 文檔 ltr 能夠 本科 min 組合 表達 使用方法 前幾天,有個將Excel中的數據導入到數據庫中的需求。原本想到用程序讀取Excel中的數據並存儲到數據庫中,但經一哥們的提醒,說用 EditPlus或UltraEdit這種工具直接將數據拼湊成SQL插
Opencv介紹及opencv3.0在 vs2010上的配置
opencv介紹、opencv3.0在 vs2010上如何的配置 OpenCV是一個基於BSD許可(開源)發行的跨平臺計算機視覺庫,可以運行在Linux、Windows、Android和Mac OS操作系統上。它輕量級而且高效——由一系列 C 函數和少量 C++ 類構成,同時提供了Python、R
C#操作Word Aspose.Words組件介紹及使用—基本介紹與DOM概述
控制 包含 枚舉類 讀取 標記 服務器端 方法 python level 1.基本介紹 Aspose.Words是一個商業.NET類庫,可以使得應用程序處理大量的文件任務。Aspose.Words支持Doc,Docx,RTF,HTML,OpenDocument,PDF,XP
Golang學習-第一篇 Golang的簡單介紹及Windows環境下安裝、部署
需要 簡單 電腦 pan 生成文件 多核 -- pear () 序言 這是本人博客園第一篇文章,寫的不到位之處,希望各位看客們諒解。 本人一直從事.NET的開發工作,最近在學習Golang,所以想著之前學習的過程中都沒怎麽好好的將學習過程記錄下來。深感惋惜! 現在將Gola
樸素貝葉斯分類算法介紹及python代碼實現案例
urn bus 人的 元素 1.2 -s index 代碼 步驟 樸素貝葉斯分類算法 1、樸素貝葉斯分類算法原理 1.1、概述 貝葉斯分類算法是一大類分類算法的總稱 貝葉斯分類算法以樣本可能屬於某類的概率來作為分類依據 樸素貝葉斯分類算法是貝葉斯分類算法中最簡單的一種 註:
RabbitMQ介紹及安裝部署
lan 行數 安裝部署 原理圖 tro 快的 它的 主題 通配符 本節內容: RabbitMQ介紹 RabbitMQ運行原理 RabbitMQ重要術語 三種ExchangeType RabbitMQ集群種類 集群基本概念 鏡像模式部署集群 一、RabbitMQ介紹
Storm介紹及安裝部署
節點和 yam 實時計算系統 如果 int 端口 bili usr then 本節內容: Apache Storm是什麽 Apache Storm核心概念 Storm原理架構 Storm集群安裝部署 啟動storm ui、Nimbus和Supervisor 一、Ap
Python零基礎學習系列之二--Python介紹及環境搭建
url 軟件包 三方庫 簡單的 lin 文件的 span 高級程序設計 擴展 1-1、Python簡介: Python是一種解釋型、面向對象、動態數據類型的高級程序設計語言。Python由Guido van Rossum於1989年底發明,第一個公開發行版發行於1991年
Charles界面介紹及使用方法
ade 當前 lin rewrite ip地址 鼠標滑過 列表 exp 自動 本隨筆主要內容: 一、Charles界面介紹 二、Charles使用 1.會話(Repeat、Focus、Compare、黑白名單等) 2.模擬請求做mock,使用斷點、Map或Rewrite