1. 程式人生 > >資料收集之Flume

資料收集之Flume

Flume最初由Cloudera開發,於2011年6月貢獻給Apache,於2012成為頂級專案。在孵化這一年,基於老版本的Flume(Flume OG:Flume Original Generation 即Flume 0.9.x版本)進行重構,摒棄了Zookeeper和Master,升級為現在的輕量級的Flume(Flume NG:Flume Next Generation,即Flume 1.x版本),

架構要點

核心概念

  1. Agent:Flume程序,包含元件Source、Channel、Sink。
  2. Source:源。收集資料,傳送給Channel。
  3. Sink:輸出。從Channel取資料,傳送到目標地。
  4. Channel:緩衝。快取Source傳遞過來的Event。
  5. Event:事件。Flume處理資料的最小單元。由鍵值對的Header和位元組陣列Body組成。

基本架構

單個Flume Agent基本架構。

flume_struct

執行機制

提到Flume的執行機制,就不得不提Flume的事務機制和可靠性。

Flume最核心的就是把資料從資料來源收集過來,再送達到目的地。為了保證At-Least-Once投送,Agent會事先把Events快取起來並採用兩階段事務提交的方式。每一批次對應兩個事務,Source-Channel事務,保證這一Batch放入Channel的操作是原子的,要麼全部放入Channel,要麼一個不放;Channel-Sink事務,保證事務的投送是原子的,要麼全部投送成功,要麼全部回滾。

主要元件

Source

HTTP Source

某些環境可能不能部署Flume,此時可用HTTP Source接收資料倒Flume中。
在${FlUME_HOME}/conf目錄下建立http_source.conf,內容如下:

agent.sources = s1
agent.channels = c1
agent.sinks = r1

#source
agent.sources.s1.type = http
agent.sources.s1.bind = 0.0.0.0
agent.sources.s1.port = 9600

#channel
agent.channels
.c1.type = memory #sink agent.sinks.r1.type = logger agent.sources.s1.channels = c1 agent.sinks.r1.channel = c1

在${FlUME_HOME}目錄下啟動agent

bin/flume-ng agent -c conf -f conf/http_source.conf -Dflume.root.logger=DEBUG,console -n agent

傳送POST請求,觀察日誌變化。

curl -X POST -d'[{"headers":{"Header1":"value1","Header2":"value2"},"body":"this is http source"}]'  http://192.168.113.102:9600
Avro Source

在${FlUME_HOME}/conf目錄下建立avro_source.conf,內容如下:

agent.sources = s1
agent.channels = c1
agent.sinks = r1

#source
agent.sources.s1.type = avro
agent.sources.s1.bind = 0.0.0.0
agent.sources.s1.port = 4141

#channel
agent.channels.c1.type = memory

#sink
agent.sinks.r1.type = logger

agent.sources.s1.channels = c1
agent.sinks.r1.channel = c1

在${FlUME_HOME}目錄下啟動agent

bin/flume-ng agent -c conf -f conf/avro_source.conf -Dflume.root.logger=DEBUG,console -n agent

在${FlUME_HOME}目錄下啟動一個avro-client 客戶端生產資料

bin/flume-ng avro-client -H localhost -p 4141 -F README.md

可以看到,README.md中的內容被接收並打印出來了。

Exec Source

Exec Source可執行Linux命令,並將輸出同步給Sink。
在${FlUME_HOME}/conf目錄下建立exec_source.conf,內容如下:

agent.sources = s1
agent.channels = c1
agent.sinks = r1

#source
agent.sources.s1.type = exec
agent.sources.s1.command = tail -f /data/software/apache-flume-1.7.0-bin/data/access.log

#channel
agent.channels.c1.type = memory

#sink
agent.sinks.r1.type = logger

agent.sources.s1.channels = c1
agent.sinks.r1.channel = c1

在${FlUME_HOME}目錄下啟動agent

bin/flume-ng agent -c conf -f conf/exec_source.conf -Dflume.root.logger=DEBUG,console -n agent

向/data/software/apache-flume-1.7.0-bin/data/access.log中追加資料,觀察日誌變化。
注意:agent重啟會重複消費。

Spooling Directory Source

Spooling Directory Source可監聽一個目錄,同步目錄中的新檔案到sink,被同步完的檔案可被立即刪除或被打上標記。適合用於同步新檔案,但不適合對實時追加日誌的檔案進行監聽並同步。
在${FlUME_HOME}/conf目錄下建立spooling_directory_source.conf ,內容如下:

