1. 程式人生 > 實用技巧 >22.Flume監控、自定義元件、面試題

22.Flume監控、自定義元件、面試題

一、Flume監控之Ganglia

1.1 前言

GangliaUC Berkeley發起的一個開源監視專案,設計用於測量數以千計的節點。每臺計算機都執行一個收集和傳送度量資料(如處理器速度、記憶體使用量等)的名為gmond的守護程序。它將從作業系統和指定主機中收集。接收所有度量資料的主機可以顯示這些資料並且可以將這些資料的精簡表單傳遞到層次結構中。正因為有這種層次結構模式,才使得Ganglia可以實現良好的擴充套件。gmond帶來的系統負載非常少,這使得它成為在叢集中各臺計算機上執行的一段程式碼,而不會影響使用者效能。

Gangliagmondgmetadgweb三部分組成。

gmond(Ganglia Monitoring Daemon)

:一種輕量級服務,安裝在每臺需要收集指標資料的節點主機上。使用gmond,你可以很容易收集很多系統指標資料,如CPU、記憶體、磁碟、網路和活躍程序的資料等。

gmetad(Ganglia Meta Daemon):整合所有資訊,並將其以RRD格式儲存至磁碟的服務。

gweb(Ganglia Web)Ganglia視覺化工具,gweb是一種利用瀏覽器顯示gmetad所儲存資料的PHP前端。在Web介面中以圖表方式展現叢集的執行狀態下收集的多種不同指標資料

1.2 Ganglia的安裝與部署

  1. 安裝httpd服務與php
[root@hadoop101 ~]# yum -y install httpd php
  1. 預設源找不到安裝包,所以要安裝epel
[root@hadoop101 ~]# yum -y install epel-release
  1. 安裝ganglia-gmetadganglia-webganglia-gmond
[root@hadoop101 ~]# yum -y install ganglia-gmetad
[root@hadoop101 ~]# yum -y install ganglia-web
[root@hadoop101 ~]# yum -y install ganglia-gmond
  1. 修改配置檔案/etc/httpd/conf.d/ganglia.conf
[root@hadoop101 ~]# vi /etc/httpd/conf.d/ganglia.conf
# Ganglia monitoring system php web frontend
Alias /ganglia /usr/share/ganglia
<Location /ganglia>
  #Order deny,allow
  #Deny from all
  #Allow from all
  #Allow from 127.0.0.1
  #Allow from ::1
  # Allow from.example.com
  Require all granted
</Location>
  1. 修改配置檔案/etc/ganglia/gmetad.conf
[root@hadoop101 ~]# vim /etc/ganglia/gmetad.conf

修改為:data_source "hadoop101" 192.168.182.101

  1. 修改配置檔案/etc/ganglia/gmond.conf
cluster {
  name = "hadoop101"
  owner = "unspecified"
  latlong = "unspecified"
  url = "unspecified"
}
udp_send_channel {
  #bind_hostname = yes # Highly recommended, soon to be default.
                       # This option tells gmond to use a source address
                       # that resolves to the machine's hostname.  Without
                       # this, the metrics may appear to come from any
                       # interface and the DNS names associated with
                       # those IPs will be used to create the RRDs.
  # mcast_join = 239.2.11.71
  host = 192.168.182.101
  port = 8649
  ttl = 1
}
udp_recv_channel {
  # mcast_join = 239.2.11.71
  port = 8649
  bind = 192.168.182.101
  retry_bind = true
  # Size of the UDP buffer. If you are handling lots of metrics you really
  # should bump it up to e.g. 10MB or even higher.
  # buffer = 10485760
}

  1. 修改配置檔案/etc/selinux/config
[root@hadoop101 ~]# vim /etc/selinux/config

修改:SELINUX=disabled後重啟

  1. 啟動ganglia
[root@hadoop101 ~]# systemctl start httpd
[root@hadoop101 ~]# systemctl start gmetad
[root@hadoop101 ~]# systemctl start gmond
  1. 開啟網頁瀏覽ganglia頁面:http://192.168.182.101/ganglia
    注:如果完成以上操作依然出現許可權不足錯誤,請修改/var/lib/ganglia目錄的許可權:
[root@hadoop101 ~]# udo chmod -R 777 /var/lib/ganglia

1.3 操作Flume測試監控

  1. 修改/opt/module/flume/conf目錄下的flume-env.sh配置:
JAVA_OPTS="-Dflume.monitoring.type=ganglia
-Dflume.monitoring.hosts=192.168.182.101:8649
-Xms100m
-Xmx200m"
  1. 啟動Flume任務
