【Flume】概述及組成、入門案例、進階(事務、拓撲結構)、不同拓撲案例、自定義、資料流監控Ganglia
一、概述
1、定義
日誌採集、聚合、傳輸的系統,基於流式結構
即:讀取本地磁碟資料,寫入HDFS或kafka
2、架構
Agent:JVM程序,以事件形式將資料送到目的地。
Agent由三部分組成:Source、Channel、Sink
Source:接受各類日誌格式的資料,如avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy
Sink:輪詢Channel事件並移除,從而寫入儲存系統/另一個Flume Agent;
目的地包括hdfs、logger、avro、thrift、ipc、file、HBase、solr、自定義。
Channel:位於Source和Sink之間的緩衝區,允許二者不同速率,且執行緒安全,能同時處理多個寫入和讀出操作
自帶兩種Channel:Memory Channel(不關心資料是否丟失)和File Channel(不會丟失資料)
資料進行快取,可以存放在Memory或File中
Event:資料傳輸的基本單位,由Header和Body兩部分組成
公司採用的Source型別為:
(1)監控後臺日誌:exec
(2)監控後臺產生日誌的埠:netcat
Exec spooldir
二、Flume入門
1、安裝部署
下載、上傳、改名
2、入門案例
2.1使用Flume監聽一個埠,收集該埠資料,並列印到控制檯
安裝netcat
檢查埠是否被佔用:sudo netstat -tunlp | grep 44444
建立job資料夾並編寫配置檔案vim flume-netcat-logger.conf【配置source和sink的】
開啟flume監聽埠&執行配置檔案:bin/flume-ng agent --conf conf/ --name a1【事件名】 --conf-file job/flume-netcat-logger.conf -Dflume.root.logger=INFO【日誌級別,日誌級別包括:log、info、warn、error】,console
向本機的44444埠傳送內容:nc localhost 44444
2.2實時監控Hive日誌,並上傳到HDFS中
配置環境變數,依賴Hadoop的jar包
建立配置檔案:vim flume-file-hdfs.conf
執行flume:bin/flume-ng agent --conf conf/ --name a2 --conf-file job/flume-file-hdfs.conf
開啟Hadoop和Hive並操作Hive產生日誌:sbin/start-dfs.shsbin/start-yarn.sh
bin/hive
在hdfs上檢視檔案
2.3使用Flume監聽整個目錄的檔案,並上傳至HDFS
2.4實時監控目錄下的多個追加檔案
Exec source適用於監控一個實時追加的檔案,不能實現斷點續傳;
Spooldir Source適合用於同步新檔案,但不適合對實時追加日誌的檔案進行監聽並同步;
Taildir Source適合用於監聽多個實時追加的檔案,並且能夠實現斷點續傳。
aildir Source維護了一個json格式的position File,其會定期的往position File中更新每個檔案讀取到的最新的位置,因此能夠實現斷點續傳
Linux中儲存檔案元資料的區域就叫做inode,每個inode都有一個號碼,作業系統用inode號碼來識別不同的檔案,Unix/Linux系統內部不使用檔名,而使用inode號碼來識別檔案。
{"inode":2496272,"pos":12,"file":"/opt/module/flume/files/file1.txt"}
{"inode":2496275,"pos":12,"file":"/opt/module/flume/files/file2.txt"}
三、Flume進階
1、事務
ChannelSelector選出Event將要被髮往哪個Channel,兩種型別,分別是Replicating(複製)和Multiplexing(多路複用)。
SinkProcessor共有三種類型,分別是DefaultSinkProcessor(單個sink)、(Sink Group)LoadBalancingSinkProcessor【負載均衡】和FailoverSinkProcessor【錯誤恢復】
2、Flume拓撲結構
簡單串聯
複製和多路複用
負載均衡和故障轉移(將多個sink邏輯上分到一個sink組)
聚合方式:多對多。每臺伺服器部署一個flume採集日誌,傳送到一個集中收集日誌的flume,再由此flume上傳到hdfs、hive、hbase等,進行日誌分析
3、開發案例
3.1複製和多路複用
檔案變動,Flume-1將變動內容傳遞給Flume-2,Flume-2負責儲存到HDFS。同時Flume-1將變動內容傳遞給Flume-3,Flume-3負責輸出到Local FileSystem。
配置1個接收日誌檔案的source和兩個channel、兩個sink,分別輸送給flume-flume-hdfs和flume-flume-dir。
需要配置三個配置檔案
3.2負載均衡和故障處理
sink組中的sink分別對接Flume2和Flume3,採用FailoverSinkProcessor,實現故障轉移
Flume2 kill,觀察Flume3的控制檯列印情況
注:使用jps -ml檢視Flume程序
3.3聚合
Flume-1與Flume-2(分別監控日誌檔案和資料流)將資料傳送給hadoop104上的Flume-3,Flume-3將最終資料列印到控制檯。
分發flume:xsync flume
接收flume1與flume2傳送過來的資料流【在1和2內配置】
4、自定義Interceptor:按照日誌型別的不同,將不同種類的日誌發往不同的分析系統
以數字(單個)和字母(單個)模擬不同型別的日誌
@Override public Event intercept(Event event) { byte[] body = event.getBody(); if (body[0] < 'z' && body[0] > 'a') { event.getHeaders().put("type", "letter"); } else if (body[0] > '0' && body[0] < '9') { event.getHeaders().put("type", "number"); } return event; }
5、自定義Source:給每條資料新增字首
匯入依賴flume-ng-core
@Override public Status process() throws EventDeliveryException { try { //建立事件頭資訊 HashMap<String, String> hearderMap = new HashMap<>(); //建立事件 SimpleEvent event = new SimpleEvent(); //迴圈封裝事件 for (int i = 0; i < 5; i++) { //給事件設定頭資訊 event.setHeaders(hearderMap); //給事件設定內容 event.setBody((field + i).getBytes()); //將事件寫入channel getChannelProcessor().processEvent(event); Thread.sleep(delay); } } catch (Exception e) { e.printStackTrace(); return Status.BACKOFF; } return Status.READY; }
並在配置檔案中配置
# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = com.atguigu.MySource
6、自定義sink:並在Sink端給每條資料新增字首和字尾,輸出到控制檯
@Override public Status process() throws EventDeliveryException { //宣告返回值狀態資訊 Status status; //獲取當前Sink繫結的Channel Channel ch = getChannel(); //獲取事務 Transaction txn = ch.getTransaction(); //宣告事件 Event event; //開啟事務 txn.begin(); //讀取Channel中的事件,直到讀取到事件結束迴圈 while (true) { event = ch.take(); if (event != null) { break; } } try { //處理事件(列印) LOG.info(prefix + new String(event.getBody()) + suffix); //事務提交 txn.commit(); status = Status.READY; } catch (Exception e) { //遇到異常,事務回滾 txn.rollback(); status = Status.BACKOFF; } finally { //關閉事務 txn.close(); } return status; }
7、Flume資料流監控
Ganglia的安裝與部署
操作Flume測試監控,啟動Flume任務:
nc localhost 44444傳送資料並觀察截圖
四、問答題
Flume採集資料會丟失嗎?
不丟失,但會重複
根據Flume的架構原理,Flume是不可能丟失資料的,其內部有完善的事務機制,Source到Channel是事務性的,Channel到Sink是事務性的,因此這兩個環節不會出現資料的丟失,唯一可能丟失資料的情況是Channel採用memoryChannel,agent宕機導致資料丟失,或者Channel儲存資料已滿,導致Source不再寫入,未寫入的資料丟失。
Flume不會丟失資料,但是有可能造成資料的重複,例如資料已經成功由Sink發出,但是沒有接收到響應,Sink會再次傳送資料,此時可能會導致資料的重複。
本文來自部落格園,作者:劉金輝,轉載請註明原文連結:https://www.cnblogs.com/liujinhui/p/15536120.html