agent.sources = s1
agent.channels = c1
agent.sinks = r1

#source
agent.sources.s1.type = spooldir
#監控目錄
agent.sources.s1.spoolDir = /data/software/apache-flume-1.7.0-bin/data
#Event Header中新增檔案絕對路徑
agent.sources.s1.fileHeader = true

#channel
agent.channels.c1.type = memory

#sink
agent.sinks.r1.type = logger

agent.sources.s1.channels = c1
agent.sinks.r1.channel = c1

在${FlUME_HOME}目錄下啟動agent

bin/flume-ng agent -c conf -f conf/spooling_directory_source.conf -Dflume.root.logger=DEBUG,console -n agent

cp一個檔案到/data/software/apache-flume-1.7.0-bin/data,觀察agent日誌變化,且能看到被同步的檔案被打上.COMPLETED字尾。

Taildir Source
Kafka Source

在${FlUME_HOME}/conf目錄下建立kafka_source.conf ,內容如下:

agent.sources = s1
agent.channels = c1
agent.sinks = r1

#source
agent.sources.s1.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.s1.kafka.bootstrap.servers = localhost:9092
agent.sources.s1.kafka.topics = testTopic3
agent.sources.s1.kafka.consumer.group.id = consumer_testTopic3

#channel
agent.channels.c1.type = memory

#sink
agent.sinks.r1.type = logger

agent.sources.s1.channels = c1
agent.sinks.r1.channel = c1

在${FlUME_HOME}目錄下啟動agent

bin/flume-ng agent -c conf -f conf/kafka_source.conf -Dflume.root.logger=DEBUG,console -n agent

向testTopic3傳送資料,可以看到訊息被打印出來。如{ headers:{topic=testTopic3, partition=2, timestamp=1534177649622} body: 6E 6E 6E nnn }

Sink

Logger Sink

主要用於測試。將收到的Events以Logger INFO Level的方式打印出來。

在Source中多次使用,這裡不再贅述。

File Roll Sink

Events存放本地檔案系統。

在${FlUME_HOME}/conf目錄下建立file_roll_sink.conf ,內容如下:

agent.sources = s1
agent.channels = c1
agent.sinks = r1

#source
agent.sources.s1.type = http
agent.sources.s1.bind = 0.0.0.0
agent.sources.s1.port = 9600

#channel
agent.channels.c1.type = memory

#sink
agent.sinks.r1.type = file_roll
#檔案存放目錄
agent.sinks.r1.sink.directory = /data/software/apache-flume-1.7.0-bin/data/logs
#多久生成一個新檔案,單位秒。指定0將禁用滾動並導致所有事件都寫入單個檔案。
agent.sinks.r1.rollInterval = 30

agent.sources.s1.channels = c1
agent.sinks.r1.channel = c1

在${FlUME_HOME}目錄下啟動agent

bin/flume-ng agent -c conf -f conf/file_roll_sink.conf -Dflume.root.logger=DEBUG,console -n agent

向9600埠傳送資料,觀察日誌及 /data/software/apache-flume-1.7.0-bin/data目錄變化。可看到每隔30秒生成一個新檔案。

for i in `seq 1 100`;do curl -X POST -d'[{"headers":{"Header1":"value1","Header2":"value2"},"body":"Hello"}]'  http://192.168.113.102:9600;done
HDFS Sink

Events寫到HDFS 分散式檔案系統中。

在${FlUME_HOME}/conf目錄下建立hdfs_sink.conf ,內容如下:

agent.sources = s1
agent.channels = c1
agent.sinks = r1

#source
agent.sources.s1.type = exec
agent.sources.s1.command = tail -f /data/software/apache-flume-1.7.0-bin/data/access.log

#channel
agent.channels.c1.type = memory

