1. 程式人生 > >flume-ng收集windows日誌筆記

flume-ng收集windows日誌筆記

flume-ng收集windows下的日誌並且使用SPILLABLEMEMORY 方式,日誌實時收集使用spoolDirTailFile並且寫入到rabbitmq

1.下載spoolDirTailFile 用到的jar 地址https://github.com/ningg/flume-ng-extends-source

2.下載rabbitmq sink用到的 jar  地址https://github.com/aweber/rabbitmq-flume-plugin

3.配置 flume-ng-config

agent.channels = c1
agent.sources = spoolDirTailFile

agent.channels = c1
agent.channels.c1.type = SPILLABLEMEMORY
agent.channels.c1.memoryCapacity = 10000
agent.channels.c1.overflowCapacity = 1000000
agent.channels.c1.byteCapacity = 800000
agent.channels.c1.checkpointDir =C:\\log\\checkpoint
agent.channels.c1.dataDirs = C:\\log\\data


agent.sinks.k1.channel = c1
agent.sinks.k1.type =com.aweber.flume.sink.rabbitmq.RabbitMQSink
agent.sinks.k1.host = 主機地址
agent.sinks.k1.port = 5672
agent.sinks.k1.virtual-host =mq-host
agent.sinks.k1.username = 使用者名稱
agent.sinks.k1.password = 密碼
agent.sinks.k1.exchange = mq-佇列名稱
agent.sinks.k1.routing-key = mq-key
agent.sinks.k1.publisher-confirms = true


agent.sinks=k1


# Spooling dir and tail file Source 
agent.sources.spoolDirTailFile.type = com.github.ningg.flume.source.SpoolDirectoryTailFileSource
# on WIN plantform spoolDir should be format like: E:/program files/spoolDir
# Note: the value of spoolDir MUST NOT be surrounded by quotation marks.
agent.sources.spoolDirTailFile.spoolDir = C:\\log\\file
agent.sources.spoolDirTailFile.fileSuffix = .COMPLETED
agent.sources.spoolDirTailFile.deletePolicy = never 
agent.sources.spoolDirTailFile.ignorePattern = ^$
agent.sources.spoolDirTailFile.targetPattern = .*(\\d){10}.*
agent.sources.spoolDirTailFile.targetFilename = yyyyMMddhh
agent.sources.spoolDirTailFile.trackerDir = .flumespooltail
agent.sources.spoolDirTailFile.consumeOrder = oldest
agent.sources.spoolDirTailFile.batchSize = 100
agent.sources.spoolDirTailFile.inputCharset = UTF-8
agent.sources.spoolDirTailFile.decodeErrorPolicy = REPLACE
agent.sources.spoolDirTailFile.deserializer = LINE
agent.sources.spoolDirTailFile.channels= c1
agent.sources.spoolDirTailFile.fileHeader= true




以下是配置多個agent 供大家參考
agent2.channels = c2
agent2.sources = spoolDirTailFile2


agent2.channels.c2.type = memory

agent2.sinks.k2.channel = c2
agent2.sinks.k2.type = logger


agent2.sinks=k2




# Spooling dir and tail file Source 
agent2.sources.spoolDirTailFile2.type = com.github.ningg.flume.source.SpoolDirectoryTailFileSource
# on WIN plantform spoolDir should be format like: E:/program files/spoolDir
# Note: the value of spoolDir MUST NOT be surrounded by quotation marks.
agent2.sources.spoolDirTailFile2.spoolDir = C:\\log1
agent2.sources.spoolDirTailFile2.fileSuffix = .COMPLETED
agent2.sources.spoolDirTailFile2.deletePolicy = never 
agent2.sources.spoolDirTailFile2.ignorePattern = ^$
agent2.sources.spoolDirTailFile2.targetPattern = .*(\\d){4}-(\\d){2}-(\\d){2}.*
agent2.sources.spoolDirTailFile2.targetFilename = yyyy-MM-dd
agent2.sources.spoolDirTailFile2.trackerDir = .flumespooltail
agent2.sources.spoolDirTailFile2.consumeOrder = oldest
agent2.sources.spoolDirTailFile2.batchSize = 100
agent2.sources.spoolDirTailFile2.inputCharset = UTF-8
agent2.sources.spoolDirTailFile2.decodeErrorPolicy = REPLACE
agent2.sources.spoolDirTailFile2.deserializer = LINE
agent2.sources.spoolDirTailFile2.channels= c2
agent2.sources.spoolDirTailFile2.fileHeader= true









