Flume and Its Use in Real Projects
http://flume.apache.org/
Installation
1. Upload the tarball to the server
2. Extract it
3. Edit conf/flume-env.sh and set the JDK directory (JAVA_HOME)
Note: if transferring very large files triggers an out-of-memory error, adjust JAVA_OPTS here
4. Verify the installation: ./flume-ng version
5. Configure environment variables
export FLUME_HOME=/home/apache-flume-1.6.0-bin
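A minimal sketch of step 5, assuming the tarball was extracted to /home/apache-flume-1.6.0-bin and that machine-wide settings live in /etc/profile (both assumptions; adjust to your layout):
# Append to /etc/profile (or ~/.bashrc), then reload it
export FLUME_HOME=/home/apache-flume-1.6.0-bin
export PATH=$PATH:$FLUME_HOME/bin
source /etc/profile
After this, flume-ng can be invoked from any directory, which the start commands below rely on.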
Available Source, Channel, and Sink Types
Flume Source
Source Type | Description
Avro Source | Supports the Avro protocol (actually Avro RPC); built in
Thrift Source | Supports the Thrift protocol; built in
Exec Source | Produces data from the standard output of a Unix command
JMS Source | Reads data from a JMS system (queues, topics)
Spooling Directory Source | Watches a given directory for new files
Twitter 1% firehose Source | Continuously downloads Twitter data through the API; experimental
Netcat Source | Listens on a port and turns each line of text passing through it into an Event
Sequence Generator Source | Sequence generator source; produces sequence data
Syslog Sources | Read syslog data and produce Events; both UDP and TCP are supported
HTTP Source | Accepts data via HTTP POST or GET; supports JSON and BLOB representations
Legacy Sources | Compatible with Sources from the old Flume OG (0.9.x)
Flume Channel
Channel Type | Description
Memory Channel | Events are stored in memory
JDBC Channel | Events are stored in a persistent database; Flume currently ships with built-in Derby support
File Channel | Events are stored in files on disk
Spillable Memory Channel | Events are stored in memory and on disk; when the in-memory queue fills up, events spill over to disk files
Pseudo Transaction Channel | For testing only
Custom Channel | A user-provided Channel implementation
Flume Sink
Sink Type | Description
HDFS Sink | Writes data to HDFS
Logger Sink | Writes data to the log (useful for debugging on the console)
Avro Sink | Data is converted into Avro Events and sent to the configured RPC port
Thrift Sink | Data is converted into Thrift Events and sent to the configured RPC port
IRC Sink | Replays data on IRC
File Roll Sink | Stores data on the local filesystem
Null Sink | Discards all data
HBase Sink | Writes data to HBase
Morphline Solr Sink | Sends data to a Solr search server (or cluster)
ElasticSearch Sink | Sends data to an Elasticsearch search server (or cluster)
Kite Dataset Sink | Writes data to a Kite Dataset; experimental
Custom Sink | A user-provided Sink implementation
Used in the project: 1. A simple example
http://flume.apache.org/FlumeUserGuide.html#a-simple-example
Configuration file (simple.conf, referenced by the start command below):
############################################################
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
############################################################
Start Flume
flume-ng agent -n a1 -c conf -f simple.conf -Dflume.root.logger=INFO,console
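Here -n names the agent and must match the a1 prefix used throughout the config, -c points at Flume's conf directory, -f selects the agent configuration file, and -Dflume.root.logger=INFO,console redirects Flume's own logging to the console so the logger sink's output is visible.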
Install telnet
yum install telnet
To exit a telnet session: press Ctrl+], then type quit
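To test, connect to the Netcat source from the same machine and type a line; the logger sink should echo it on the agent's console. A sketch, assuming the agent above is running locally:
telnet localhost 44444
hello flume
The agent console should then print the event, roughly of the form Event: { headers:{} body: ... hello flume }.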
Memory Channel configuration
capacity: the maximum number of events the channel can hold; the default is 100
transactionCapacity: the maximum number of events taken from the source or given to the sink per transaction; the default is also 100
keep-alive: how long (in seconds) adding an event to the channel or removing one may wait before timing out
byte* settings (byteCapacity, byteCapacityBufferPercentage): a limit on the total number of event bytes; only the event body is counted
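A sketch of how these might be set on the c1 channel from the example above (the values are illustrative, not recommendations):
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c1.keep-alive = 3
a1.channels.c1.byteCapacity = 800000
a1.channels.c1.byteCapacityBufferPercentage = 20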
Used in the project: 2. Two Flume agents in a chain
On the node01 server, configuration file (simple.conf2, referenced by the start command below):
############################################################
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = node1
a1.sources.r1.port = 44444
# Describe the sink
# a1.sinks.k1.type = logger
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node2
a1.sinks.k1.port = 60000
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
############################################################
On the node02 server, install Flume (steps omitted)
Configuration file (avro.conf):
############################################################
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = node2
a1.sources.r1.port = 60000
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
############################################################
Start Flume on node02 first
flume-ng agent -n a1 -c conf -f avro.conf -Dflume.root.logger=INFO,console
Then start Flume on node01
flume-ng agent -n a1 -c conf -f simple.conf2 -Dflume.root.logger=INFO,console
Open telnet to test; the events show up on the node02 console
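A sketch of the test, assuming node1 resolves from wherever you run it (the Netcat source binds node1:44444; each line should hop through the Avro sink/source pair and be logged by node02):
telnet node1 44444
hello cluster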
Used in the project: 3. Exec Source
http://flume.apache.org/FlumeUserGuide.html#exec-source
Configuration file (exec.conf):
############################################################
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/flume.exec.log
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
############################################################
Start Flume
flume-ng agent -n a1 -c conf -f exec.conf -Dflume.root.logger=INFO,console
Create an empty file for the demo (under /home, since the source tails /home/flume.exec.log): touch flume.exec.log
Append data in a loop:
for i in {1..50}; do echo "$i hi flume" >> flume.exec.log ; sleep 0.1; done
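Note that Exec Source gives no delivery guarantee: if the agent dies or the channel fills up, lines that tail -F has already read are simply lost, a caveat the Flume user guide itself calls out. For reliable file ingestion, the Spooling Directory Source shown next is the safer choice.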
Used in the project: 4. Spooling Directory Source
http://flume.apache.org/FlumeUserGuide.html#spooling-directory-source
Configuration file (spool.conf):
############################################################
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/logs
a1.sources.r1.fileHeader = true
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
############################################################
Start Flume
flume-ng agent -n a1 -c conf -f spool.conf -Dflume.root.logger=INFO,console
Copy a file in to demonstrate (run from /home so the files land in the spoolDir /home/logs; the directory must exist before the agent starts):
mkdir logs
cp flume.exec.log logs/
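Once Flume has ingested a file it renames it in place rather than deleting it, so a quick listing shows roughly the following (.COMPLETED is the source's default fileSuffix):
ls /home/logs
flume.exec.log.COMPLETED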
Used in the project: 5. HDFS Sink
http://flume.apache.org/FlumeUserGuide.html#hdfs-sink
Configuration file (hdfs.conf):
############################################################
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/logs
a1.sources.r1.fileHeader = true
# Describe the sink
# *** Only the sink section differs from the previous spooldir config; a1.sinks.k1.type = logger is replaced by the following:
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path=hdfs://bjsxt/flume/%Y-%m-%d/%H%M
## Roll a new file every 60 s, or sooner if the size threshold is hit
# Number of events per file before rolling; 0 = never roll based on event count
a1.sinks.k1.hdfs.rollCount=0
# Seconds before rolling a new file; 0 = never roll based on time
a1.sinks.k1.hdfs.rollInterval=60
# File size in bytes before rolling; 0 = never roll based on size (10240 bytes is 10 KB; use 10485760 for 10 MB)
a1.sinks.k1.hdfs.rollSize=10240
# If the currently open temp file receives no writes for this many seconds, close it and rename it to its final name
a1.sinks.k1.hdfs.idleTimeout=3
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.useLocalTimeStamp=true
## Create a new directory every five minutes:
# Whether to round the event timestamp down when substituting it into the path; if enabled, this affects every time escape sequence except %t
a1.sinks.k1.hdfs.round=true
# The multiple to round down to
a1.sinks.k1.hdfs.roundValue=5
# The unit to round in: second, minute, or hour
a1.sinks.k1.hdfs.roundUnit=minute
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
############################################################
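For example, with round=true, roundValue=5, and roundUnit=minute as above, an event timestamped 11:02:17 lands under the .../1100 directory and one timestamped 11:07:40 under .../1105, so each five-minute window gets its own %H%M directory (timestamps here are illustrative).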
Create the HDFS directory
hadoop fs -mkdir /flume
Start Flume
flume-ng agent -n a1 -c conf -f hdfs.conf -Dflume.root.logger=INFO,console
Inspect the files on HDFS
hadoop fs -ls /flume/...
hadoop fs -get /flume/...