1. 程式人生 > >Flume與實際專案中用到的

Flume與實際專案中用到的

http://flume.apache.org/

安裝
1、上傳
2、解壓
3、修改conf/flume-env.sh  檔案中的JDK目錄
 注意:JAVA_OPTS 配置  如果我們傳輸檔案過大 報記憶體溢位時 需要修改這個配置項
4、驗證安裝是否成功  ./flume-ng version
5、配置環境變數
    export FLUME_HOME=/home/apache-flume-1.6.0-bin


Source、Channel、Sink有哪些型別
    Flume Source
    Source型別                   | 說明
    Avro Source                 | 支援Avro協議(實際上是Avro RPC),內建支援
    Thrift Source               | 支援Thrift協議,內建支援
    Exec Source                 | 基於Unix的command在標準輸出上生產資料
    JMS Source                   | 從JMS系統(訊息、主題)中讀取資料
    Spooling Directory Source | 監控指定目錄內資料變更
    Twitter 1% firehose Source|    通過API持續下載Twitter資料,試驗性質
    Netcat Source               | 監控某個埠,將流經埠的每一個文字行資料作為Event輸入
    Sequence Generator Source | 序列生成器資料來源,生產序列資料
    Syslog Sources               | 讀取syslog資料,產生Event,支援UDP和TCP兩種協議
    HTTP Source                 | 基於HTTP POST或GET方式的資料來源,支援JSON、BLOB表示形式
    Legacy Sources               | 相容老的Flume OG中Source(0.9.x版本)

    Flume Channel
    Channel型別       說明
    Memory Channel                | Event資料儲存在記憶體中
    JDBC Channel                  | Event資料儲存在持久化儲存中,當前Flume Channel內建支援Derby
    File Channel                  | Event資料儲存在磁碟檔案中
    Spillable Memory Channel   | Event資料儲存在記憶體中和磁碟上,當記憶體佇列滿了,會持久化到磁碟檔案
    Pseudo Transaction Channel | 測試用途
    Custom Channel                | 自定義Channel實現

    Flume Sink
    Sink型別     說明
    HDFS Sink             | 資料寫入HDFS
    Logger Sink           | 資料寫入日誌檔案
    Avro Sink             | 資料被轉換成Avro Event,然後傳送到配置的RPC埠上
    Thrift Sink           | 資料被轉換成Thrift Event,然後傳送到配置的RPC埠上
    IRC Sink              | 資料在IRC上進行回放
    File Roll Sink         | 儲存資料到本地檔案系統
    Null Sink             | 丟棄到所有資料
    HBase Sink             | 資料寫入HBase資料庫
    Morphline Solr Sink | 資料傳送到Solr搜尋伺服器(叢集)
    ElasticSearch Sink     | 資料傳送到Elastic Search搜尋伺服器(叢集)
    Kite Dataset Sink     | 寫資料到Kite Dataset,試驗性質的
    Custom Sink           | 自定義Sink實現

專案中用到的:1、 A simple example
    http://flume.apache.org/FlumeUserGuide.html#a-simple-example
    
    配置檔案
    ############################################################
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444

    # Describe the sink
    a1.sinks.k1.type = logger

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    ############################################################

啟動flume
flume-ng agent -n a1 -c conf -f simple.conf -Dflume.root.logger=INFO,console

安裝telnet
yum install telnet
退出 ctrl+]  quit

Memory Chanel 配置
  capacity:預設該通道中最大的可以儲存的event數量是100,
  trasactionCapacity:每次最大可以source中拿到或者送到sink中的event數量也是100
  keep-alive:event新增到通道中或者移出的允許時間
  byte**:即event的位元組量的限制,只包括eventbody