4.新建bat,windows下啟動和linux下使用flume-ng 稍有不同

set FLUME_HOME=C:\apache-flume-1.6.0-bin
set JAVA="%JAVA_HOME%\bin\java.exe"
set JAVA_OPTS=-Xmx512m
set CONF=%FLUME_HOME%\conf\flume-conf.properties
set AGENT=agent
%JAVA%  %JAVA_OPTS%  -Dlog4j.configuration=file:\\\%FLUME_HOME%\conf\log4j.properties -cp "%FLUME_HOME%\lib\*" org.apache.flume.node.Application -f %FLUME_HOME%\conf\flume-conf.properties -n   agent
:: %JAVA%  %JAVA_OPTS%  -Dlog4j.configuration=file:\\\%FLUME_HOME%\conf\log4j.properties -cp "%FLUME_HOME%\lib\*" org.apache.flume.node.Application -f %FLUME_HOME%\conf\flume-conf.properties -n   agent2

最後附上一些蒐集的使用心得:

一、關於Source:

  1、spool-source:適合靜態檔案,即檔案本身不是動態變化的;

  2、avro source可以適當提高執行緒數量來提高此source效能;

  3、ThriftSource在使用時有個問題需要注意,使用批量操作時出現異常並不會列印異常內容而是'Thrift source %s could not append events to the channel.',這是因為原始碼中在出現異常時,它並未捕獲異常而是獲取元件名稱,這是原始碼中的一個bug,也可以說明thrift很少有人用,否則這個問題也不會存在在很多版本中;

  4、如果一個source對應多個channel

,預設就是每個channel是同樣的一份資料,會把這批資料複製N份傳送到N個channel中,所以如果某個channel滿了會影響整體的速度的哦;

  5、ExecSource官方文件已經說明是非同步的,可能會丟資料哦,儘量使用tail -F,注意是大寫的;

二、關於Channel

  1、採集節點建議使用新的複合型別的SpillableMemoryChannel,彙總節點建議採用memory channel,具體還要看實際的資料量,一般每分鐘資料量超過120MB大小的flume agent都建議用memory channel(自己測的file channel處理速率大概是2M/s,不同機器、不同環境可能不同,這裡只提供參考),因為一旦此agent的channel出現溢位情況,將會導致大多數時間處於file channel(SpillableMemoryChannel本身是file channel的一個子類,而且複合channel會保證一定的event的順序的使得讀完記憶體中的資料後,再需要把溢位的拿走,可能這時記憶體已滿又會溢位。。。),效能大大降低,彙總一旦成為這樣後果可想而知;

  2、調整memory 佔用實體記憶體空間,需要兩個引數byteCapacityBufferPercentage(預設是20)和byteCapacity(預設是JVM最大可用記憶體的0.8)來控制,計算公式是:byteCapacity = (int)((context.getLong('byteCapacity', defaultByteCapacity).longValue() * (1 - byteCapacityBufferPercentage * .01 )) /byteCapacitySlotSize),很明顯可以調節這兩個引數來控制,至於byteCapacitySlotSize預設是100,將實體記憶體轉換成槽(slot)數,這樣易於管理,但是可能會浪費空間,至少我是這樣想的。。。;

  3、還有一個有用的引數'keep-alive'這個引數用來控制channel滿時影響source的傳送,channel空時影響sink的消費,就是等待時間,預設是3s,超過這個時間就甩異常,一般不需配置,但是有些情況很有用,比如你得場景是每分鐘開頭集中發一次資料,這時每分鐘的開頭量可能比較大,後面會越來越小,這時你可以調大這個引數,不至於出現channel滿了得情況;