#sink
agent.sinks.r1.type = hdfs
#使用伺服器本地時間代替Event Header中的timestamp
#注意:HDFS Sink中所有與時間相關的轉義序列,事件標題中必須存在帶有“timestamp”鍵的標頭(除非hdfs.useLocalTimeStamp設定為true)>。自動新增timestamp的一種方法是使用TimestampInterceptor
agent.sinks.r1.hdfs.useLocalTimeStamp = true
#hdfs 目錄 這裡%Y%m%d取自timestamp
agent.sinks.r1.hdfs.path = hdfs://node1:8020/test/%Y%m%d/accessLog
#hdfs 目錄下檔案字首
agent.sinks.r1.hdfs.filePrefix = data
#hdfs 目錄下檔案字尾
agent.sinks.r1.hdfs.fileSuffix = .log
#多少秒生成一個新檔案 0:不根據時間滾動
agent.sinks.r1.hdfs.rollInterval = 60
#檔案達到多少位元組後生成一個新檔案 0:不根據檔案大小滾動
agent.sinks.r1.hdfs.rollSize = 1024
#多個個Event生成一個新檔案 0:不根據事件數量滾動
agent.sinks.r1.hdfs.rollCount = 10
#Event數量達到多少向hdfs重新整理一次
agent.sinks.r1.hdfs.batchSize = 100
#指定壓縮格式 支援gzip, bzip2, lzo, lzop, snappy
#agent.sinks.r1.hdfs.codeC =
#檔案型別 支援SequenceFile, DataStream or CompressedStream
#DataStream 不啟用壓縮
agent.sinks.r1.hdfs.fileType = DataStream
#檔案格式
agent.sinks.r1.hdfs.writeFormat = Text

agent.sources.s1.channels = c1
agent.sinks.r1.channel = c1

在${FlUME_HOME}目錄下啟動agent

bin/flume-ng agent -c conf -f conf/hdfs_sink.conf -Dflume.root.logger=DEBUG,console -n agent

向/data/software/apache-flume-1.7.0-bin/data/access.log中追加檔案,觀察日誌和HDFS目錄變化。

for i in `seq 1 1000`;do echo "Hello Flume ${i}">> /data/software/apache-flume-1.7.0-bin/data/access.log;sleep 1;done

可以看到,HDFS中生成了新檔案。內容如下:

hdfs dfs -cat /test/20180814/accessLog/data.1534211236747.log
Hello Flume 21
Hello Flume 22
Hello Flume 23
Hello Flume 24
Hello Flume 25
Hello Flume 26
Hello Flume 27
Hello Flume 28
Hello Flume 29
Hello Flume 30
Kafka Sink

Events 寫到Hive 分割槽或Hive表中。
在${FlUME_HOME}/conf目錄下建立kafka_sink.conf ,內容如下:

agent.sources = s1
agent.channels = c1
agent.sinks = r1

#source
agent.sources.s1.type = netcat
agent.sources.s1.bind = localhost
agent.sources.s1.port = 6666

#channel
agent.channels.c1.type = memory

#sink
agent.sinks.r1.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.r1.kafka.bootstrap.servers = node2:6667,node3:6667,node1:6667
agent.sinks.r1.kafka.topic = testTopic

agent.sources.s1.channels = c1
agent.sinks.r1.channel = c1

在${FlUME_HOME}目錄下啟動agent

bin/flume-ng agent -c conf -f conf/kafka_sink.conf -Dflume.root.logger=DEBUG,console -n agent

向埠6666傳送資料,觀察到kafka-console-consumer消費出了資料。

telnet localhost 6666
Hello Flume!

其他Sink

  • Hive Sink
    通過Hive的事務,將Events近實時寫到Hive分割槽中。注意:Hive必須開啟事務,Hive的表必須是內部表,且是桶表,需儲存為ORC格式。
  • HBase Sink
    將Events儲存到HBase中。寫入HBase不成功會嘗試重寫。
  • Async HBase Sink
    以非同步的方式將Events儲存到HBase中,比普通的HBase Sink要快。
  • Elasticsearch Sink
    Flume 支援的Elasticsearch版本較老。各種不相容問題。
  • Avro Sink
    Avro Sink多用於多Agent節點間資料傳輸。
  • File Roll Sink
    把Events儲存到本地檔案系統。
  • Null Sink
    丟棄從Channel中取到的Events。

Channel

Channel臨時快取Events。Source向Channel新增事件,Sink從Channel刪除事件。

Memory Channel

