實時流計算---資料採集工具Flume
Flume最早是Cloudera提供的一個高可用的,高可靠的,分散式的海量日誌採集、聚合和傳輸的系統。
Flume特性
1.提供上下文路由特徵
2.Flume的管道是基於事務,保證了資料在傳送和接收時的一致性
3.Flume是可靠的,容錯性高的,可升級的,易管理的,並且可定製的
4.Flume可用將應用產生的資料儲存到任何集中儲存器中,比如HDFS,HBase
5.可以被水平擴充套件
6.當收集資料的速度超過將寫入資料的時候,也就是當收集資訊遇到峰值時,這時候收集的資訊非常大,甚至超過了系統的寫入資料能力,這時候,Flume會在資料生產者和資料收容器間做出調整,保證其能夠在兩者之間提供平穩的資料
Flume核心概念
-
agent
flume最核心的角色就是agent。flume日誌採集系統是由一個個agent連線起來的資料傳輸通道
對於每個agent來說就是一個獨立的守護程序(JVM)。它負責從資料來源接收資料,併發送到下一個目的地。
agent內部有三個重要的元件:source、channel、sink -
source
從資料發生器接收資料,並將接收的資料以Event的形式傳遞給一個或多個channel,Flume提供多種資料接收方式,比如Avro,Thrift等。 -
channel
channel是一種短暫的儲存容器,它從source處接收到event格式資料後進行快取,直到被消費掉。
它在source和sink之間起到了橋樑作用,channel是一個完整的事務,這一點保障了資料在收發時的一致性,並且可以和任意數量的source和sink連線。
支援的型別有:JDBC channel,FileSystem Channel, Memory channel等。 -
sink
sink將資料儲存到集中儲存器比如Hbase和HDFS,它從channels消費資料(events)並將其傳遞給目標地,目標地可能是另一個sink,也可能HDFS,HBase -
Event
資料在flume內部是以Event封裝的形式存在的。因此source元件在獲取到原始資料後,需要封裝成Event後傳送到channel中,然後sink從channel取出Event後,根據配置要求再轉成其他的形式進行資料輸出。
Event封裝的物件主要有兩部分:Headers和Body
Headers是一個集合Map型別,用於儲存元資料(如標誌、描述等)
Body就是一個位元組陣列,裝載具體的資料內容 -
transaction
Flume的事務機制,類似於資料庫的事務機制
Flume使用獨立的事務分別從source到channel,以及從channel到sink的event傳遞。
注意:在任何時刻,Event至少在一個Channel是完整有效的 -
Interceptor
攔截器,攔截工作在source元件之後,source產生的event會被傳入的攔截器根據需要進行攔截處理。
攔截器可以組成攔截器鏈。
Flume元件詳解
Source
Source | Desc |
---|---|
Avor Source | 通過監聽一個網路埠來接受資料,而且接受的資料必須是使用Avor序列化框架序列化後的資料。 |
Thrift Source | 監聽Thrift埠並從外部Thrift客戶端流接收事件 |
Exec Source | 啟動一個使用者所指定的linux shell命令,採集這個Linux shell命令的標準輸出,作出收集到的資料,轉為event寫入channel |
JMS Source | 從JMS目標(例如佇列或主題)讀取訊息;作為JMS應用程式,它應可與任何JMS提供程式一起使用,但僅經過ActiveMQ的測試;注意,應該使用plugins.d目錄(首選),命令列上的-classpath或通過flume-env.sh中的FLUME_CLASSPATH變數將提供的JMS jar包含在Flume類路徑中。 |
Spooling Directory Source | 監視一個指定的資料夾,如果資料夾下有沒采集過的新檔案,則將這些新檔案中的資料採集,並轉成event寫入channel。(注意:spooling目錄中的檔案必須是不可變的,而且是不能重名的!否則,source會loudly fail !) |
Taildir Source | 監視指定目錄下的一批檔案,只要某個檔案中有新寫入的行,則會被tail到;它會記錄每一個檔案所tail到的位置,記錄到一個指定的positionfile儲存目錄中,格式為json(如果需要的時候,可以人為修改,就可以讓source從任意指定的位置開始讀取資料);它對採集完成的檔案,不會做任何修改。(公司專案採用的Taildir Source) |
Kafka Source | 就是用Kafka Consumer連線Kafka,讀取資料,然後轉換成event,寫入channel |
NetCat Source | 啟動一個socket服務,監聽一個埠,將埠上收到的資料,轉成event寫入channel |
Sequence Generator Source | 一個簡單的序列生成器,它使用從0開始,遞增1並在totalEvents處停止的計數器連續生成事件;當無法傳送event到channel時會進行重試。通常用於測試。 |
Syslog Sources | 讀取系統日誌資料生成event |
Http Source | 通過http post/get來接收資料,通常get用於測試,該source基於Jetty9.4,並提供了設定其他特定於Jetty的引數的功能,這些引數將直接傳遞給Jetty元件 |
Stress Source | 主要用於壓測,用於可以配置要發生的event總數以及要傳送成功event的最大數 |
Custom Source | 自定義Source |
taildir Source | 監聽指定目錄的一批檔案,只要某個檔案被寫入,那麼就會被tail到。這裡原理其實就是source會記錄每個檔案所讀取到的位置,然後記錄到一個指定的positionfile目錄檔案中,通常為json格式,而且是可見的,因此可以人為修改。由於該種機制,可以實現從任意指定位置讀取資料,所以這個source是可以保障可靠性的。但是會有資料重複的問題。 |
Channel
Channel | Desc |
---|---|
Memory Channel | event儲存在記憶體中,且可以配置最大值。對於需要高吞吐而且可以容忍資料丟失的情況下,可以選擇該channel |
JDBC Channel | event被持久到資料庫中,目前支援derby.適用於可恢復的場景 |
Kafka Channel | agent利用Kafka作為channel資料快取,Kafka Channel要跟Kafka Source,Kafka sink區別開來,Kafka Cannel在應用時,可以沒有source |
File Channel | event被快取在本地磁碟檔案中,可靠性高,不會丟失;但在極端情況下可能會重複資料 |
Spillable Memory Channel | event儲存在記憶體和磁碟上。記憶體充當主儲存,磁碟充當溢位 |
Sink
Sink | Desc |
---|---|
HDFS Sink | 資料最終被髮往hdfs,可以生成text檔案或sequence檔案,而且支援壓縮;支援生成檔案的週期性roll機制;基於檔案size,或者時間間隔,或者event數量;目標路徑,可以使用動態萬用字元替換,比如用%D代表當前日期;當然,它也能從event的header中,取到一些標記來作為萬用字元替換 |
Hive Sink | 可將text或json資料直接儲存到hive分割槽表 |
Logger Sink | 資料輸出到日誌中,通常用於debug |
Avro Sink | avro sink用來向avro source 傳送avro序列化資料,這樣就可以實現agent之間的級聯 |
Thrift Sink | 同avro sink |
IRC Sink | 同avro sink |
File Roll Sink | 資料儲存到本地檔案系統 |
Null Sink | 直接丟棄 |
Hbase Sink | 資料儲存到hbase中 |
Hbase2 Sink | 等同於hbase 2版本的HBaseSink |
AsyncHBaseSink | 非同步模式寫入hbase |
MorphlineSolrSink | 該接收器從Flume事件中提取資料,對其進行轉換,並將其幾乎實時地載入到Apache Solr伺服器中,後者再為終端使用者或搜尋應用程式提供查詢 |
ElasticSearchSink | 直接儲存到es中 |
Kite Dataset Sink | 將事件寫入Kite資料集。該接收器將反序列化每個傳入事件的主體,並將結果記錄儲存在Kite資料集中。它通過按URI載入資料集來確定目標資料集 |
Kafka Sink | 儲存到Kafka中 |
HTTP Sink | 將接收到的資料通過post請求傳送到遠端服務,event內容作為請求體傳送 |
Custom Sink | 自定義Sink |
- Interceptor
攔截器,就是工作在source之後,可以從source獲得event,做一個邏輯處理,然後再返回處理之後的event。這也就可以讓使用者不需要改動source程式碼的情況下,插入一些處理邏輯。
Interceptor | Desc |
---|---|
host | 往event的header中插入主機名資訊 |
timestamp | 向event中,寫入一個kv到header裡;key的名稱可以隨意配置,value就是當前時間戳 |
static | 讓使用者往event中新增一個自定義header,key-value形式的,當然這個kv在配置檔案中是寫死的 |
regex_filter | 將event中的body內容和指定的正則表示式進行匹配 |
custom type as FQCN | 自定義實現攔截器 |
uuid | 用於在每個event header中生成一個uuid字串 |
search_replace | 該攔截器基於Java正則表示式提供簡單的基於字串的搜尋和替換功能,類似於Java中的Matcher.replaceAll方法 |
RegexExtractorInterceptorMillisSerializer | 該攔截器使用指定的正則表示式提取正則表示式匹 |
配組,並將匹配組附加到事件的header裡
- Selector
一個source可以對接多個channel,那麼問題來了,source的資料是怎麼在多個channel之間進行傳遞的呢?這就是selector的功能了,通過selector選擇器根據策略可以將event從source傳遞到指定的channel中去。
Selector | DESC |
---|---|
replication selector | 預設的選擇器,將event進行復制分發給下游所有的節點 |
Multiplexing selector | 多路選擇器,可以根據event中的一個指定key對應的value來決定這條訊息會被寫入到哪個channel中 |
Custom Selector | 自定義選擇器 |
- Processor
一個agent中,多個sink可以被組裝到一個組中,而資料在組內多個sink之間傳送。接收處理器可以在組內提供負載均衡的功能,或者是在臨時故障的情況下實現從一個接收器轉移到另一個接收器上。
Processor | DESC |
---|---|
default | 預設的接收處理器僅接受一個sink,當然使用者也沒有必要為了一個sink去建立processor |
Failover | 故障轉移模式,即一個組內只有優先順序高的sink在工作,而其他的sink處於等待中 |
load_balance | 負載均衡模式,允許channel中的資料在一組sink中的多個sink之間進行輪轉,具體的策略有:round-robin(輪流傳送);random(隨機發送) |
Custom processor | 自定義處理器 |
Flume安裝部署與使用