三、關於Sink:

  1、avro sink的batch-size可以設定大一點,預設是100,增大會減少RPC次數,提高效能;

  2、內建hdfs sink的解析時間戳來設定目錄或者檔案字首非常損耗效能,因為是基於正則來匹配的,可以通過修改原始碼來替換解析時間功能來極大提升效能,稍後我會寫一篇文章來專門說明這個問題;

  3、RollingFileSink檔名不能自定義,而且不能定時滾動檔案,只能按時間間隔滾動,可以自己定義sink,來做定時寫檔案;

  4、hdfs sink的檔名中的時間戳部分不能省去,可增加字首、字尾以及正在寫的檔案的前後綴等資訊;'hdfs.idleTimeout'這個引數很有意義,指的是正在寫的hdfs檔案多長時間不更新就關閉檔案,建議都配置上,比如你設定瞭解析時間戳存不同的目錄、檔名,而且rollInterval=0、rollCount=0、rollSize=1000000,如果這個時間內的資料量達不到rollSize的要求而且後續的寫入新的檔案中了,就是一直開啟,類似情景不注意的話可能很多;'hdfs.callTimeout'這個引數指的是每個hdfs操作(讀、寫、開啟、關閉等)規定的最長操作時間,每個操作都會放入'hdfs.threadsPoolSize'指定的執行緒池中得一個執行緒來操作;

  5、關於HBase sink(非非同步hbase sink:AsyncHBaseSink),rowkey不能自定義,而且一個serializer只能寫一列,一個serializer按正則匹配多個列,效能可能存在問題,建議自己根據需求寫一個hbase sink;

  6、avro sink可以配置failover和loadbalance,所用的元件和sinkgroup中的是一樣的,而且也可以在此配置壓縮選項,需要在avro source中配置解壓縮;

四、關於SinkGroup:

  1、不管是loadbalance或者是failover的多個sink需要共用一個channel

  2、loadbalance的多個sink如果都是直接輸出到同一種裝置,比如都是hdfs,效能並不會有明顯增加,因為sinkgroup是單執行緒的它的process方法會輪流呼叫每個sink去channel中take資料,並確保處理正確,使得是順序操作的,但是如果是傳送到下一級的flume agent就不一樣了,take操作是順序的,但是下一級agent的寫入操作是並行的,所以肯定是快的;

  3、其實用loadbalance在一定意義上可以起到failover的作用,生產環境量大建議loadbalance;

五、關於監控monitor:

  1、監控我這邊做得還是比較少的,但是目前已知的有以下幾種吧:cloudera manager(前提是你得安裝CDH版本)、ganglia(這個天生就是支援的)、http(其實就是將統計資訊jmx資訊,封裝成json串,使用jetty展示在瀏覽器中而已)、再一個就是自己實現收集監控資訊,自己做(可以收集http的資訊或者自己實現相應的介面實現自己的邏輯,具體可以參考我以前的部落格);

  2、簡單說一下cloudera manager這種監控,最近在使用,確實很強大,可以檢視實時的channel進出資料速率、channel實時容量、sink的出速率、source的入速率等等,圖形化的東西確實很豐富很直觀,可以提供很多flume agent整體執行情況的資訊和潛在的一些資訊;

六、關於flume啟動:

  1、flume元件啟動順序:channels——>sinks——>sources,關閉順序:sources——>sinks——>channels;

  2、自動載入配置檔案功能,會先關閉所有元件,再重啟所有元件;

  3、關於AbstractConfigurationProvider中的Map<Class<? extends Channel>, Map<String, Channel>> channelCache這個物件,始終儲存著agent中得所有channel物件,因為在動態載入時,channel中可能還有未消費完的資料,但是需要對channel重新配置,所以用以來快取channel物件的所有資料及配置資訊;

  4、通過在啟動命令中新增'no-reload-conf'引數為true來取消自動載入配置檔案功能;

七、關於interceptor:

  請看我的關於這個元件的部落格,傳送門;

八、關於自定義元件:sink、source、channel

  1、channel不建議自定義哦,這個要求比較高,其他倆都是框架式的開發,往指定的方法填充自己配置、啟動、關閉、業務邏輯即可,以後有機會單獨寫一篇文章來介紹;

  2、關於自定義元件請相信github,上面好多好多好多,可以直接用的自定義元件....;