Events儲存在記憶體中。吞吐量高,但Agent重啟、JVM崩潰會丟資料。
關鍵引數:

  • agent.channels.c1.type = memory
    channel的型別是memory

  • agent.channels.c1.capacity
    預設100。channel中儲存的最大事件數

  • agent.channels.c1.transactionCapacity
    預設100。指源Source單次事務可以寫入的最大事件數,也指Sink單次事務所能讀取的最大事件數。可以增大該值,但要注意還要同時增加JVM堆空間大小,因為事件是以Event物件的形式存在於堆中的。同時還要注意,增大該值可以提升速度,但事務失敗就要回滾更多的資料。

  • agent.channels.c1.keep-alive
    單位是秒,預設3秒。指channel已滿,Source執行緒將Events寫入到channel中的等待時間,這個值設的太大容易導致Events堵在Source端。會丟擲異常。

  • agent.channels.c1.byteCapacityBufferPercentage與agent.channels.c1.byteCapacity
    使用位元組而非數量來控制記憶體中事件的總大小。避免OutOfMemoryError。

File Channel

Events持久化到檔案中。多個Channel,應為每個Channel顯示指定檢查點目錄和資料目錄,且儘量在不同磁碟。
關鍵引數:

  • agent.channels.c1.type = file

  • agent.channels.c1.dataDirs
    資料持久化目錄。逗號分隔。在不同磁碟上使用多個目錄可以提高檔案通道的效能。

  • agent.channels.c1.checkpointDir
    檢查點目錄。

  • agent.channels.c1.checkpointInterval
    兩個檢查點之間的毫秒數。

  • agent.channels.c1.useDualCheckpoints與agent.channels.c1.backupCheckpointDir
    備份檢查點目錄。

  • agent.channels.c1.capacity
    預設1000000。同Memory Channel。

  • agent.channels.c1.transactionCapacity
    預設10000。同Memory Channel。

  • agent.channels.c1.keep-alive
    單位是秒,預設3秒。同Memory Channel。

在${FlUME_HOME}/conf目錄下建立file_channel_sink.conf ,內容如下:

agent.sources = s1
agent.channels = c1
agent.sinks = r1

#source
agent.sources.s1.type = netcat
agent.sources.s1.bind = localhost
agent.sources.s1.port = 6666

#channel
agent.channels.c1.type = file
agent.channels.c1.dataDirs = /data/software/apache-flume-1.7.0-bin/data/fileChannel/dataDirs
agent.channels.c1.checkpointDir = /data/software/apache-flume-1.7.0-bin/data/fileChannel/checkpointDir
agent.channels.c1.capacity = 1000
agent.channels.c1.transactionCapacity = 100

#sink
agent.sinks.r1.type = logger

agent.sources.s1.channels = c1
agent.sinks.r1.channel = c1

在${FlUME_HOME}目錄下啟動agent

bin/flume-ng agent -c conf -f conf/file_channel_sink.conf -Dflume.root.logger=DEBUG,console -n agent

向6666埠傳送一段時間資料後,將agent關掉,然後重啟。觀察變化。

for i in `seq 1 100000`;do echo ${i} > /dev/tcp/localhost/6666;echo ${i};done

重啟後可以看到,flume會接著消費。

其他Channel
  • Kafka Channel
    Events快取在Kafka中,但需要Kafka 0.9以上的版本。

  • JDBC Channel
    Events快取在資料庫中。目前只支援Derby。

  • Spillable Memory Channel
    Events同時快取在記憶體和檔案中。記憶體做主存。實驗性的,不建議生產下使用。

  • Pseudo Transaction Channel
    僅用於測試,不用於生產。

Channel Selector

一個源Source可以被選擇性的寫入多個Channel中,叫Channel選擇器。

多路複製

同一Source資料,複製多份,即複製到多個Channel,每個Channel最終分別發向不同Sink,如HDFS、Kafka、本地檔案系統等。
關鍵引數:

  • agent.sources.s1.selector.type = replicating
    channel selector為多路複製。

  • agent.sources.s1.selector.optional = c1
    如下c1是可選channel。無法寫入c1的事務會被忽略。c2和c3未標記為可選,無法寫入這些channel將導致事務失敗。

在${FlUME_HOME}/conf目錄下建立replicating_channel_selector.conf 內容如下:

agent.sources = s1
agent.channels = c1 c2 c3
agent.sinks = r1 r2 r3

#source 配置
#source
agent.sources.s1.type = netcat
agent.sources.s1.bind = localhost
agent.sources.s1.port = 6666

#配置channel selector
agent.sources.s1.selector.type = replicating
agent.sources.s1.channels = c1 c2 c3
agent.sources.s1.selector.optional = c1