專案中用到的:2、兩個flume做叢集

    node01伺服器中,配置檔案
    ############################################################
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = node1
    a1.sources.r1.port = 44444

    # Describe the sink
    # a1.sinks.k1.type = logger
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = node2
    a1.sinks.k1.port = 60000

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    ############################################################
    
    node02伺服器中,安裝Flume(步驟略)
    配置檔案
    ############################################################
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = avro
    a1.sources.r1.bind = node2
    a1.sources.r1.port = 60000

    # Describe the sink
    a1.sinks.k1.type = logger

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    ############################################################
    
    先啟動node02的Flume
    flume-ng agent  -n a1 -c conf -f avro.conf -Dflume.root.logger=INFO,console
    
    再啟動node01的Flume
    flume-ng agent  -n a1 -c conf -f simple.conf2 -Dflume.root.logger=INFO,console
    
    開啟telnet 測試  node02控制檯輸出結果


專案中用到的:3、Exec Source
        http://flume.apache.org/FlumeUserGuide.html#exec-source
        
    配置檔案
    ############################################################
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -F /home/flume.exec.log

    # Describe the sink
    a1.sinks.k1.type = logger
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    ############################################################
    
    啟動Flume
    flume-ng agent -n a1 -c conf -f exec.conf -Dflume.root.logger=INFO,console
    
    建立空檔案演示 touch flume.exec.log
    迴圈新增資料
    for i in {1..50}; do echo "$i hi flume" >> flume.exec.log ; sleep 0.1; done
        
專案中用到的:4、Spooling Directory Source
        http://flume.apache.org/FlumeUserGuide.html#spooling-directory-source
    配置檔案
    ############################################################
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = spooldir
    a1.sources.r1.spoolDir = /home/logs
    a1.sources.r1.fileHeader = true

    # Describe the sink
    a1.sinks.k1.type = logger

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    ############################################################

    啟動Flume
    flume-ng agent -n a1 -c conf -f spool.conf -Dflume.root.logger=INFO,console

    拷貝檔案演示
    mkdir logs
    cp flume.exec.log logs/


專案中用到的:5、hdfs sink
        http://flume.apache.org/FlumeUserGuide.html#hdfs-sink
    
        配置檔案
    ############################################################
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = spooldir
    a1.sources.r1.spoolDir = /home/logs
    a1.sources.r1.fileHeader = true

    # Describe the sink
    ***只修改上一個spool sink的配置程式碼塊 a1.sinks.k1.type = logger
    a1.sinks.k1.type=hdfs
    a1.sinks.k1.hdfs.path=hdfs://bjsxt/flume/%Y-%m-%d/%H%M
    
    ##每隔60s或者檔案大小超過10M的時候產生新檔案
    # hdfs有多少條訊息時新建檔案,0不基於訊息個數
    a1.sinks.k1.hdfs.rollCount=0
    # hdfs建立多長時間新建檔案,0不基於時間
    a1.sinks.k1.hdfs.rollInterval=60
    # hdfs多大時新建檔案,0不基於檔案大小
    a1.sinks.k1.hdfs.rollSize=10240
    # 當目前被開啟的臨時檔案在該引數指定的時間(秒)內,沒有任何資料寫入,則將該臨時檔案關閉並重命名成目標檔案
    a1.sinks.k1.hdfs.idleTimeout=3
    
    a1.sinks.k1.hdfs.fileType=DataStream
    a1.sinks.k1.hdfs.useLocalTimeStamp=true
    
    ## 每五分鐘生成一個目錄:
    # 是否啟用時間上的”捨棄”,這裡的”捨棄”,類似於”四捨五入”,後面再介紹。如果啟用,則會影響除了%t的其他所有時間表達式
    a1.sinks.k1.hdfs.round=true
    # 時間上進行“捨棄”的值;
    a1.sinks.k1.hdfs.roundValue=5
    # 時間上進行”捨棄”的單位,包含:second,minute,hour
    a1.sinks.k1.hdfs.roundUnit=minute

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    ############################################################
    建立HDFS目錄
    hadoop fs -mkdir /flume
    
    啟動Flume
    flume-ng agent -n a1 -c conf -f hdfs.conf -Dflume.root.logger=INFO,console

    檢視hdfs檔案
    hadoop fs -ls /flume/...
    hadoop fs -get /flume/...