大資料實踐(十二)Flume入門
技術標籤:Hadoop
Flume 1.6.0
Flume簡介
Apache Flume 是一個分散式,高可用的資料收集系統。它可以從不同的資料來源收集資料,經過聚合後傳送到儲存系統中,通常用於日誌資料的收集。Flume 分為 NG 和 OG (1.0 之前) 兩個版本,NG 在 OG 的基礎上進行了完全的重構,是目前使用最為廣泛的版本。下面的介紹均以 NG 為基礎。
外部資料來源以特定格式向 Flume 傳送 events
(事件),當 source
接收到 events
時,它將其儲存到一個或多個 channel
,channe
會一直儲存 events
直到它被 sink
所消費。sink
channel
中讀取 events
,並將其存入外部儲存系統或轉發到下一個 source
,成功後再從 channel
中移除 events
。
1、 基本概念
1. Event
Event
是 Flume NG 資料傳輸的基本單元。類似於 JMS 和訊息系統中的訊息。一個 Event
由標題和正文組成:前者是鍵/值對映,後者是任意位元組陣列。
2. Source
資料收集元件,從外部資料來源收集資料,並存儲到 Channel 中。
3. Channel
Channel
是源和接收器之間的管道,用於臨時儲存資料。可以是記憶體或持久化的檔案系統:
Memory Channel
: 使用記憶體,優點是速度快,但資料可能會丟失 (如突然宕機);File Channel
: 使用持久化的檔案系統,優點是能保證資料不丟失,但是速度慢。
4. Sink
Sink
的主要功能從 Channel
中讀取 Event
,並將其存入外部儲存系統或將其轉發到下一個 Source
,成功後再從 Channel
中移除 Event
。
5. Agent
是一個獨立的 (JVM) 程序,包含 Source
、 Channel
、 Sink
等元件。
2、元件種類
Flume 中的每一個元件都提供了豐富的型別,適用於不同場景:
- Source 型別 :內建了幾十種類型,如
Avro Source
,Thrift Source
,Kafka Source
,JMS Source
- Sink 型別 :
HDFS Sink
,Hive Sink
,HBaseSinks
,Avro Sink
等; - Channel 型別 :
Memory Channel
,JDBC Channel
,Kafka Channel
,File Channel
等。
Flume安裝
1、解壓、加入環境變數
完成後使用flume-ng version
可以檢視版本。
>flume-ng version
Flume 1.6.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: 2561a23240a71ba20bf288c7c2cda88f443c2080
Compiled by hshreedharan on Mon May 11 11:15:44 PDT 2015
From source with checksum b29e416802ce9ece3269d34233baf43f
2、修改配置檔案
進入conf目錄,將配置檔案複製一份。
cp flume-env.sh.template flume-env.sh
修改其中的JAVA_HOME
和 FLUME_CLASSPATH
。
(不修改也可,會找環境中的jdk)
export JAVA_HOME=/usr/local/jdk
# Give Flume more memory and pre-allocate, enable remote monitoring via JMX
# export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"
# Note that the Flume conf directory is always included in the classpath.
FLUME_CLASSPATH="/usr/local/bigdata/flume"
案例一
監聽檔案,將資料採集到控制檯。
編寫配置檔案,將/tmp/log.txt作為source。console作為sink.
#指定agent的sources,sinks,channels
a1.sources = s1
a1.sinks = k1
a1.channels = c1
#配置sources屬性
a1.sources.s1.type = exec
a1.sources.s1.command = tail -F /tmp/log.txt
a1.sources.s1.shell = /bin/bash -c
#將sources與channels進行繫結
a1.sources.s1.channels = c1
#配置sink
a1.sinks.k1.type = logger
#將sinks與channels進行繫結
a1.sinks.k1.channel = c1
#配置channel型別
a1.channels.c1.type = memory
執行agent,處理資料。
flume-ng agent --conf conf --conf-file ../file/exec-memory-logger.properties --name a1 -Dflume.root.logger=INFO,console
案例二
監聽資料夾,將檔案上傳至hdfs
#指定agent的sources,sinks,channels
a1.sources = s1
a1.sinks = k1
a1.channels = c1
#配置sources屬性
a1.sources.s1.type =spooldir
a1.sources.s1.spoolDir =/tmp/logs
a1.sources.s1.basenameHeader = true
a1.sources.s1.basenameHeaderKey = fileName
#將sources與channels進行繫結
a1.sources.s1.channels =c1
#配置sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H/
a1.sinks.k1.hdfs.filePrefix = %{fileName}
#生成的檔案型別,預設是Sequencefile,可用DataStream,則為普通文字
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#將sinks與channels進行繫結
a1.sinks.k1.channel = c1
#配置channel型別
a1.channels.c1.type = memory
執行監聽。
flume-ng agent \
--conf-file /usr/local/bigdata/file/flume-hdfs.properties \
--name a1 -Dflume.root.logger=INFO,console