#channel 配置
#channel c1
agent.channels.c1.type = file
agent.channels.c1.dataDirs = /data/software/apache-flume-1.7.0-bin/data/fileChannel/dataDirs
agent.channels.c1.checkpointDir = /data/software/apache-flume-1.7.0-bin/data/fileChannel/checkpointDir
agent.channels.c1.capacity = 1000
agent.channels.c1.transactionCapacity = 100

#channel c2
agent.channels.c2.type = memory

#channel c3
agent.channels.c3.type = memory

#sink 配置
#sink r1
agent.sinks.r1.type = logger

#sink r2
agent.sinks.r2.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.r2.kafka.bootstrap.servers = node2:6667,node3:6667,node1:6667
agent.sinks.r2.kafka.topic = testTopic

#sink r3
agent.sinks.r3.type = file_roll
agent.sinks.r3.sink.directory = /data/software/apache-flume-1.7.0-bin/data/file_roll

#source/channel/sink繫結
agent.sinks.r1.channel = c1
agent.sinks.r2.channel = c2
agent.sinks.r3.channel = c3

在${FlUME_HOME}目錄下啟動agent

bin/flume-ng agent -c conf -f conf/replicating_channel_selector.conf -Dflume.root.logger=DEBUG,console -n agent

向6666埠傳送資料後,在控制檯、file_roll目錄、kafka中均收到一樣的資料。

for i in `seq 1 100000`;do echo '{"name":"'"name${i}"'","age":10}' > /dev/tcp/localhost/6666;echo ${i};done
多路複用

同一Source,根據某個Header值分別寫入到不同Channel中。

關鍵引數:

  • agent.sources.s1.selector.type = multiplexing
    channel selector為多路複用。

  • agent.sources.s1.selector.header
    用Header中哪個欄位的值分流。

  • agent.sources.s1.selector.mapping.*
    值匹配到*則傳送到相應channel。

在${FlUME_HOME}/conf目錄下建立multiplexing_channel_selector.conf 內容如下:

agent.sources = s1
agent.channels = c1 c2 c3
agent.sinks = r1 r2 r3

#source 配置
#source
agent.sources.s1.type = netcat
agent.sources.s1.bind = localhost
agent.sources.s1.port = 6666

#這裡使用攔截器向Event Header中新增欄位
agent.sources.s1.interceptors = i1
#基於正則從Event body中抽取欄位,新增到Event Header中
agent.sources.s1.interceptors.i1.type = regex_extractor
agent.sources.s1.interceptors.i1.regex = "log_type":"(\\w+)".*
agent.sources.s1.interceptors.i1.serializers = i1_ser1
agent.sources.s1.interceptors.i1.serializers.i1_ser1.name = log_type

#配置channel selector 為多路複用
agent.sources.s1.channels = c1 c2 c3
agent.sources.s1.selector.type = multiplexing
#使用Header中log_type欄位的值分流
agent.sources.s1.selector.header = log_type
#Header中log_type=AppError 傳送到channel c1
agent.sources.s1.selector.mapping.AppError = c1
#Header中log_type=UserInfo 傳送到channel c2
agent.sources.s1.selector.mapping.UserInfo = c2
#Header中log_type=AccessLog 傳送到channel c3
agent.sources.s1.selector.mapping.AccessLog = c3

#channel 配置
#channel c1
agent.channels.c1.type = file
agent.channels.c1.dataDirs = /data/software/apache-flume-1.7.0-bin/data/fileChannel/dataDirs
agent.channels.c1.checkpointDir = /data/software/apache-flume-1.7.0-bin/data/fileChannel/checkpointDir
agent.channels.c1.capacity = 1000
agent.channels.c1.transactionCapacity = 100

#channel c2
agent.channels.c2.type = memory

#channel c3
agent.channels.c3.type = memory

#sink 配置
#sink r1
agent.sinks.r1.type = logger

#sink r2
agent.sinks.r2.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.r2.kafka.bootstrap.servers = node2:6667,node3:6667,node1:6667
agent.sinks.r2.kafka.topic = testTopic

#sink r3
agent.sinks.r3.type = file_roll
agent.sinks.r3.sink.directory = /data/software/apache-flume-1.7.0-bin/data/file_roll


#source/channel/sink繫結
agent.sinks.r1.channel = c1
agent.sinks.r2.channel = c2
agent.sinks.r3.channel = c3

在${FlUME_HOME}目錄下啟動agent

bin/flume-ng agent -c conf -f conf/multiplexing_channel_selector.conf -Dflume.root.logger=DEBUG,console -n agent

