1. 程式人生 > >Flume概述和簡單實例

Flume概述和簡單實例

可用 日誌采集 解壓 mar vid except null lose provider

Flume概述

Flume是一個分布式、可靠、和高可用的海量日誌采集、聚合和傳輸的系統。支持在日誌系統中定制各類數據發送方,用於收集數據;同時,Flume提供對數據進行簡單處理,並寫到各種數據接受方(比如文本、HDFS、Hbase等)的能力 。

Flume主要由3個重要的組件購成:

  • Source:完成對日誌數據的收集,分成transtion 和 event 打入到channel之中。
  • Channel:主要提供一個隊列的功能,對source提供中的數據進行簡單的緩存。
  • Sink:取出Channel中的數據,進行相應的存儲文件系統,數據庫,或者提交到遠程服務器。

對現有程序改動最小的使用方式是使用是直接讀取程序原來記錄的日誌文件,基本可以實現無縫接入,不需要對現有程序進行任何改動。
對於直接讀取文件Source,有兩種方式:

  • ExecSource:以運行Linux命令的方式,持續的輸出最新的數據,如tail -F 文件名指令,在這種方式下,取的文件名必須是指定的。
  • SpoolSource:是監測配置的目錄下新增的文件,並將文件中的數據讀取出來。
    需要註意兩點:
  1. 拷貝到spool目錄下的文件不可以再打開編輯。

  2. spool目錄下不可包含相應的子目錄。

在實際使用的過程中,可以結合log4j使用,使用log4j的時候,將log4j的文件分割機制設為1分鐘一次,將文件拷貝到spool的監控目錄。log4j有一個TimeRolling的插件,可以把log4j分割的文件到spool目錄。基本實現了實時的監控。
Flume在傳完文件之後,將會修改文件的後綴,變為.COMPLETED(後綴也可以在配置文件中靈活指定)
ExecSource,SpoolSource對比:
ExecSource可以實現對日誌的實時收集,但是存在Flume不運行或者指令執行出錯時,將無法收集到日誌數據,無法何證日誌數據的完整性。SpoolSource雖然無法實現實時的收集數據,但是可以使用以分鐘的方式分割文件,趨近於實時。如果應用無法實現以分鐘切割日誌文件的話,可以兩種收集方式結合使用。

Channel有多種方式:
有MemoryChannel,JDBC Channel,MemoryRecoverChannel,FileChannel。MemoryChannel可以實現高速的吞吐,但是無法保證數據的完整性。MemoryRecoverChannel在官方文檔的建議上已經建義使用FileChannel來替換。FileChannel保證數據的完整性與一致性。在具體配置不現的FileChannel時,建議FileChannel設置的目錄和程序日誌文件保存的目錄設成不同的磁盤,以便提高效率。

Sink在設置存儲數據時,可以向文件系統中,數據庫中,hadoop中儲數據,在日誌數據較少時,可以將數據存儲在文件系中,並且設定一定的時間間隔保存數據。在日誌數據較多時,可以將相應的日誌數據存儲到Hadoop中,便於日後進行相應的數據分析。

flume安裝配置

flume安裝配置比較簡單,下載flume1.5.0二進制包 http://www.apache.org/dyn/closer.cgi/flume/1.5.0/apache-flume-1.5.0-bin.tar.gz
解壓即可 tar -zvxf apache-flume-1.5.0-bin.tar.gz

簡單實例

進入flume目錄,新建example.conf

# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = echo ‘hello‘
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

啟動flume: bin/flume-ng agent --f example.conf --name a1 -Dflume.root.logger=INFO,console

輸出日誌:

14/06/19 18:16:29 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
14/06/19 18:16:29 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:example.conf
14/06/19 18:16:29 INFO conf.FlumeConfiguration: Added sinks: k1 Agent: a1
14/06/19 18:16:29 INFO conf.FlumeConfiguration: Processing:k1
14/06/19 18:16:29 INFO conf.FlumeConfiguration: Processing:k1
14/06/19 18:16:29 WARN conf.FlumeConfiguration: Invalid property specified: conf
14/06/19 18:16:29 WARN conf.FlumeConfiguration: Configuration property ignored: mple.conf = A single-node Flume configuration
14/06/19 18:16:29 WARN conf.FlumeConfiguration: Agent configuration for mple‘ does not contain any channels. Marking it as invalid.
14/06/19 18:16:29 WARN conf.FlumeConfiguration: Agent configuration invalid for agent mple‘. It will be removed.
14/06/19 18:16:29 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [a1]
14/06/19 18:16:29 INFO node.AbstractConfigurationProvider: Creating channels
14/06/19 18:16:29 INFO channel.DefaultChannelFactory: Creating instance of channel c1 type memory
14/06/19 18:16:29 INFO node.AbstractConfigurationProvider: Created channel c1
14/06/19 18:16:29 INFO source.DefaultSourceFactory: Creating instance of source r1, type exec
14/06/19 18:16:29 INFO sink.DefaultSinkFactory: Creating instance of sink: k1, type: logger
14/06/19 18:16:29 INFO node.AbstractConfigurationProvider: Channel c1 connected to [r1, k1]
14/06/19 18:16:29 INFO node.Application: Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner: { source:org.apache.flume.source.ExecSource{name:r1,state:IDLE} }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@1730d54 counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
14/06/19 18:16:29 INFO node.Application: Starting Channel c1
14/06/19 18:16:29 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
14/06/19 18:16:29 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
14/06/19 18:16:29 INFO node.Application: Starting Sink k1
14/06/19 18:16:29 INFO node.Application: Starting Source r1
14/06/19 18:16:29 INFO source.ExecSource: Exec source starting with command:echo hello‘
14/06/19 18:16:29 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
14/06/19 18:16:29 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
14/06/19 18:16:29 INFO source.ExecSource: Command [echo ‘hello‘] exited with 0
14/06/19 18:16:29 INFO sink.LoggerSink: Event: { headers:{} body: 27 68 65 6C 6C 6F 27                            hello‘ }

參考文檔

Flume(ng) 自定義sink實現和屬性註入

問題導讀:
1.如何實現flume端自定一個sink,來按照我們的規則來保存日誌?
2.想從flume的配置文件中獲取rootPath的值,該如何配置?

最近需要利用flume來做收集遠端日誌,所以學習一些flume最基本的用法。這裏僅作記錄。

遠端日誌收集的整體思路是遠端自定義實現log4j的appender把消息發送到flume端,flume端自定義實現一個sink來按照我們的規則保存日誌。

自定義Sink代碼:

public class LocalFileLogSink extends AbstractSink implements Configurable {
     private static final Logger logger = LoggerFactory
              . getLogger(LocalFileLogSink .class );
            private static final String PROP_KEY_ROOTPATH = "rootPath";
      private String rootPath;
     @Override
     public void configure(Context context) {
          String rootPath = context.getString(PROP_KEY_ROOTPATH );
          setRootPath(rootPath);
     }
           
          @Override
          public Status process() throws EventDeliveryException {
           logger .debug("Do process" );
       }
}

實現Configurable接口,即可在初始化時,通過configure方法從context中獲取配置的參數的值。這裏,我們是想從flume的配置文件中獲取rootPath的值,也就是日誌保存的根路徑。在flume-conf.properties中配置如下:
agent.sinks = loggerSink
agent.sinks.loggerSink.rootPath = ./logs
復制代碼

loggerSink是自定義sink的名稱,我們取值時的key,只需要loggerSink後面的部分即可,即這裏的rootPath。

實際業務邏輯的執行,是通過繼承復寫AbstractSink中的process方法實現。從基類的getChannel方法中獲取信道,從中取出Event處理即可。

Channel ch = getChannel();
           Transaction txn = ch.getTransaction();
         txn.begin();
          try {
              logger .debug("Get event." );
             Event event = ch.take();
             txn.commit();
             status = Status. READY ;
             return status;
                   }finally {
             Log. info( "trx close.");
             txn.close();
         }

Flume概述和簡單實例