[root@hadoop100 flume]$ bin/flume-ng agent \
--conf conf/ \
--name a1 \
--conf-file job/flume-netcat-logger.conf \
-Dflume.root.logger==INFO,console \
-Dflume.monitoring.type=ganglia \
-Dflume.monitoring.hosts=192.168.1.101:8649
  1. 傳送資料觀察ganglia監測圖
[root@hadoop100 flume]$ nc localhost 44444



圖例說明:

欄位(圖表名稱) 欄位含義
EventPutAttemptCount source嘗試寫入channel的事件總數量
EventPutSuccessCount 成功寫入channel且提交的事件總數量
EventTakeAttemptCount sink嘗試從channel拉取事件的總數量。這不意味著每次事件都被返回,因為sink拉取的時候channel可能沒有任何資料。
EventTakeSuccessCount sink成功讀取的事件的總數量
StartTime channel啟動的時間(毫秒)
StopTime channel停止的時間(毫秒)
ChannelSize 目前channel中事件的總數量
ChannelFillPercentage channel佔用百分比
ChannelCapacity channel的容量

二、自定義Source

2.1 介紹

Source是負責接收資料到Flume Agent的元件。Source元件可以處理各種型別、各種格式的日誌資料,包括avrothriftexecjmsspooling directorynetcatsequence generatorsysloghttplegacy

官方提供的source型別已經很多,但是有時候並不能滿足實際開發當中的需求,此時我們就需要根據實際需求自定義某些source。官方也提供了自定義source需要繼承AbstractSource類並實現ConfigurablePollableSource介面。

2.2 編碼

需求: 使用flume接收資料,並給每條資料新增字尾,輸出到控制檯。字首可從flume配置檔案中配置。

引入依賴:

<dependency>
	<groupId>org.apache.flume</groupId>
	<artifactId>flume-ng-core</artifactId>
	<version>1.7.0</version>
</dependency>

編碼:

public class MySource extends AbstractSource implements Configurable, 
	PollableSource {
    /**
     * 定義需要從配置中讀取的欄位
     */
    //兩條資料之間的間隔
    private Long delay;
    private String field;
	
	/**
	* 接收資料,將資料封裝成一個個的Event,寫入Channel。
	*/
    @Override
    public Status process() throws EventDeliveryException {
        try {
            Map<String, String> header = new HashMap<>();
            SimpleEvent event = new SimpleEvent();
            for (int i = 0; i < 5; i++) {
                event.setHeaders(header);
                event.setBody((field + i).getBytes());
                getChannelProcessor().processEvent(event);
                Thread.sleep(delay);
            }
        } catch (Exception e) {
            return Status.BACKOFF;
        }

        return Status.READY;
    }

    @Override
    public long getBackOffSleepIncrement() {
        return 0;
    }

    @Override
    public long getMaxBackOffSleepInterval() {
        return 0;
    }
    
	/**
	*讀取配置檔案(xx.conf)中的配置資訊
	*/
    @Override
    public void configure(Context context) {
        delay = context.getLong("delay", 2000l);
        field = context.getString("field", "Custom Source");
    }
}

2.3 測試

打包: 將寫好的程式碼打包,並放到flumelib目錄下。

配置檔案

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = com.hucheng.flume.MySource
a1.sources.r1.delay = 1000
#a1.sources.r1.field = hello

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

開啟任務:

[root@hadoop100 flume]$ bin/flume-ng agent -c conf/ -f job/mysource.conf
 -n a1 -Dflume.root.logger=INFO,console

三、自定義Sink

2.1 介紹

Sink不斷地輪詢Channel中的事件且批量地移除它們,並將這些事件批量寫入到儲存或索引系統、或者被髮送到另一個Flume Agent

Sink是完全事務性的。在從Channel批量刪除資料之前,每個SinkChannel啟動一個事務。批量事件一旦成功寫出到儲存系統或下一個Flume AgentSink就利用Channel提交事務。事務一旦被提交,該Channel從自己的內部緩衝區刪除事件。

Sink元件目的地包括hdfsloggeravrothriftipcfilenullHBasesolr、自定義。官方提供的Sink型別已經很多,但是有時候並不能滿足實際開發當中的需求,此時我們就需要根據實際需求自定義某些Sink,官方也提供了自定義Sink的介面需要繼承AbstractSink類並實現Configurable介面。

2.2 編碼

需求: 使用flume接收資料,並給每條資料新增字尾,輸出到控制檯。字首可從flume配置檔案中配置。

public class MySink extends AbstractSink implements Configurable {

    //建立Logger物件
    private static final Logger LOG = 
            LoggerFactory.getLogger(AbstractSink.class);

    private String prefix;
    private String suffix;

