Apache Flume 入門教程
概要
Apache Flume 是一個分散式,可靠且可用的系統,用於有效地從許多不同的源收集、聚合和移動大量日誌資料到一個集中式的資料儲存區。
Flume 的使用不只限於日誌資料。因為資料來源可以定製,flume 可以被用來傳輸大量事件資料,這些資料不僅僅包括網路通訊資料、社交媒體產生的資料、電子郵件資訊等等。
Apache Flume 是 Apache 基金會的頂級專案,在加入 Apache 之前由 cloudera 公司開發以及維護。 Apache Flume 目前有兩種主版本: 0.9.x 和 1.x。 其中 0.9.x 是歷史版本,我們稱之為 Flume OG(original generation)。2011 年 10 月 22 號,cloudera 完成了 Flume-728,對 Flume 進行了里程碑式的改動:重構核心元件、核心配置以及程式碼架構,重構後的版本統稱為 Flume NG(next generation),也就是這裡說的 1.x 版本。
本文主要對 Flume 的作用以及核心概念進行介紹,通過本文讀者可以大致瞭解 flume 的使用場景、核心元件以及各元件的執行機制。關於如何配置 flume 以適應不同場景,我們會在另一篇文章中詳細解讀。
架構
資料流模型
一個 Flume 事件被定義為一個數據流單元。Flume agent 其實是一個 JVM 程序,該程序中包含完成任務所需要的各個元件,其中最核心的三個元件是 Source、Chanel 以及 Slink。
Source 消費由外部源(如Web伺服器)傳遞給它的事件。外部源以一定的格式傳送資料給 Flume,這個格式的定義由目標 Flume Source 來確定。例如,一個 Avro Flume source 可以從 Avro(Avro是一個基於二進位制資料傳輸的高效能中介軟體,是 hadoop 的一個子專案) 客戶端接收 Avro 事件,也可以從其他 Flume agents (該 Flume agents 有 Avro sink)接收 Avro 事件。 同樣,我們可以定義一個 Thrift Flume Source 接收來自 Thrift Sink、Flume Thrift RPC 客戶端或者其他任意客戶端(該客戶端可以使用任何語言編寫,只要滿足 Flume thrift 協議)的事件。
channel 可以理解為快取區,用來儲存從 Source 那拿到的資料,直到 Flume slink 將資料消費。file chanel 是一個例子,它將資料儲存在檔案系統中(當然你可以將資料放在記憶體中)。
slink 從 channel 消費完資料就會將資料從 channel 中清除,隨後將資料放到外部儲存系統例如 HDFS (使用 Flume HDFS sink)或傳送到其他 Flume agent 的 source 中。不管是 Source 還是 Slink 都是非同步傳送和消費資料。
複雜的流
Flume 允許使用者構建一個複雜的資料流,比如資料流經多個 agent 最終落地。It also allows fan-in and fan-out flows, contextual routing and backup routes (fail-over) for failed hops.
可靠性
事件被儲存在每個 agent 的 channel 中。隨後這些事件會發送到流中的下一個 agent 或者裝置儲存中(例如 HDFS)。只有事件已經被儲存在下一個 agent 的 channel 中 或裝置儲存中時,當前 channel 才會清除該事件。這種機制保證了流在端到端的傳輸中具有可靠性。
Flume使用事務方法(transactional approach)來保證事件的可靠傳輸。在 source 和 slink 中,事件的儲存以及恢復作為事務進行封裝,存放事件到 channel 中以及從 channel 中拉取事件均是事務性的。這保證了流中的事件在節點之間傳輸是可靠的。
可恢復
事件在 channel 中進行,該 channel 負責保障事件從故障中恢復。Flume 支援一個由本地檔案系統支援的持久化檔案(檔案模式:channel.type = "file") channel。同樣也支援記憶體模式(channel.type = "memmory"),即將事件儲存在記憶體佇列中。顯然,記憶體模式相對與檔案模型效能會更好,但是當 agent 程序不幸掛掉時,記憶體模式下儲存在 channel 中的事件將丟失,無法進行恢復。
構建
構建一個 agent
Flume agent 的配置儲存在一個本地配置檔案中。它是一個 text 文字,java 程式可以直接方便地讀取其屬性。可以在同一配置檔案中指定一個或多個 agent 的配置。配置檔案指定了 agnet 中每個 source、channel、slink 的屬性,以及三者如何組合形成資料流。
配置單個元件
流中的每一個元件(source、channel、slink)都有自己的名稱、型別以及一系列配置屬性。例如,一個 Avro source 需要配置 hostname (或者 IP 地址)以及埠號來接收資料。一個記憶體模式 channel 可以有最大佇列長度的屬性("capacity": channel 中最大容納多少事件)。一個 HDFS slink 則需要知道檔案系統的 URL(hdfs://****)、檔案落地的路徑、檔案回滾的評率("hdfs.rollInterval": 每隔多少秒將零時檔案回滾成最終檔案儲存到 HDFS 中)。所有這些關於各個元件的屬性需要在配置檔案中進行指定。
將各個部分組合起來
Agent 需要知道載入哪些元件以及如何將這些元件組合起來形成資料流。Flume 指定每個元件的名稱(source、channel、slink),同時明確地告訴我們 channel 與 哪些 source 和 slink 連線,這樣各個元件就能組合起來。例如,一個叫 "avroWeb" 的 source 通過一個叫 "file-channel" 的channel 將事件傳遞到 HDFS sink 中。配置檔案需包含這些元件的名稱以及組合關係。
開始一個 agent
我們可以通過 Flume bin 目錄下的指令碼檔案(flume-ng)來啟動 agent。在命令後面,你需要指定 agent 的名稱、配置檔案:
$ bin/flume-ng agent -n $agent_name -c conf -f conf/flume-conf.properties.template
複製程式碼
執行以上命令,agent 將會按照配置檔案裡描述的方式來執行元件。
一個簡單的示例
這裡,我們給出一個配置檔案的示例,該示例為 flume 單節點部署的配置方式。
# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
複製程式碼
看看這個配置檔案,我們可以發現這個 agent 的名稱是 a1。其中該 agent 的 source 監聽 44444 埠。channel 採用記憶體模式,而 slink 直接輸出資料到 控制檯上(logger)。配置檔案指定了各個元件的名稱,並描述了它們的型別以及其他屬性。當然,一個配置檔案可以配置多個 agent 屬性,當希望執行指定 agent 程序時,我們需要在命令列中顯示的給出該 agent 的名稱:
$ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console
複製程式碼
注意,在實際部署中,我們通常會包含一個選項: --conf-file = 。 目錄將包含一個 shell 指令碼 flume-env.sh 以及一個 log4j 屬性檔案。 在這個例子中,我們傳遞一個 Java 選項來強制 Flume 將日誌輸出到控制檯。
下面的例子中,我們可以遠端 telnet 訪問 44444 埠來向 agent 傳送資料:
$ telnet localhost 44444
Trying 127.0.0.1...
Connected to localhost.localdomain (127.0.0.1).
Escape character is '^]'.
Hello world! <ENTER>
OK
複製程式碼
agent 程序的控制檯將會列印通過 telnet 傳送的資料:
12/06/19 15:32:19 INFO source.NetcatSource: Source starting
12/06/19 15:32:19 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444]
12/06/19 15:32:34 INFO sink.LoggerSink: Event: { headers:{} body: 48 65 6C 6C 6F 20 77 6F 72 6C 64 21 0D Hello world!. }
複製程式碼
完成這一步,恭喜你已經成功地配置以及部署一個 flume agent。
資料獲取(Data ingestion)
Flume 支援許多從外部源獲取資料的機制。
RPC
一個 Avro client 可以使用 rpc 機制傳送指定的檔案到 source 中:
$ bin/flume-ng avro-client -H localhost -p 41414 -F /usr/logs/log.10
複製程式碼
上面的命令會將 /usr/logs/log.10 傳送到監聽 41414 埠的 source 上。
網路流(Network streams)
Flume 支援從一些流行的日誌流中讀取資料,例如:
- Avro
- Thrift
- Syslog
- Netcat
設定多 agent 流(Setting multi-agent flow)
Flume 支援將多個 agent 串聯起來,完成這項操作。合併(Consolidation)
當需要從眾多主機上收集日誌資訊時,我們可以在每臺主機上部署 agent,這些主機的 slink 均連線到最終日誌落地主機的 source 上。落地主機將所有資料進行組合,落地到 HDFS 上。
掃碼關注微信公眾號"Kooola大資料",聊人生|聊技術