1. 程式人生 > 程式設計 >分散式日誌收集框架 Flume

分散式日誌收集框架 Flume

1 需求分析

WebServer/ApplicationServer分散在各個機器上,然而我們依舊想在Hadoop平臺上進行統計分析,如何將日誌收集到Hadoop平臺呢?

  • 簡單的這樣嗎?
shell cp hadoop叢集的機器上;
hadoop fs -put ... /複製程式碼

顯然該法面臨著容錯、負載均衡、高延遲、資料壓縮等一系列問題這顯然已經無法滿足需求了!

不如問問神奇的Flume呢???

只需要配置檔案,輕鬆解決以上問題!

2 Flume概述

2.1 官網

  • Flume是一種分散式,可靠且可用的服務,用於有效地收集,聚合和移動大量日誌資料。
  • 它具有基於流式資料流的簡單靈活的架構。
  • 它具有可靠的可靠性機制和許多故障轉移和恢復機制,具有強大的容錯性。
  • 它使用簡單的可擴充套件資料模型,允許線上分析應用程式。

2.2 設計目標

  • 可靠性
    當節點出現故障時,日誌能夠被傳送到其他節點上而不會丟失。Flume提供了三種級別的可靠性保障,從強到弱依次分別為:end-to-end(收到資料agent首先將event寫到磁碟上,當資料傳送成功後,再刪除;如果資料傳送失敗,可以重新傳送。),Store on failure(這也是scribe採用的策略,當資料接收方crash時,將資料寫到本地,待恢復後,繼續傳送),Best effort(資料傳送到接收方後,不會進行確認)。
  • 擴充套件性
    Flume採用了三層架構,分別為agent,collector和storage,每一層均可以水平擴充套件。
    其中,所有agent和collector由master統一管理,這使得系統容易監控和維護,且master允許有多個(使用ZooKeeper進行管理和負載均衡),這就避免了單點故障問題。
  • 管理性
    所有agent和colletor由master統一管理,這使得系統便於維護。多master情況,Flume利用ZooKeeper和gossip,保證動態配置資料的一致性。使用者可以在master上檢視各個資料來源或者資料流執行情況,且可以對各個資料來源配置和動態載入。Flume提供了web 和shell script command兩種形式對資料流進行管理。
  • 功能可擴充套件性
    使用者可以根據需要新增自己的agent,collector或者storage。此外,Flume自帶了很多元件,包括各種agent(file, syslog等),collector和storage(file,HDFS等)。

2.3 主流競品對比

其他的還有比如:

  • Logstash: ELK(ElasticsSearch,Logstash,Kibana)
  • Chukwa: Yahoo/Apache,使用Java語言開發,負載均衡不是很好,已經不維護了。
  • Fluentd: 和Flume類似,Ruby開發。

2.4 發展史

  • Cloudera公司提出0.9.2,叫Flume-OG
  • 2011年Flume-728編號,重要里程碑(Flume-NG),貢獻給Apache社群
  • 2012年7月 1.0版本
  • 2015年5月 1.6版本
  • ~ 1.9版本

3 核心架構及其元件

3.1 core架構

在這裡插入圖片描述

3.2 核心的元件

順便來看看官方檔案

3.2.1 Source - 收集

指定資料來源(Avro,Thrift,Spooling,Kafka,Exec)

3.2.2 Channel - 聚集

把資料暫存(Memory,File,Kafka等用的比較多)

3.2.3 Sink - 輸出

把資料寫至某處(HDFS,Hive,Logger,Avro,ES,HBase,Kafka等)

multi-agent flow

為了跨多個代理或跳資料流,先前代理的接收器和當前跳的源需要是avro型別,接收器指向源的主機名(或IP地址)和埠。

Consolidation合併

日誌收集中非常常見的情況是大量日誌生成客戶端將資料傳送到連線到儲存子系統的少數消費者代理。 例如,從數百個Web伺服器收集的日誌傳送給寫入HDFS叢集的十幾個代理。這可以通過使用avro接收器配置多個第一層代理在Flume中實現,所有這些代理都指向單個代理的avro源(同樣,您可以在這種情況下使用thrift源/接收器/客戶端)。 第二層代理上的此源將接收的事件合併到單個通道中,該通道由信宿器消耗到其最終目的地。

Multiplexing the flow

Flume支援將事件流多路複用到一個或多個目的地。 這是通過定義可以複製或選擇性地將事件路由到一個或多個通道的流複用器來實現的。上面的例子顯示了來自代理“foo”的原始碼將流程擴充套件到三個不同的通道。 扇出可以複製或多路複用。 在複製流的情況下,每個事件被髮送到所有三個通道。 對於多路複用情況,當事件的屬性與預配置的值匹配時,事件將被傳遞到可用通道的子集。 例如,如果一個名為“txnType”的事件屬性設定為“customer”,那麼它應該轉到channel1和channel3,如果它是“vendor”,那麼它應該轉到channel2,否則轉到channel3。 可以在代理的配置檔案中設定對映。

4 環境配置與部署

4.1 系統需求

  • 系統
    macOS 10.14.14
  • Java執行時環境
    Java 1.8或更高版本
  • 記憶體源
    通道或接收器使用的配置的足夠記憶體
  • 磁碟空間
    通道或接收器使用的配置的足夠磁碟空間
  • 目錄許可權
    代理使用的目錄的讀/寫許可權

4.2 下載與安裝

4.3 配置

  • 檢視安裝路徑
  • 系統配置檔案
export FLUME_VERSION=1.9.0
export FLUME_HOME=/usr/local/Cellar/flume/1.9.0/libexec
export FLUME_CONF_DIR=$FLUME_HOME/conf
export PATH=$FLUME_HOME/bin:$PATH複製程式碼

  • flume配置檔案
    配置JAVA_HOME
  • 驗證
    bin下的命令執行檔案

