1. 程式人生 > >Flume學習筆記

Flume學習筆記

flume學習筆記

一、什麼是Flume?     Apache Flume 是一個從可以收集例如日誌,事件等資料資源,並將這些數量龐大的資料從各項資料資源中集中起來儲存的工具/服務,或者數集中機制。flume具有高可用,分散式,配置工具,其設計的原理也是基於將資料流,如日誌資料從各種網站伺服器上彙集起來儲存到HDFS,HBase等集中儲存器中。二、flume特性     Flume是一個分散式、可靠、和高可用的海量日誌採集、聚合和傳輸的系統。     Flume可以採集檔案,socket資料包、檔案、資料夾、kafka等各種形式源資料,又可以將採集到的資料(下沉sink)輸出到HDFS、hbase、hive、kafka等眾多外部儲存系統中     一般的採集需求,通過對flume的簡單配置即可實現     Flume針對特殊場景也具備良好的自定義擴充套件能力,因此,flume可以適用於大部分的日常資料採集場景三、flume元件解析

    對於每一個Agent來說,它就是一共獨立的守護程序(JVM),它從客戶端接收資料     1、Flume分散式系統中最核心的角色是agent,flume採集系統就是由一個個agent所連線起來形成     2、每一個agent相當於一個數據(被封裝成Event物件)傳遞員,內部有三個元件:         a)Source:採集元件,用於跟資料來源對接,以獲取資料         b)Sink:下沉元件,用於往下一級agent傳遞資料或者往最終儲存系統傳遞資料         c)Channel:傳輸通道元件,用於從source將資料傳遞到sink         d)event(所傳的訊息就是event)一行文字內容會被反序列化成一個event(event的最大定義為2048位元組,超過,則會切割,剩下的會被放到下一個event中,預設編碼是UTF-8。四、flume安裝
    1)解壓

        tar -zxvf apache-flume-1.6.0-bin.tar.gz -C /root/app/

    2)修改配置檔案

        將flume-env.sh.template  改為 flume-env.sh

        在裡面配置JAVA_HOME五、測試     建立一個myconf資料夾,在裡邊寫配置檔案          1.使用telnet 傳送訊息 然後輸出到控制檯         1)建立netcat-console.conf檔案           

            # 定義這個agent中各元件的名字
            a1.sources = r1
            a1.sinks = k1
            a1.channels = c1

            # 描述和配置source元件:r1
            a1.sources.r1.type = netcat
            a1.sources.r1.bind = hadoop01
            a1.sources.r1.port = 44444

            # 描述和配置sink元件:k1
            a1.sinks.k1.type = logger

            # 描述和配置channel元件,此處使用是記憶體快取的方式
            a1.channels.c1.type = memory
            a1.channels.c1.capacity = 1000
            a1.channels.c1.transactionCapacity = 100

            # 描述和配置source  channel   sink之間的連線關係
            a1.sources.r1.channels = c1
            a1.sinks.k1.channel = c1

        2)啟動           

 bin/flume-ng agent -c conf -f myconf/socket-file.conf -n a1 -Dflume.root.logger=INFO,console

            在另一個能跟agent節點聯網的機器上執行  

              telnet hadoop01 44444

        2.採集目錄到hdfs         需求:某伺服器的某特定目錄下,會不斷產生新的檔案,每當有新檔案出現,就需要把檔案採集到HDFS中去             根據需求,首先定義以下3大要素             資料來源元件,即source ——監控檔案目錄 :  spooldir             spooldir特性:                1、監視一個目錄,只要目錄中出現新檔案,就會採集檔案中的內容                2、採集完成的檔案,會被agent自動新增一個字尾:COMPLETED                3、所監視的目錄中不允許重複出現相同檔名的檔案             下沉元件,即sink——HDFS檔案系統  :  hdfs sink             通道元件,即channel——可用file channel 也可以用記憶體channel                          配置檔案 directory-hdfs.conf             

               # 定義這個agent中各元件的名字
               a1.sources = r1
               a1.sinks = k1
               a1.channels = c1
               
               # 描述和配置source元件:r1
               a1.sources.r1.type = spooldir
               a1.sources.r1.spoolDir = /root/data/flumedata
               # 描述和配置sink元件:k1
               a1.sinks.k1.type = hdfs
               a1.sinks.k1.hdfs.path = /flumeday15/events
               a1.sinks.k1.hdfa.filePrefix = events-
               #每隔N s將臨時檔案滾動成一個目標檔案
               a1.sinks.k1.hdfs.rollInterval = 60
               #當檔案大小為1048576個位元組時,將檔案滾動成一個檔案
               a1.sinks.k1.hdfs.rollSize = 1048576
               #event數量達到該數量的時候,將臨時檔案滾動成目標檔案
               a1.sinks.k1.hdfs.rollCount = 500000
              
               # 描述和配置channel元件,此處使用是記憶體快取的方式
               a1.channels.c1.type = memory
               a1.channels.c1.capacity = 1000
               a1.channels.c1.transactionCapacity = 100
              
               # 描述和配置source  channel   sink之間的連線關係
               a1.sources.r1.channels = c1
               a1.sinks.k1.channel = c1

         啟動hadoop環境          另外建立一個資料夾 /root/data/flumedata   往裡邊新增檔案,檢視hdfs變化     3.採集檔案到kafka         採集需求:比如業務系統使用log4j生成的日誌,日誌內容不斷增加,需要把追加到日誌檔案中的資料實時採集到kafka                  根據需求,首先定義以下3大要素         採集源,即source——監控檔案內容更新 :  exec  ‘tail -F file’         下沉目標,即sink——kafka檔案系統  :  org.apache.flume.sink.kafka.KafkaSink         Source和sink之間的傳遞通道——channel,可用file channel 也可以用 記憶體channel                  配置檔案編寫(exec-kafka.conf)           

            # 定義這個agent中各元件的名字
            a1.sources = r1
            a1.sinks = k1
            a1.channels = c1

            # 描述和配置source元件:r1
            a1.sources.r1.type = exec
            a1.sources.r1.command = tail -F /opt/datas/tmp.log

            # 描述和配置sink元件:k1
            a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
            a1.sinks.k1.kafka.topic = flumeTopic
            a1.sinks.k1.kafka.bootstrap.servers = hadoop01:9092,hadoop02:9092,hadoop03:9092
            a1.sinks.k1.kafka.flumeBatchSize = 20
            a1.sinks.k1.kafka.producer.acks = 1
            a1.sinks.k1.kafka.producer.linger.ms = 1
            a1.sinks.ki.kafka.producer.compression.type = snappy


            # 描述和配置channel元件,此處使用是記憶體快取的方式
            a1.channels.c1.type = memory
            a1.channels.c1.capacity = 1000
            a1.channels.c1.transactionCapacity = 100

            # 描述和配置source  channel   sink之間的連線關係
            a1.sources.r1.channels = c1
            a1.sinks.k1.channel = c1

                 執行:啟動zookeeper               

          zkServer.sh start 

              啟動kafka                   

          /root/app/kafka_2.11-0.10.2.1/bin/kafka-server-start.sh -daemon 
