1. 程式人生 > 其它 >【Flume】概述及組成、入門案例、進階(事務、拓撲結構)、不同拓撲案例、自定義、資料流監控Ganglia

【Flume】概述及組成、入門案例、進階(事務、拓撲結構)、不同拓撲案例、自定義、資料流監控Ganglia

一、概述

1、定義

日誌採集、聚合、傳輸的系統,基於流式結構

即:讀取本地磁碟資料,寫入HDFS或kafka

2、架構

Agent:JVM程序,以事件形式將資料送到目的地。

Agent由三部分組成:Source、Channel、Sink

Source:接受各類日誌格式的資料,如avro、thrift、exec、jms、spooling directorynetcat、sequence generator、syslog、http、legacy

Sink:輪詢Channel事件並移除,從而寫入儲存系統/另一個Flume Agent;

   目的地包括hdfs、logger、avro、thrift、ipc、file、HBase、solr、自定義。

Channel:位於Source和Sink之間的緩衝區,允許二者不同速率,且執行緒安全,能同時處理多個寫入和讀出操作

   自帶兩種Channel:Memory Channel(不關心資料是否丟失)和File Channel(不會丟失資料)

   資料進行快取,可以存放在Memory或File中

Event:資料傳輸的基本單位,由Header和Body兩部分組成

公司採用的Source型別為:

(1)監控後臺日誌:exec
(2)監控後臺產生日誌的埠:netcat
Exec spooldir

二、Flume入門

1、安裝部署

下載、上傳、改名

2、入門案例

2.1使用Flume監聽一個埠,收集該埠資料,並列印到控制檯

安裝netcat

檢查埠是否被佔用:sudo netstat -tunlp | grep 44444

建立job資料夾並編寫配置檔案vim flume-netcat-logger.conf【配置source和sink的】

開啟flume監聽埠&執行配置檔案:bin/flume-ng agent --conf conf/ --name a1【事件名】 --conf-file job/flume-netcat-logger.conf -Dflume.root.logger=INFO【日誌級別,日誌級別包括:log、info、warn、error】,console

向本機的44444埠傳送內容:nc localhost 44444

2.2實時監控Hive日誌,並上傳到HDFS中

配置環境變數,依賴Hadoop的jar包

建立配置檔案:vim flume-file-hdfs.conf

執行flume:bin/flume-ng agent --conf conf/ --name a2 --conf-file job/flume-file-hdfs.conf

開啟Hadoop和Hive並操作Hive產生日誌:sbin/start-dfs.shsbin/start-yarn.sh

bin/hive

在hdfs上檢視檔案

2.3使用Flume監聽整個目錄的檔案,並上傳至HDFS

2.4實時監控目錄下的多個追加檔案

Exec source適用於監控一個實時追加的檔案,不能實現斷點續傳;

Spooldir Source適合用於同步新檔案,但不適合對實時追加日誌的檔案進行監聽並同步;

Taildir Source適合用於監聽多個實時追加的檔案,並且能夠實現斷點續傳。

aildir Source維護了一個json格式的position File,其會定期的往position File中更新每個檔案讀取到的最新的位置,因此能夠實現斷點續傳

Linux中儲存檔案元資料的區域就叫做inode,每個inode都有一個號碼,作業系統用inode號碼來識別不同的檔案,Unix/Linux系統內部不使用檔名,而使用inode號碼來識別檔案。

{"inode":2496272,"pos":12,"file":"/opt/module/flume/files/file1.txt"}
{"inode":2496275,"pos":12,"file":"/opt/module/flume/files/file2.txt"}

三、Flume進階

1、事務

ChannelSelector選出Event將要被髮往哪個Channel,兩種型別,分別是Replicating(複製)和Multiplexing(多路複用)。

SinkProcessor共有三種類型,分別是DefaultSinkProcessor(單個sink)、(Sink Group)LoadBalancingSinkProcessor【負載均衡】和FailoverSinkProcessor【錯誤恢復】

2、Flume拓撲結構

簡單串聯

複製和多路複用

負載均衡和故障轉移(將多個sink邏輯上分到一個sink組)

聚合方式:多對多。每臺伺服器部署一個flume採集日誌,傳送到一個集中收集日誌的flume,再由此flume上傳到hdfs、hive、hbase等,進行日誌分析