    @Override
    public Status process() throws EventDeliveryException {
        //宣告返回值狀態資訊
        Status status;

        //獲取當前Sink繫結的Channel
        Channel channel = getChannel();
        //獲取事務
        Transaction transaction = channel.getTransaction();
        //宣告事件
        Event event;
        //開啟事務
        transaction.begin();

        //讀取Channel中的事件,直到讀取到事件結束迴圈
        while (true) {
            event = channel.take();
            if (event != null) {
                break;
            }
        }
        try {
            //處理事件(列印)
            LOG.info(prefix + new String(event.getBody()) + suffix);
            //事務提交
            transaction.commit();
            status = Status.READY;
        } catch (Exception e) {
            //遇到異常,事務回滾
            transaction.rollback();
            status = Status.BACKOFF;
        } finally {
            transaction.close();
        }
        return status;
    }

    @Override
    public void configure(Context context) {
        //讀取配置檔案內容,有預設值
        prefix = context.getString("prefix", "hello:");

        //讀取配置檔案內容,無預設值
        suffix = context.getString("suffix");
    }
}

2.3 測試

打包: 將寫好的程式碼打包,並放到flumelib目錄下。

配置檔案

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = com.hucheng.flume.MySink
#a1.sinks.k1.prefix = hello:
a1.sinks.k1.suffix = :hello

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

開啟任務:

[root@hadoop100 flume]# bin/flume-ng agent -c conf/ -f job/mysink.conf -n a1
 -Dflume.root.logger=INFO,console
[root@hadoop100 ~]# nc localhost 44444
hello

四、企業真實面試題

4.1 你是如何實現Flume資料傳輸的監控的

使用第三方框架Ganglia實時監控Flume

4.2 Flume的Source,Sink,Channel的作用?你們Source是什麼型別?

①作用

  1. Source元件是專門用來收集資料的,可以處理各種型別、各種格式的日誌資料,包括avrothriftexecjmsspooling directorynetcatsequence generatorsysloghttplegacy
  2. Channel元件對採集到的資料進行快取,可以存放在MemoryFile中。
  3. Sink元件是用於把資料傳送到目的地的元件,目的地包括HdfsLoggeravrothriftipcfileHbasesolr、自定義。

②我公司採用的Source型別為:

  1. 監控後臺日誌:exec
  2. 監控後臺產生日誌的埠:netcatExecspooldir

4.3 Flume的Channel Selectors


Channel Selectors可以讓不同的專案日誌通過不同的ChanneI到不同的Sink中去。Channel Selectors有兩種型別:Replicating Channel Selector (default)Multiplex ing Channel Selector

這兩種Selector的區別是:Replicating會將source過來的events發往所有channel,而Multiplexing可以選擇該發往哪些Channel

4.4 Flume引數調優

①Source

增加Source個數(使用Tair Dir Source時可增加FileGroups個數)可以增大Source的讀取資料的能力。例如:當某一個目錄產生的檔案過多時需要將這個檔案目錄拆分成多個檔案目錄,同時配置好多個Source以保證Source有足夠的能力獲取到新產生的資料。

batchSize引數決定Source一次批量運輸到Channelevent條數,適當調大這個引數可以提高Source搬運EventChannel時的效能。

②Channel

type選擇memoryChannel的效能最好,但是如果Flume程序意外掛掉可能會丟失資料。type選擇fileChannel的容錯性更好,但是效能上會比memory channel差。

使用file ChanneldataDirs配置多個不同盤下的目錄可以提高效能。

Capacity引數決定Channel可容納最大的event條數。transactionCapacity引數決定每次Sourcechannel裡面寫的最大event條數和每次Sinkchannel裡面讀的最大event條數。transactionCapacity需要大於Source和SinkbatchSize引數。

③Sink

增加Sink的個數可以增加Sink消費event的能力。Sink也不是越多越好夠用就行,過多的Sink會佔用系統資源,造成系統資源不必要的浪費。

batchSize引數決定Sink一次批量從Channel讀取的event條數,適當調大這個引數可以提高SinkChannel搬出event的效能。

4.5 Flume的事務機制

Flume的事務機制(類似資料庫的事務機制):Flume使用兩個獨立的事務分別負責從SoucrceChannel,以及從ChannelSink的事件傳遞。比如spooling directory source為檔案的每一行建立一個事件,一旦事務中所有的事件全部傳遞到Channel且提交成功,那麼Soucrce就將該檔案標記為完成。同理,事務以類似的方式處理從ChannelSink的傳遞過程,如果因為某種原因使得事件無法記錄,那麼事務將會回滾。且所有的事件都會保持到Channel中,等待重新傳遞。

4.6 Flume採集資料會丟失嗎?

不會,Channel儲存可以儲存在File中,資料傳輸自身有事務。