大資料學習之Flume篇——未完待續
Flume作為現在最常用的日誌收集工具之一,目前已經更新到了1.8.0版本,我們以最新版本開始進行我們的學習。整個學習過程以官方文件為主,輔助是參考了SteveHoffman編寫的《Flume日誌收集與MapReduce模式》一書。
首先要明確幾個定義: Source、Channel、Sink
Source:源;表示我們收集日誌的資料來源
Channel:通道;表示資料傳輸過程中途徑的通道,我們可以在源到Sink中間進行一些處理操作
Sink:目的地;表示我們需要最終要將收集到的日誌輸出到的地方
Flume官方支援的幾種常見Source的實現:(這裡面我只詳細寫了我應用過/測試過的Source型別)
- Avro Source:Avro Netty RPC event source
- exec Source:Execute a long-lived Unix process and read from stdout
- Thrift Source
- JMS Source
- Spooling Directory Source:
- Taildir Source
- Twitter 1% firehose Source
- Kafka Source
- NetCat TCP Source
- NetCat UDP Source
- Sequence Generator Source
- Syslog Sources
- HTTP Source
- Stress Source
- Legacy Sources
- Custom Source
- Scribe Source
Flume官方支援的幾種常見Sink的實現:
- Flume Sinks
- HDFS Sink
- Hive Sink
- Logger Sink
- Avro Sink
- Thrift Sink
- IRC Sink
- File Roll Sink
- Null Sink
- HBaseSinks
- HBaseSink
- AsyncHBaseSink
- MorphlineSolrSink
- ElasticSearchSink
- Kite Dataset Sink
- Kafka Sink
- HTTP Sink
- Custom Sink
Flume官方支援的Channel的實現:
- Flume Channels
- Memory Channel
- JDBC Channel
- Kafka Channel
- File Channel
- Spillable Memory Channel
- Pseudo Transaction Channel
- Custom Channel
我用過的幾種組合:
- Spooldir / Exec / Avro Source + Memory Channel + Kafka Sink
瞭解Flume基本的組成之後,我以實際的業務場景為例來講述最終如何落地實踐,(關於Flume運維的知識我們放在最後講)我們現在就以單點為基礎,目的是讓大家快速的瞭解、實踐。
專案背景: 目前公司整體架構採用的微服務架構、分散式部署,各個業務由多個服務組成,由於業務的不斷迭代和複雜性日益加深,在各個服務呼叫的邏輯上越來越難梳理,排查問題也越來越困難(有人會說,不是可以做分散式跟蹤嗎?這個應該是微服務架構生態下所具備的一個功能啊),但是由於企業的快速發展,這部分建設並不完善,也就是沒有對應的平臺來支援,同時類似於阿里系的APM應用級的監控,也並不能完全解決我們日常迭代中的需要,因為們可能需要的時候更多的是業務日誌,也就是服務內部處理過程中對資料操作的日誌,這部分需要開發同學自主的去記錄。那麼我們現在就需要這樣一個平臺/環境來對這部分日誌進行收集、清洗、展示,來輔助我們開發同學定位、排查問題,幫助測試同學提升測試效率,做到儘可能的全面覆蓋。
技術選型:在方案調研期間,曾經考慮過多種,結合實際場景分析,期望對業務日誌直接使用,並且儘可能的減少對RD程式碼的修改工作。現有的日誌記錄分為兩種:1.log4j 2.IO寫檔案(不要追問為什麼…遺留問題…);於是各種方案應運而生:
- Flume基於檔案目錄的模式對日誌檔案進行收集
- 使用Exec + tail 模式對日誌檔案監聽,實時收集
- 使用log4j2直接分發至Flume的Avro源,實時收集
下面分析一下為什麼選擇了這幾種方案: (最終沒有采用Flume,後面會講,當前只是結合講解Flume的使用)
第一種方案是最直接的也就是不用動腦子的,也是最容易的;
參考配置:
第一種方案分析:此時Flume會監控指定的目錄檔案,也就是說當有日誌檔案產生的時候,Flume就會讀取資料,並在完成時修改檔案狀態或者刪除(加標識表示已經處理完成),但是這樣會影響真實業務的資料日誌記錄,如果說是隔日提取,這種方案是可以的,也就是說提取的是前一天或者N天的資料,原因是我們一般都是按天+檔案大小去記錄日誌的。所以這種方案想做實時被Pass了。
第二種方案是在第一種方案失敗後想到的,因為想做實時,那麼就用了tail -f的模式:
參考配置:
第二種方案分析:雖然表面上完成了實時日誌的收集,但是存在著大量的隱患工作,我們在查閱官方文件時方向,Flume已經摒棄了這種tail模式,從而衍生出了exec + tail 模式,但是這樣依舊會存線上程後臺存活的問題,也就是說當tail -f這個執行緒出現異常或者Flume代理關閉或者重啟時,派生出來的程序不能保證100%關閉,就會產生永不退出的孤立tail程序,那麼其佔用的系統資源就一直無法被釋放,根據定義,tail -f是沒有結束的,即使是刪掉了被tail的檔案,執行中的tail程序也會一直開啟該檔案控制代碼,直至系統資源被耗盡。在官方文件中,明確的指出:非常不建議使用該模式,可能會引發未知的災難。
第三種方案分析: 能否讓日誌直接寫入Flume的Agent呢?這樣就可以解決實時的問題,又不會帶來額外的風險。答案是可以的,Log4J2已經支援了這種模式,此時就需要在log4j2上做了配置了,這種方案的利弊又是如何呢?
log4j2參考配置如下:
A sample FlumeAppender configuration that is configured with a primary and a secondary agent using Flume configuration properties, compresses the body, formats the body using RFC5424Layout and passes the events to an embedded Flume Agent.
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="error" name="MyApp" packages="">
<Appenders>
<Flume name="eventLogger" compress="true" type="Embedded">
<Property name="channels">file</Property>
<Property name="channels.file.type">file</Property>
<Property name="channels.file.checkpointDir">target/file-channel/checkpoint</Property>
<Property name="channels.file.dataDirs">target/file-channel/data</Property>
<Property name="sinks">agent1 agent2</Property>
<Property name="sinks.agent1.channel">file</Property>
<Property name="sinks.agent1.type">avro</Property>
<Property name="sinks.agent1.hostname">192.168.10.101</Property>
<Property name="sinks.agent1.port">8800</Property>
<Property name="sinks.agent1.batch-size">100</Property>
<Property name="sinks.agent2.channel">file</Property>
<Property name="sinks.agent2.type">avro</Property>
<Property name="sinks.agent2.hostname">192.168.10.102</Property>
<Property name="sinks.agent2.port">8800</Property>
<Property name="sinks.agent2.batch-size">100</Property>
<Property name="sinkgroups">group1</Property>
<Property name="sinkgroups.group1.sinks">agent1 agent2</Property>
<Property name="sinkgroups.group1.processor.type">failover</Property>
<Property name="sinkgroups.group1.processor.priority.agent1">10</Property>
<Property name="sinkgroups.group1.processor.priority.agent2">5</Property>
<RFC5424Layout enterpriseNumber="18060" includeMDC="true" appName="MyApp"/>
</Flume>
<Console name="STDOUT">
<PatternLayout pattern="%d [%p] %c %m%n"/>
</Console>
</Appenders>
<Loggers>
<Logger name="EventLogger" level="info">
<AppenderRef ref="eventLogger"/>
</Logger>
<Root level="warn">
<AppenderRef ref="STDOUT"/>
</Root>
</Loggers>
</Configuration>
上述的三種配置Channel都是基於記憶體的模式,這是為了保證傳輸速度,當然現在固態硬碟的速度已經很快了,用檔案的模式也是可以的,而我在使用的時候,由於偷懶,只使用了記憶體的通道。
相關推薦
大資料學習之Flume篇——未完待續
Flume作為現在最常用的日誌收集工具之一,目前已經更新到了1.8.0版本,我們以最新版本開始進行我們的學習。整個學習過程以官方文件為主,輔助是參考了SteveHoffman編寫的《Flume日誌收集與MapReduce模式》一書。 首先要明確幾個定義: Sou
大資料學習之小白如何學大資料?(詳細篇)
大資料這個話題熱度一直高居不下,不僅是國家政策的扶持,也是科技順應時代的發展。想要學習大資料,我們該怎麼做呢?大資料學習路線是什麼?先帶大家瞭解一下大資料的特徵以及發展方向。 大資料的三個發展方向,平臺搭建/優化/運維/監控、大資料開發/設計/架構、資料分析/挖掘。 先說一下大資料的4V特徵: 資料
19.大資料學習之旅——flume介紹
flume介紹 概述 Flume最早是Cloudera提供的日誌收集系統,後貢獻給Apache。所以目前是Apache下的專案,Flume支援在日誌 系統中定製各類資料傳送方,用於收集資料。 Flume是一個高可用的,高可靠的魯棒性(robust 健壯性),分散式的海量日誌採集、聚合
大資料學習筆記——Java篇之集合框架(ArrayList)
Java集合框架學習筆記 1. Java集合框架中各介面或子類的繼承以及實現關係圖: 2. 陣列和集合類的區別整理: 陣列: 1. 長度是固定的 2. 既可以存放基本資料型別又可以存放引用資料型別 3. 存放進陣列的必須是相同型別的資料 VS 集合類: 1. 長度是可變的 2. 只能存放物件的
大資料學習之SPARK計算天下
學習大資料技術,SPARK無疑是繞不過去的技術之一,它的重要性不言而喻,本文將通過提問的形式圍繞著SPARK進行介紹,希望對大家有幫助,與此同時,感謝為本文提供素材的科多大資料的武老師。 為了輔助大家更好去了解大資料技術,本文集中討論Spark的一系列技術問題,大家在學習過程中如果遇到困難,可以
大資料學習之HDP SANDBOX開始學習
大資料學習之HDP SANDBOX開始學習 2017年05月07日 17:33:45 三名狂客 閱讀數:2167 HDP HDP是什麼? HDP全稱叫做Hortonworks Data Platform。 Hortonworks資料平臺是一款基於Apa
大資料學習之路87-SparkSQL的執行結果以不同方式寫出,及載入
我們可以將我們之前寫的wordcount的結果寫成各種格式: csv格式: 程式碼如下: package com.test.SparkSQL import org.apache.avro.generic.GenericData.StringType import org.apach
大資料學習之路91-Hadoop的高可用
我們之前一直沒有配置過hadoop的高可用,今天我們就來配置一下 之前我們的namenode只要一掛,則整個hdfs叢集就完蛋。雖然我們可以通過重啟的方式來恢復,可是我們重啟好之前,我們的hdfs叢集就不能提供服務了。所以它存在單點故障問題。 我們可以設定兩臺namenode ,一臺為a
大資料學習之路90-sparkSQL自定義聚合函式UDAF
什麼是UDAF?就是輸入N行得到一個結果,屬於聚合類的。 接下來我們就寫一個求幾何平均數的一個自定義聚合函式的例子 我們從開頭寫起,先來看看需要進行計算的數如何產生: package com.test.SparkSQL import java.lang import org
大資料學習之路89-sparkSQL自定義函式計算ip歸屬地
使用sparkSQL當遇到業務邏輯相關的時候,就有可能會搞不定。因為業務l邏輯需要寫很多程式碼,呼叫很多介面。這個時候sql就搞不定了。那麼這個時候我們就會想能不能將業務邏輯嵌入到sql中? 這種就類似於我們在hive中使用過的自定義函式UDF(user define function使用者
大資料學習之路95-SparkStreaming寫WordCount
程式如下: package com.test.sparkStreaming import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming
大資料學習之路94-kafka叢集安裝
解壓 Kafka 安裝包 修改配置檔案 config/server.properties vi server.properties broker.id=0 //為依次增長的:0、1、2、3、4,叢集中唯一id log.dirs=/kafkaData/logs // Kafka
大資料學習之路93-kafka簡介
kafka是實時計算中用來做資料收集的,它是一個訊息佇列。它使用scala開發的。 那麼我們就會想我們這裡能不能用hdfs做資料儲存呢?它是分散式的,高可用的。 但是它還缺少一些重要的功能:比如說我們往hdfs中寫資料,之後我們需要實時的讀取。當我們讀到某一行的時候斷掉了,假如說這個讀取
大資料學習之路92-sparkSQL整合hive
我們知道sparkSQL跟hive是相容的,他支援hive的元資料庫,sql語法,多種型別的UDF, 而且還支援hive的序列化和反序列化方式,意思就是hive寫的自定義函式,spark拿過來就能用。 最重要的就是MetaStore元資料庫,以後一旦我們使用hive的Meta
大資料學習之路98-Zookeeper管理Kafka的OffSet
我們之前的OffSet都是交給broker自己管理的,現在我們希望自己管理。 我們可以通過zookeeper進行管理。 我們在程式中想要使用zookeeper,那麼就肯定會有api允許我們操作。 new ZKGroupTopicDirs() 注意:這裡使用客戶端的時候導包為:
大資料學習之路97-kafka直連方式(spark streaming 整合kafka 0.10版本)
我們之前SparkStreaming整合Kafka的時候用的是傻瓜式的方式-----createStream,但是這種方式的效率很低。而且在kafka 0.10版本之後就不再提供了。 接下來我們使用Kafka直連的方式,這種方式其實是呼叫Kafka底層的消費資料的API,我們知道,越底層的東
大資料學習之路96-SparkStreaming整合Kafka
我們前面SparkStreaming獲取資料的來源是TCP,但是平常是不會這麼用的,我們通常用的是Kafka。 SparkStreamingContext是不直接提供對Kafka的訪問的。 這個時候就有KafkaUtils 這裡有兩個方法 1.createDirectStream
大資料學習之路103-redis的分片代理
哨兵的出現是為了實現主節點的HA,那麼從節點會不會出現問題呢? 假如所有的讀取操作都在從節點6380上,那麼6380節點就會很累,而6381節點就會很清閒。 這個時候就需要負載均衡,我們這裡的負載均衡需要通過代理伺服器來實現。我們可以將需要訪問的從節點的位置配置在代理伺服器上。
大資料學習之路102-redis的哨兵機制
哨兵的HA會通過修改配置檔案來實現主節點的切換。 只有主節點有寫許可權,從節點只能讀。 我們接下來實驗一下: 首先我們將redis的三個埠的服務啟動起來: 然後我們啟動哨兵: 接下來我們強制將主節點關掉,看哨兵會做什麼事? 我們可以看到此時的
大資料學習之路101-redis的持久化詳解及主從複製
接下來我們配置一下主從結構的星型模型: 首先將配置檔案複製3份, 然後修改主節點的配置檔案: 首先關閉RDB: 然後關閉AOF: 修改第二個配置檔案: 先修改埠,他不能和主節點的埠衝突: 為了區分是哪個節點打的日誌,我們還需要