3、開發案例

3.1複製和多路複用

檔案變動,Flume-1將變動內容傳遞給Flume-2,Flume-2負責儲存到HDFS。同時Flume-1將變動內容傳遞給Flume-3,Flume-3負責輸出到Local FileSystem。

配置1個接收日誌檔案的source和兩個channel、兩個sink,分別輸送給flume-flume-hdfs和flume-flume-dir。

需要配置三個配置檔案

3.2負載均衡和故障處理

sink組中的sink分別對接Flume2和Flume3,採用FailoverSinkProcessor,實現故障轉移

Flume2 kill,觀察Flume3的控制檯列印情況

注:使用jps -ml檢視Flume程序

3.3聚合

Flume-1與Flume-2(分別監控日誌檔案和資料流)將資料傳送給hadoop104上的Flume-3,Flume-3將最終資料列印到控制檯。

分發flume:xsync flume

接收flume1與flume2傳送過來的資料流【在1和2內配置】

4、自定義Interceptor:按照日誌型別的不同,將不同種類的日誌發往不同的分析系統

以數字(單個)和字母(單個)模擬不同型別的日誌

 @Override
    public Event intercept(Event event) {

        byte[] body = event.getBody();
        if (body[0] < 'z' && body[0] > 'a') {
            event.getHeaders().put("type", "letter");
        } else if (body[0] > '0' && body[0] < '9') {
            event.getHeaders().put("type", "number");
        }
        return event;

    }

5、自定義Source:給每條資料新增字首

匯入依賴flume-ng-core

@Override
    public Status process() throws EventDeliveryException {

        try {
            //建立事件頭資訊
            HashMap<String, String> hearderMap = new HashMap<>();
            //建立事件
            SimpleEvent event = new SimpleEvent();
            //迴圈封裝事件
            for (int i = 0; i < 5; i++) {
                //給事件設定頭資訊
                event.setHeaders(hearderMap);
                //給事件設定內容
                event.setBody((field + i).getBytes());
                //將事件寫入channel
                getChannelProcessor().processEvent(event);
                Thread.sleep(delay);
            }
        } catch (Exception e) {
            e.printStackTrace();
            return Status.BACKOFF;
        }
        return Status.READY;
    }

並在配置檔案中配置

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = com.atguigu.MySource

6、自定義sink:並在Sink端給每條資料新增字首和字尾,輸出到控制檯

@Override
    public Status process() throws EventDeliveryException {

        //宣告返回值狀態資訊
        Status status;

        //獲取當前Sink繫結的Channel
        Channel ch = getChannel();

        //獲取事務
        Transaction txn = ch.getTransaction();

        //宣告事件
        Event event;

        //開啟事務
        txn.begin();

        //讀取Channel中的事件,直到讀取到事件結束迴圈
        while (true) {
            event = ch.take();
            if (event != null) {
                break;
            }
        }
        try {
            //處理事件(列印)
            LOG.info(prefix + new String(event.getBody()) + suffix);

            //事務提交
            txn.commit();
            status = Status.READY;
        } catch (Exception e) {

            //遇到異常,事務回滾
            txn.rollback();
            status = Status.BACKOFF;
        } finally {

            //關閉事務
            txn.close();
        }
        return status;
    }

7、Flume資料流監控

Ganglia的安裝與部署

操作Flume測試監控,啟動Flume任務:

nc localhost 44444傳送資料並觀察截圖

四、問答題

Flume採集資料會丟失嗎?

不丟失,但會重複
根據Flume的架構原理,Flume是不可能丟失資料的,其內部有完善的事務機制,Source到Channel是事務性的,Channel到Sink是事務性的,因此這兩個環節不會出現資料的丟失,唯一可能丟失資料的情況是Channel採用memoryChannel,agent宕機導致資料丟失,或者Channel儲存資料已滿,導致Source不再寫入,未寫入的資料丟失。
Flume不會丟失資料,但是有可能造成資料的重複,例如資料已經成功由Sink發出,但是沒有接收到響應,Sink會再次傳送資料,此時可能會導致資料的重複。

本文來自部落格園,作者:劉金輝,轉載請註明原文連結:https://www.cnblogs.com/liujinhui/p/15536120.html