1. 程式人生 > >Flume怎麼保證資料傳輸的完整性

Flume怎麼保證資料傳輸的完整性

@Author  : Spinach | GHB
@Link    : http://blog.csdn.net/bocai8058

Flume的事物機制

Flume使用兩個獨立的事務分別負責從soucrce到channel,以及從channel到sink的事件傳遞。

比如:spooling directory source 為檔案的每一行建立一個事件,一旦事務中所有的事件全部傳遞到channel且提交成功,那麼source就將該檔案標記為完成。同理,事務以類似的方式處理從channel到sink的傳遞過程,如果因為某種原因使得事件無法記錄,那麼事務將會回滾。且所有的事件都會保持到channel中,等待重新傳遞。

# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/ghb/HadoopCluster/Call
a1.sources.r1.fileHeader = true
a1.sources.r1.fileSuffix = .delete
a1.sources.r1.batchSize = 100

# Use file channel
a1.channels.c1.type = file 
a1.channels.c1.checkpointDir=/home/ghb/HadoopCluster/flume-1.6.0/checkpoint 
a1.channels.c1.dataDirs=/home/ghb/HadoopCluster/flume-1.6.0/dataDir

# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = CallLog
a1.sinks.k1.kafka.bootstrap.servers =127.0.0.1:6667
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks=1

Flume的At-least-once提交方式

Flume的事務機制,總的來說,保證了source產生的每個事件都會傳送到sink中。但是值得一說的是,實際上Flume作為高容量並行採集系統採用的是At-least-once(傳統的企業系統採用的是exactly-once機制)提交方式,這樣就造成每個source產生的事件至少到達sink一次,換句話說就是同一事件有可能重複到達。這樣雖然看上去是一個缺陷,但是相比為了保證Flume能夠可靠地將事件從source,channel傳遞到sink,這也是一個可以接受的權衡。如spooldir的使用,Flume會對已經處理完的資料進行標記。

Flume的批處理機制

為了提高效率,Flume儘可能的以事務為單位來處理事件,而不是逐一基於事件進行處理。比如上篇部落格提到的spooling directory source以100行文字作為一個批次來讀取(BatchSize屬性來配置,類似資料庫的批處理模式)。批處理的設定尤其有利於提高file channle的效率,這樣整個事務只需要寫入一次本地磁碟,或者呼叫一次fsync,速度回快很多。

channel配置說明

  1. MemoryChannel可以實現高速的吞吐, 但是無法保證資料完整性
  2. MemoryRecoverChannel在官方文件的建議上已經建義使用FileChannel來替換。FileChannel保證資料的完整性與一致性。在具體配置不現的FileChannel時,建議FileChannel設定的目錄和程式日誌檔案儲存的目錄設成不同的磁碟,以便提高效率。