/root/app/kafka_2.11-0.10.2.1/config/server.properties

              建立Topic                 

         /root/app/kafka_2.11-0.10.2.1/bin/kafka-topics.sh --create
         --zookeeper 192.168.1.3:2181,192.168.1.4:2181,192.168.1.5:2181
         --replication-factor 3 --partitions 3 --topic flumeTopic

              啟動flume    

                bin/flume-ng agent -c conf -f myconf/exec-kafka.conf -n a1 -Dflume.root.logger=INFO,console

              輸出日誌                 

                while true
                >do
                >echo `date` >> /opt/datas/tmp.log
                >sleep 0.5
                >done

              檢視kafka資料               

         /root/app/kafka_2.11-0.10.2.1/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic flumeTopic

    4.寫到磁碟中         # 定義這個agent中各元件的名字         

        a1.sources = r1
        a1.sinks = k1
        a1.channels = c1

        # 描述和配置source元件:r1
        a1.sources.r1.type = netcat
        a1.sources.r1.bind = bigdata01
        a1.sources.r1.port = 44444

        # 描述和配置sink元件:k1
        a1.sinks.k1.type = file_roll
        a1.sinks.k1.sink.directory = /opt/datas/flumelog


        # 描述和配置channel元件,此處使用是記憶體快取的方式
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 1000
        a1.channels.c1.transactionCapacity = 100

        # 描述和配置source  channel   sink之間的連線關係
        a1.sources.r1.channels = c1
        a1.sinks.k1.channel = c1

    5.Flume HA配置         hadoop01  -->| sink1 --->  hadopp02                      | sink2 --->  hadoop03         hadoop02宕機後 hadoop01 輸出到 hadoop03                  hadoop01  的 spooldir-avroMultiSink.conf                      

            # 定義這個agent中各元件的名字
            a1.sources = r1
            a1.sinks = k1 k2
            a1.channels = c1

            #set gruop
            a1.sinkgroups = g1

            # 描述和配置source元件:r1
            a1.sources.r1.type = spooldir
            a1.sources.r1.spoolDir = /root/data/flumedata
            a1.sources.r1.fileHeader = true


            # 描述和配置sink元件:k1
            # set sink1
            a1.sinks.k1.channel = c1
            a1.sinks.k1.type = avro
            a1.sinks.k1.hostname = hadoop02
            a1.sinks.k1.port = 4545

            # set sink2
            a1.sinks.k2.channel = c1
            a1.sinks.k2.type = avro
            a1.sinks.k2.hostname = hadoop03
            a1.sinks.k2.port = 4545

            #set sink group
            a1.sinkgroups.g1.sinks = k1 k2


            # 描述和配置channel元件,此處使用是記憶體快取的方式
            a1.channels.c1.type = memory
            a1.channels.c1.capacity = 1000
            a1.channels.c1.transactionCapacity = 100

            # 描述和配置source  channel   sink之間的連線關係
            a1.sources.r1.channels = c1

            #set failover
            a1.sinkgroups.g1.processor.type = failover
            a1.sinkgroups.g1.processor.priority.k1 = 10
            a1.sinkgroups.g1.processor.priority.k2 = 1
            a1.sinkgroups.g1.processor.maxpenalty = 10000

                 hadoop02  avro-logger.conf         

            # 定義這個agent中各元件的名字
            a1.sources = r1
            a1.sinks = k1
            a1.channels = c1

            # 描述和配置source元件:r1
            a1.sources.r1.type = avro
            a1.sources.r1.bind = hadoop02
            a1.sources.r1.port = 4545

            # 描述和配置sink元件:k1
            a1.sinks.k1.type = logger

            # 描述和配置channel元件,此處使用是記憶體快取的方式
            a1.channels.c1.type = memory
            a1.channels.c1.capacity = 1000
            a1.channels.c1.transactionCapacity = 100

            # 描述和配置source  channel   sink之間的連線關係
            a1.sources.r1.channels = c1
            a1.sinks.k1.channel = c

                     hadoop03  avro-logger.conf         

            # 定義這個agent中各元件的名字
            a1.sources = r1
            a1.sinks = k1
            a1.channels = c1

            # 描述和配置source元件:r1
            a1.sources.r1.type = avro
            a1.sources.r1.bind = hadoop03
            a1.sources.r1.port = 4545

            # 描述和配置sink元件:k1
            a1.sinks.k1.type = logger

            # 描述和配置channel元件,此處使用是記憶體快取的方式
            a1.channels.c1.type = memory
            a1.channels.c1.capacity = 1000
            a1.channels.c1.transactionCapacity = 100

            # 描述和配置source  channel   sink之間的連線關係
            a1.sources.r1.channels = c1
            a1.sinks.k1.channel = c

        啟動:         hadoop02 

            bin/flume-ng agent -c conf -f myconf/avro-logger.conf -n a1 -Dflume.root.logger=INFO,console

        hadoop03  

           bin/flume-ng agent -c conf -f myconf/avro-logger.conf -n a1 -Dflume.root.logger=INFO,console

        hadoop01

            bin/flume-ng agent -c conf -f myconf/spooldir-avroMultiSink.conf -n a1 -Dflume.root.logger=INFO,console