向6666埠傳送資料後,在控制檯、file_roll目錄、kafka中分別收到各自的資料。

log1='{"log_type":"AppError","msg":"this is AppError !"}'
log2='{"log_type":"UserInfo","msg":"this is UserInfo !"}'
log3='{"log_type":"AccessLog","msg":"this is AccessLog !"}'

#將會在kafka中收到此類資料
for i in `seq 1 100000`;do echo "${log2}" > /dev/tcp/localhost/6666;echo ${i};done

Sink Processors

通過接收器組,來解決Sink的單點故障與負載均衡問題。

故障轉移 Failover Sink Processor

維護一個帶優先順序的sink列表,對失敗根據優先順序進行路由。若sinks都不可用,則事務會回滾。

關鍵引數:

  • agent.sinkgroups.g1.processor.type = failover
    配置接收器組中接收器之間處理方式為故障轉移。

  • agent.sinkgroups.g1.processor.priority.r1
    配置每個接收器sink的優先順序。

  • agent.sinkgroups.g1.processor.maxpenalty
    不可用接收器的最大等待毫秒數。首次失敗,間隔一秒後失敗sink才可使用,之後指數級等待直到達到最大上限maxpenalty。

在${FlUME_HOME}/conf目錄下建立failover_sink_processor.conf 內容如下:

agent.sources = s1
agent.channels = c1
agent.sinks = r1 r2

#source
agent.sources.s1.type = netcat
agent.sources.s1.bind = localhost
agent.sources.s1.port = 6666

#channel
agent.channels.c1.type = memory

#sink
agent.sinkgroups = g1
agent.sinkgroups.g1.sinks = r1 r2
#故障轉移
agent.sinkgroups.g1.processor.type = failover
#sink r1 的優先順序是10
agent.sinkgroups.g1.processor.priority.r1 = 10
#sink r2 的優先順序是100 會優先寫到檔案
agent.sinkgroups.g1.processor.priority.r2 = 100
#不可用接收器的最大等待毫秒數
agent.sinkgroups.g1.processor.maxpenalty = 10000

#sink r1
agent.sinks.r1.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.r1.kafka.bootstrap.servers = node2:6667,node3:6667,node1:6667
agent.sinks.r1.kafka.topic = testTopic

#sink r2
agent.sinks.r2.type = file_roll
agent.sinks.r2.sink.directory = /data/software/apache-flume-1.7.0-bin/data/file_roll

agent.sources.s1.channels = c1
agent.sinks.r1.channel = c1
agent.sinks.r2.channel = c1

在${FlUME_HOME}目錄下啟動agent

bin/flume-ng agent -c conf -f conf/failover_sink_processor.conf -Dflume.root.logger=DEBUG,console -n agent

向埠傳送資料,可以看到,資料都寫進file_roll sink對應的本地目錄,然後刪掉file_roll sink本地目錄(模擬某個sink掛掉),資料傳送到kafka。

log2='{"log_type":"UserInfo","msg":"this is UserInfo !"}'
for i in `seq 1 100000`;do echo "${log2}" > /dev/tcp/localhost/6666;echo ${i};done
負載均衡 Load balancing Sink Processor

接收器組中的接收器之間根據負責均衡策略。
關鍵引數:

  • agent.sinkgroups.g1.processor.type = load_balance
    配置接收器組中接收器之間處理方式為負載均衡。

  • agent.sinkgroups.g1.processor.selector = round_robin
    負載均衡的方式為輪詢。也可配置成隨機random。

  • agent.sinkgroups.g1.processor.backoff = false
    值為true,某個sink失敗後會指數級等待並重試。

在${FlUME_HOME}/conf目錄下建立load_balancing_sink_processor.conf 內容如下:

agent.sources = s1
agent.channels = c1
agent.sinks = r1 r2

#source
agent.sources.s1.type = netcat
agent.sources.s1.bind = localhost
agent.sources.s1.port = 6666

#channel
agent.channels.c1.type = memory

#sink
agent.sinkgroups = g1
agent.sinkgroups.g1.sinks = r1 r2
#負載均衡
agent.sinkgroups.g1.processor.type = load_balance
#負載均衡方式 random/round_robin
agent.sinkgroups.g1.processor.selector = round_robin
agent.sinkgroups.g1.processor.backoff = false


#sink r1
agent.sinks.r1.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks