22.Flume監控、自定義元件、面試題
一、Flume監控之Ganglia
1.1 前言
Ganglia
是UC Berkeley
發起的一個開源監視專案,設計用於測量數以千計的節點。每臺計算機都執行一個收集和傳送度量資料(如處理器速度、記憶體使用量等)的名為gmond
的守護程序。它將從作業系統和指定主機中收集。接收所有度量資料的主機可以顯示這些資料並且可以將這些資料的精簡表單傳遞到層次結構中。正因為有這種層次結構模式,才使得Ganglia
可以實現良好的擴充套件。gmond
帶來的系統負載非常少,這使得它成為在叢集中各臺計算機上執行的一段程式碼,而不會影響使用者效能。
Ganglia
由gmond
、gmetad
和gweb
三部分組成。
gmond(Ganglia Monitoring Daemon)
gmond
,你可以很容易收集很多系統指標資料,如CPU
、記憶體、磁碟、網路和活躍程序的資料等。
gmetad(Ganglia Meta Daemon):整合所有資訊,並將其以RRD
格式儲存至磁碟的服務。
gweb(Ganglia Web):Ganglia
視覺化工具,gweb
是一種利用瀏覽器顯示gmetad
所儲存資料的PHP
前端。在Web
介面中以圖表方式展現叢集的執行狀態下收集的多種不同指標資料
1.2 Ganglia的安裝與部署
- 安裝
httpd
服務與php
[root@hadoop101 ~]# yum -y install httpd php
- 預設源找不到安裝包,所以要安裝
epel
源
[root@hadoop101 ~]# yum -y install epel-release
- 安裝
ganglia-gmetad
、ganglia-web
、ganglia-gmond
[root@hadoop101 ~]# yum -y install ganglia-gmetad
[root@hadoop101 ~]# yum -y install ganglia-web
[root@hadoop101 ~]# yum -y install ganglia-gmond
- 修改配置檔案
/etc/httpd/conf.d/ganglia.conf
[root@hadoop101 ~]# vi /etc/httpd/conf.d/ganglia.conf # Ganglia monitoring system php web frontend Alias /ganglia /usr/share/ganglia <Location /ganglia> #Order deny,allow #Deny from all #Allow from all #Allow from 127.0.0.1 #Allow from ::1 # Allow from.example.com Require all granted </Location>
- 修改配置檔案
/etc/ganglia/gmetad.conf
[root@hadoop101 ~]# vim /etc/ganglia/gmetad.conf
修改為:data_source "hadoop101" 192.168.182.101
- 修改配置檔案
/etc/ganglia/gmond.conf
cluster {
name = "hadoop101"
owner = "unspecified"
latlong = "unspecified"
url = "unspecified"
}
udp_send_channel {
#bind_hostname = yes # Highly recommended, soon to be default.
# This option tells gmond to use a source address
# that resolves to the machine's hostname. Without
# this, the metrics may appear to come from any
# interface and the DNS names associated with
# those IPs will be used to create the RRDs.
# mcast_join = 239.2.11.71
host = 192.168.182.101
port = 8649
ttl = 1
}
udp_recv_channel {
# mcast_join = 239.2.11.71
port = 8649
bind = 192.168.182.101
retry_bind = true
# Size of the UDP buffer. If you are handling lots of metrics you really
# should bump it up to e.g. 10MB or even higher.
# buffer = 10485760
}
- 修改配置檔案
/etc/selinux/config
[root@hadoop101 ~]# vim /etc/selinux/config
修改:SELINUX=disabled
後重啟
- 啟動
ganglia
[root@hadoop101 ~]# systemctl start httpd
[root@hadoop101 ~]# systemctl start gmetad
[root@hadoop101 ~]# systemctl start gmond
- 開啟網頁瀏覽
ganglia
頁面:http://192.168.182.101/ganglia
注:如果完成以上操作依然出現許可權不足錯誤,請修改/var/lib/ganglia
目錄的許可權:
[root@hadoop101 ~]# udo chmod -R 777 /var/lib/ganglia
1.3 操作Flume測試監控
- 修改
/opt/module/flume/conf
目錄下的flume-env.sh
配置:
JAVA_OPTS="-Dflume.monitoring.type=ganglia
-Dflume.monitoring.hosts=192.168.182.101:8649
-Xms100m
-Xmx200m"
- 啟動
Flume
任務
[root@hadoop100 flume]$ bin/flume-ng agent \
--conf conf/ \
--name a1 \
--conf-file job/flume-netcat-logger.conf \
-Dflume.root.logger==INFO,console \
-Dflume.monitoring.type=ganglia \
-Dflume.monitoring.hosts=192.168.1.101:8649
- 傳送資料觀察
ganglia
監測圖
[root@hadoop100 flume]$ nc localhost 44444
圖例說明:
欄位(圖表名稱) | 欄位含義 |
---|---|
EventPutAttemptCount | source嘗試寫入channel的事件總數量 |
EventPutSuccessCount | 成功寫入channel且提交的事件總數量 |
EventTakeAttemptCount | sink嘗試從channel拉取事件的總數量。這不意味著每次事件都被返回,因為sink拉取的時候channel可能沒有任何資料。 |
EventTakeSuccessCount | sink成功讀取的事件的總數量 |
StartTime | channel啟動的時間(毫秒) |
StopTime | channel停止的時間(毫秒) |
ChannelSize | 目前channel中事件的總數量 |
ChannelFillPercentage | channel佔用百分比 |
ChannelCapacity | channel的容量 |
二、自定義Source
2.1 介紹
Source
是負責接收資料到Flume Agent
的元件。Source
元件可以處理各種型別、各種格式的日誌資料,包括avro
、thrift
、exec
、jms
、spooling directory
、netcat
、sequence generator
、syslog
、http
、legacy
。
官方提供的source
型別已經很多,但是有時候並不能滿足實際開發當中的需求,此時我們就需要根據實際需求自定義某些source
。官方也提供了自定義source
需要繼承AbstractSource
類並實現Configurable
和PollableSource
介面。
2.2 編碼
需求: 使用flume
接收資料,並給每條資料新增字尾,輸出到控制檯。字首可從flume
配置檔案中配置。
引入依賴:
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.7.0</version>
</dependency>
編碼:
public class MySource extends AbstractSource implements Configurable,
PollableSource {
/**
* 定義需要從配置中讀取的欄位
*/
//兩條資料之間的間隔
private Long delay;
private String field;
/**
* 接收資料,將資料封裝成一個個的Event,寫入Channel。
*/
@Override
public Status process() throws EventDeliveryException {
try {
Map<String, String> header = new HashMap<>();
SimpleEvent event = new SimpleEvent();
for (int i = 0; i < 5; i++) {
event.setHeaders(header);
event.setBody((field + i).getBytes());
getChannelProcessor().processEvent(event);
Thread.sleep(delay);
}
} catch (Exception e) {
return Status.BACKOFF;
}
return Status.READY;
}
@Override
public long getBackOffSleepIncrement() {
return 0;
}
@Override
public long getMaxBackOffSleepInterval() {
return 0;
}
/**
*讀取配置檔案(xx.conf)中的配置資訊
*/
@Override
public void configure(Context context) {
delay = context.getLong("delay", 2000l);
field = context.getString("field", "Custom Source");
}
}
2.3 測試
打包: 將寫好的程式碼打包,並放到flume
的lib
目錄下。
配置檔案
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = com.hucheng.flume.MySource
a1.sources.r1.delay = 1000
#a1.sources.r1.field = hello
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
開啟任務:
[root@hadoop100 flume]$ bin/flume-ng agent -c conf/ -f job/mysource.conf
-n a1 -Dflume.root.logger=INFO,console
三、自定義Sink
2.1 介紹
Sink
不斷地輪詢Channel
中的事件且批量地移除它們,並將這些事件批量寫入到儲存或索引系統、或者被髮送到另一個Flume Agent
。
Sink
是完全事務性的。在從Channel
批量刪除資料之前,每個Sink
用Channel
啟動一個事務。批量事件一旦成功寫出到儲存系統或下一個Flume Agent
,Sink
就利用Channel
提交事務。事務一旦被提交,該Channel
從自己的內部緩衝區刪除事件。
Sink
元件目的地包括hdfs
、logger
、avro
、thrift
、ipc
、file
、null
、HBase
、solr
、自定義。官方提供的Sink
型別已經很多,但是有時候並不能滿足實際開發當中的需求,此時我們就需要根據實際需求自定義某些Sink
,官方也提供了自定義Sink
的介面需要繼承AbstractSink
類並實現Configurable
介面。
2.2 編碼
需求: 使用flume
接收資料,並給每條資料新增字尾,輸出到控制檯。字首可從flume
配置檔案中配置。
public class MySink extends AbstractSink implements Configurable {
//建立Logger物件
private static final Logger LOG =
LoggerFactory.getLogger(AbstractSink.class);
private String prefix;
private String suffix;
@Override
public Status process() throws EventDeliveryException {
//宣告返回值狀態資訊
Status status;
//獲取當前Sink繫結的Channel
Channel channel = getChannel();
//獲取事務
Transaction transaction = channel.getTransaction();
//宣告事件
Event event;
//開啟事務
transaction.begin();
//讀取Channel中的事件,直到讀取到事件結束迴圈
while (true) {
event = channel.take();
if (event != null) {
break;
}
}
try {
//處理事件(列印)
LOG.info(prefix + new String(event.getBody()) + suffix);
//事務提交
transaction.commit();
status = Status.READY;
} catch (Exception e) {
//遇到異常,事務回滾
transaction.rollback();
status = Status.BACKOFF;
} finally {
transaction.close();
}
return status;
}
@Override
public void configure(Context context) {
//讀取配置檔案內容,有預設值
prefix = context.getString("prefix", "hello:");
//讀取配置檔案內容,無預設值
suffix = context.getString("suffix");
}
}
2.3 測試
打包: 將寫好的程式碼打包,並放到flume
的lib
目錄下。
配置檔案
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = com.hucheng.flume.MySink
#a1.sinks.k1.prefix = hello:
a1.sinks.k1.suffix = :hello
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
開啟任務:
[root@hadoop100 flume]# bin/flume-ng agent -c conf/ -f job/mysink.conf -n a1
-Dflume.root.logger=INFO,console
[root@hadoop100 ~]# nc localhost 44444
hello
四、企業真實面試題
4.1 你是如何實現Flume資料傳輸的監控的
使用第三方框架Ganglia
實時監控Flume
。
4.2 Flume的Source,Sink,Channel的作用?你們Source是什麼型別?
①作用
Source
元件是專門用來收集資料的,可以處理各種型別、各種格式的日誌資料,包括avro
、thrift
、exec
、jms
、spooling directory
、netcat
、sequence generator
、syslog
、http
、legacy
Channel
元件對採集到的資料進行快取,可以存放在Memory
或File
中。Sink
元件是用於把資料傳送到目的地的元件,目的地包括Hdfs
、Logger
、avro
、thrift
、ipc
、file
、Hbase
、solr
、自定義。
②我公司採用的Source
型別為:
- 監控後臺日誌:
exec
- 監控後臺產生日誌的埠:
netcat
、Exec
、spooldir
4.3 Flume的Channel Selectors
Channel Selectors
可以讓不同的專案日誌通過不同的ChanneI
到不同的Sink
中去。Channel Selectors
有兩種型別:Replicating Channel Selector (default)
和Multiplex ing Channel Selector
。
這兩種Selector
的區別是:Replicating
會將source
過來的events
發往所有channel
,而Multiplexing
可以選擇該發往哪些Channel
。
4.4 Flume引數調優
①Source
增加Source
個數(使用Tair Dir Source
時可增加FileGroups
個數)可以增大Source
的讀取資料的能力。例如:當某一個目錄產生的檔案過多時需要將這個檔案目錄拆分成多個檔案目錄,同時配置好多個Source
以保證Source
有足夠的能力獲取到新產生的資料。
batchSize
引數決定Source
一次批量運輸到Channel
的event
條數,適當調大這個引數可以提高Source
搬運Event
到Channel
時的效能。
②Channel
type
選擇memory
時Channel
的效能最好,但是如果Flume
程序意外掛掉可能會丟失資料。type
選擇file
時Channel
的容錯性更好,但是效能上會比memory channel
差。
使用file Channel
時dataDirs
配置多個不同盤下的目錄可以提高效能。
Capacity
引數決定Channel
可容納最大的event
條數。transactionCapacity
引數決定每次Source
往channel
裡面寫的最大event
條數和每次Sink
從channel
裡面讀的最大event
條數。transactionCapacity
需要大於Source和Sink
的batchSize
引數。
③Sink
增加Sink
的個數可以增加Sink
消費event
的能力。Sink
也不是越多越好夠用就行,過多的Sink
會佔用系統資源,造成系統資源不必要的浪費。
batchSize
引數決定Sink
一次批量從Channel
讀取的event
條數,適當調大這個引數可以提高Sink
從Channel
搬出event
的效能。
4.5 Flume的事務機制
Flume
的事務機制(類似資料庫的事務機制):Flume
使用兩個獨立的事務分別負責從Soucrce
到Channel
,以及從Channel
到Sink
的事件傳遞。比如spooling directory source
為檔案的每一行建立一個事件,一旦事務中所有的事件全部傳遞到Channel
且提交成功,那麼Soucrce
就將該檔案標記為完成。同理,事務以類似的方式處理從Channel
到Sink
的傳遞過程,如果因為某種原因使得事件無法記錄,那麼事務將會回滾。且所有的事件都會保持到Channel
中,等待重新傳遞。
4.6 Flume採集資料會丟失嗎?
不會,Channel
儲存可以儲存在File
中,資料傳輸自身有事務。