1. 程式人生 > 其它 >大資料實踐(十二)Flume入門

大資料實踐(十二)Flume入門

技術標籤:Hadoop

Flume 1.6.0

Flume簡介

Apache Flume 是一個分散式,高可用的資料收集系統。它可以從不同的資料來源收集資料,經過聚合後傳送到儲存系統中,通常用於日誌資料的收集。Flume 分為 NG 和 OG (1.0 之前) 兩個版本,NG 在 OG 的基礎上進行了完全的重構,是目前使用最為廣泛的版本。下面的介紹均以 NG 為基礎。

外部資料來源以特定格式向 Flume 傳送 events (事件),當 source 接收到 events 時,它將其儲存到一個或多個 channelchanne 會一直儲存 events 直到它被 sink 所消費。sink

的主要功能從 channel 中讀取 events,並將其存入外部儲存系統或轉發到下一個 source,成功後再從 channel 中移除 events

1、 基本概念

1. Event

Event 是 Flume NG 資料傳輸的基本單元。類似於 JMS 和訊息系統中的訊息。一個 Event 由標題和正文組成:前者是鍵/值對映,後者是任意位元組陣列。

2. Source

資料收集元件,從外部資料來源收集資料,並存儲到 Channel 中。

3. Channel

Channel 是源和接收器之間的管道,用於臨時儲存資料。可以是記憶體或持久化的檔案系統:

  • Memory Channel : 使用記憶體,優點是速度快,但資料可能會丟失 (如突然宕機);
  • File Channel : 使用持久化的檔案系統,優點是能保證資料不丟失,但是速度慢。

4. Sink

Sink 的主要功能從 Channel 中讀取 Event,並將其存入外部儲存系統或將其轉發到下一個 Source,成功後再從 Channel 中移除 Event

5. Agent

是一個獨立的 (JVM) 程序,包含 SourceChannelSink 等元件。

2、元件種類

Flume 中的每一個元件都提供了豐富的型別,適用於不同場景:

  • Source 型別 :內建了幾十種類型,如 Avro SourceThrift SourceKafka SourceJMS Source
  • Sink 型別 :HDFS SinkHive SinkHBaseSinksAvro Sink 等;
  • Channel 型別 :Memory ChannelJDBC ChannelKafka ChannelFile Channel 等。

Flume安裝

1、解壓、加入環境變數

完成後使用flume-ng version可以檢視版本。

>flume-ng version

Flume 1.6.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: 2561a23240a71ba20bf288c7c2cda88f443c2080
Compiled by hshreedharan on Mon May 11 11:15:44 PDT 2015
From source with checksum b29e416802ce9ece3269d34233baf43f

2、修改配置檔案

進入conf目錄,將配置檔案複製一份。

cp flume-env.sh.template  flume-env.sh

修改其中的JAVA_HOMEFLUME_CLASSPATH

(不修改也可,會找環境中的jdk)

export JAVA_HOME=/usr/local/jdk

# Give Flume more memory and pre-allocate, enable remote monitoring via JMX
# export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"

# Note that the Flume conf directory is always included in the classpath.
FLUME_CLASSPATH="/usr/local/bigdata/flume"

案例一

監聽檔案,將資料採集到控制檯。

編寫配置檔案,將/tmp/log.txt作為source。console作為sink.

#指定agent的sources,sinks,channels
a1.sources = s1  
a1.sinks = k1  
a1.channels = c1  
   
#配置sources屬性
a1.sources.s1.type = exec
a1.sources.s1.command = tail -F /tmp/log.txt
a1.sources.s1.shell = /bin/bash -c

#將sources與channels進行繫結
a1.sources.s1.channels = c1
   
#配置sink 
a1.sinks.k1.type = logger

#將sinks與channels進行繫結  
a1.sinks.k1.channel = c1  
   
#配置channel型別
a1.channels.c1.type = memory

執行agent,處理資料。

flume-ng agent --conf conf --conf-file ../file/exec-memory-logger.properties --name a1 -Dflume.root.logger=INFO,console

案例二

監聽資料夾,將檔案上傳至hdfs

#指定agent的sources,sinks,channels
a1.sources = s1  
a1.sinks = k1  
a1.channels = c1  
   
#配置sources屬性
a1.sources.s1.type =spooldir  
a1.sources.s1.spoolDir =/tmp/logs
a1.sources.s1.basenameHeader = true
a1.sources.s1.basenameHeaderKey = fileName 
#將sources與channels進行繫結  
a1.sources.s1.channels =c1 

   
#配置sink 
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H/
a1.sinks.k1.hdfs.filePrefix = %{fileName}
#生成的檔案型別,預設是Sequencefile,可用DataStream,則為普通文字
a1.sinks.k1.hdfs.fileType = DataStream  
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#將sinks與channels進行繫結  
a1.sinks.k1.channel = c1
   
#配置channel型別
a1.channels.c1.type = memory

執行監聽。

flume-ng agent \
--conf-file /usr/local/bigdata/file/flume-hdfs.properties \
--name a1 -Dflume.root.logger=INFO,console