安裝成功

5 實戰

使用Flume的核心就在於配置檔案

  • 配置Source
  • 配置Channel
  • 配置Sink
  • 組織在一起

5.1 場景1 - 從指定網路埠收集資料輸出到控制檯

看看官網的第一個案例

# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1複製程式碼

a1:agent名稱r1:Source名稱k1:Sink名稱c1:Channel名稱

看看其中的

Sources : netcat

類似於netcat的源,它偵聽給定埠並將每行文字轉換為事件。 像nc -k -l [host] [port]這樣的行為。 換句話說,它開啟一個指定的埠並偵聽資料。 期望是提供的資料是換行符分隔的文字。 每行文字都轉換為Flume事件,並通過連線的通道傳送。

必需屬性以粗體顯示。

Sinks:logger

在INFO級別記錄事件。 通常用於測試/除錯目的。 必需屬性以粗體顯示。 此接收器是唯一的例外,它不需要在“記錄原始資料”部分中說明的額外配置。

channel:memor

事件儲存在具有可配置最大大小的記憶體中佇列中。 它非常適用於需要更高吞吐量的流量,並且在代理髮生故障時準備丟失分階段資料。 必需屬性以粗體顯示。

實戰

新建example.conf配置

在conf目錄下

啟動一個agent

使用名為flume-ng的shell指令碼啟動代理程式,該指令碼位於Flume發行版的bin目錄中。 您需要在命令列上指定代理名稱,config目錄和配置檔案:

bin/flume-ng agent -n $agent_name -c conf -f conf/flume-conf.properties.template複製程式碼
  • 回顧命令引數的意義
bin/flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/example.conf \
-Dflume.root.logger=INFO,console複製程式碼

現在,代理將開始執行在給定屬性檔案中配置的源和接收器。

使用telnet進行測試驗證

  • 注意
telnet 127.0.0.1 44444複製程式碼
  • 傳送了兩條資料
  • 這邊接收到了資料

    讓我們詳細分析下上圖中的資料資訊
2019-06-12 17:52:39,711 (SinkRunner-PollingRunner-DefaultSinkProcessor)
[INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] 
Event: { headers:{} body: 4A 61 76 61 45 64 67 65 0D                      JavaEdge. }複製程式碼

其中的Event是Fluem資料傳輸的基本單元Event = 可選的header + byte array

5.2 場景2 - 監控一個檔案實時採集新增的資料輸出到控制檯

Exec Source

Exec源在啟動時執行給定的Unix命令,並期望該程式在標準輸出上連續生成資料(stderr被簡單地丟棄,除非屬性logStdErr設定為true)。 如果程式因任何原因退出,則源也會退出並且不會生成其他資料。 這意味著諸如cat [named pipe]或tail -F [file]之類的配置將產生所需的結果,而日期可能不會 - 前兩個命令產生資料流,而後者產生單個事件並退出

Agent 選型

exec source + memory channel + logger sink

配置檔案

# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /Volumes/doc/data/data.log
a1.sources.r1.shell = /bin/sh -c

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1複製程式碼

在conf下新建配置檔案如下:

  • data.log檔案內容
  • 成功接收
    在這裡插入圖片描述

5.3 應用場景3 - 將A伺服器上的日誌實時採集到B伺服器

技術選型

exec s + memory c + avro savro s + memory c + loger s

配置檔案

exec-memory-avro.conf

# Name the components on this agent
exec-memory-avro.sources = exec-source
exec-memory-avro.sinks = avro-sink
exec-memory-avro.channels = memory-channel

# Describe/configure the source
exec-memory-avro.sources.exec-source.type = exec
exec-memory-avro.sources.exec-source.command = tail -F /Volumes/doc/data/data.log
exec-memory-avro.sources.exec-source.shell = /bin/sh -c

# Describe the sink
exec-memory-avro.sinks.avro-sink.type = avro
exec-memory-avro.sinks.avro-sink.hostname = localhost
exec-memory-avro.sinks.avro-sink.port = 44444

# Use a channel which buffers events in memory
exec-memory-avro.channels.memory-channel.type = memory
exec-memory-avro.channels.memory-channel.capacity = 1000
exec-memory-avro.channels.memory-channel.transactionCapacity = 100

# Bind the source and sink to the channel
exec-memory-avro.sources.exec-source.channels = memory-channel
exec-memory-avro.sinks.avro-sink.channel = memory-channel複製程式碼

# Name the components on this agent
exec-memory-avro.sources = exec-source
exec-memory-avro.sinks = avro-sink
exec-memory-avro.channels = memory-channel

# Describe/configure the source
exec-memory-avro.sources.exec-source.type = exec
exec-memory-avro.sources.exec-source.command = tail -F /Volumes/doc/data/data.log
exec-memory-avro.sources.exec-source.shell = /bin/sh -c

# Describe the sink
exec-memory-avro.sinks.avro-sink.type = avro
exec-memory-avro.sinks.avro-sink.hostname = localhost
exec-memory-avro.sinks.avro-sink.port = 44444

# Use a channel which buffers events in memory
exec-memory-avro.channels.memory-channel.type = memory
exec-memory-avro.channels.memory-channel.capacity = 1000
exec-memory-avro.channels.memory-channel.transactionCapacity = 100

# Bind the source and sink to the channel
exec-memory-avro.sources.exec-source.channels = memory-channel
exec-memory-avro.sinks.avro-sink.channel = memory-channel複製程式碼

參考

https://tech.meituan.com/2013/12/09/meituan-flume-log-system-architecture-and-design.html