Building a highly available flume-ng setup with load balancing, writing data to hadoop and kafka
The scenario: multiple agents push local log data to hadoop. Because the agents and the hadoop cluster sit on different network segments, heavy data volume can put significant pressure on the network, so we deployed two flume collector machines on the hadoop-side segment. The agents send their data to the collectors, which split the traffic between them and write it into hadoop. The data flow is shown below:
The diagram shows only 3 agents; the real deployment has more, but there are only two collectors.
We need to distribute the agent data evenly across the two collector machines. The agent configuration is as follows:
When collector1 and collector2 are both healthy, the agent distributes data to the two machines at random; if either collector goes down, the agent sends its data to the remaining healthy machine.

# Name the components on this agent (declare the source, channel and sink names)
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1

# Describe/configure the source: listen on local TCP port 5140 for syslog data
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1

# Define sinkgroups: k1 and k2 form one group using the load-balancing processor
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin

# Define sink 1: both sinks send data to the two collector machines via avro
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 10.0.3.82
a1.sinks.k1.port = 5150

# Define sink 2
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = 10.0.3.83
a1.sinks.k2.port = 5150

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sinks to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
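With this file in place, the agent can be started with the standard flume-ng launcher. A minimal sketch, assuming the configuration above is saved as agent.conf under the Flume conf directory (the file name and paths are assumptions, not taken from the original deployment); note that --name must match the a1 prefix used in the file:

/usr/local/apache-flume-1.6.0-bin/bin/flume-ng agent \
  --conf /usr/local/apache-flume-1.6.0-bin/conf \
  --conf-file /usr/local/apache-flume-1.6.0-bin/conf/agent.conf \
  --name a1 \
  -Dflume.root.logger=INFO,console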
collector1 configuration
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'collector1'
collector1.sources = r1
collector1.channels = c1 c2
collector1.sinks = k1 k2

# Describe the source
collector1.sources.r1.type = avro
collector1.sources.r1.port = 5150
collector1.sources.r1.bind = 0.0.0.0
collector1.sources.r1.channels = c1 c2

# Describe channels: c1 is a file channel, c2 buffers events in memory
collector1.channels.c1.type = file
collector1.channels.c1.checkpointDir = /usr/local/apache-flume-1.6.0-bin/fileChannel/checkpoint
collector1.channels.c1.dataDirs = /usr/local/apache-flume-1.6.0-bin/fileChannel/data
collector1.channels.c2.type = memory
collector1.channels.c2.capacity = 1000
collector1.channels.c2.transactionCapacity = 100

# Describe the sink k1 to hadoop
collector1.sinks.k1.type = hdfs
collector1.sinks.k1.channel = c1
collector1.sinks.k1.hdfs.path = /quantone/flume/
collector1.sinks.k1.hdfs.fileType = DataStream
collector1.sinks.k1.hdfs.writeFormat = Text
collector1.sinks.k1.hdfs.rollInterval = 300
collector1.sinks.k1.hdfs.filePrefix = %Y-%m-%d
collector1.sinks.k1.hdfs.round = true
collector1.sinks.k1.hdfs.roundValue = 5
collector1.sinks.k1.hdfs.roundUnit = minute
collector1.sinks.k1.hdfs.useLocalTimeStamp = true

# Describe the sink k2 to kafka
collector1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
collector1.sinks.k2.topic = mytopic
collector1.sinks.k2.channel = c2
collector1.sinks.k2.brokerList = 10.0.3.178:9092,10.0.3.179:9092
collector1.sinks.k2.requiredAcks = 1
collector1.sinks.k2.batchSize = 20
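Because the component prefix in this file is collector1, the Flume process on this machine must be started with a matching agent name. A sketch, assuming the file is saved as collector1.conf (file name and paths are assumptions); collector2 is started the same way with --name collector2:

/usr/local/apache-flume-1.6.0-bin/bin/flume-ng agent \
  --conf /usr/local/apache-flume-1.6.0-bin/conf \
  --conf-file /usr/local/apache-flume-1.6.0-bin/conf/collector1.conf \
  --name collector1 \
  -Dflume.root.logger=INFO,console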
collector2 configuration
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'collector2'
collector2.sources = r1
collector2.channels = c1 c2
collector2.sinks = k1 k2

# Describe the source
collector2.sources.r1.type = avro
collector2.sources.r1.port = 5150
collector2.sources.r1.bind = 0.0.0.0
collector2.sources.r1.channels = c1 c2

# Describe channels: c1 is a file channel, c2 buffers events in memory
collector2.channels.c1.type = file
collector2.channels.c1.checkpointDir = /usr/local/apache-flume-1.6.0-bin/fileChannel/checkpoint
collector2.channels.c1.dataDirs = /usr/local/apache-flume-1.6.0-bin/fileChannel/data
collector2.channels.c2.type = memory
collector2.channels.c2.capacity = 1000
collector2.channels.c2.transactionCapacity = 100

# Describe the sink k1 to hadoop
collector2.sinks.k1.type = hdfs
collector2.sinks.k1.channel = c1
collector2.sinks.k1.hdfs.path = /quantone/flume
collector2.sinks.k1.hdfs.fileType = DataStream
collector2.sinks.k1.hdfs.writeFormat = Text
collector2.sinks.k1.hdfs.rollInterval = 300
collector2.sinks.k1.hdfs.filePrefix = %Y-%m-%d
collector2.sinks.k1.hdfs.round = true
collector2.sinks.k1.hdfs.roundValue = 5
collector2.sinks.k1.hdfs.roundUnit = minute
collector2.sinks.k1.hdfs.useLocalTimeStamp = true

# Describe the sink k2 to kafka
collector2.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
collector2.sinks.k2.topic = mytopic
collector2.sinks.k2.channel = c2
collector2.sinks.k2.brokerList = 10.0.3.178:9092,10.0.3.179:9092
collector2.sinks.k2.requiredAcks = 1
collector2.sinks.k2.batchSize = 20
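Both collectors publish to the Kafka topic mytopic on the brokers 10.0.3.178:9092 and 10.0.3.179:9092. If automatic topic creation is disabled on the brokers, the topic has to exist before the collectors start. A hedged sketch using the Kafka CLI of that era (the ZooKeeper address, partition count and replication factor below are assumptions, not values from the original setup):

kafka-topics.sh --create --zookeeper 10.0.3.178:2181 --topic mytopic --partitions 2 --replication-factor 2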
The channel feeding the hadoop sink is a file channel. With this channel type, if the sink fails to deliver, events are persisted to the configured directories on disk and are sent on once the network recovers. Compared with a memory channel, a file channel suits data that is not high-volume but needs higher delivery reliability.
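The file channel's defaults are fairly generous, but they can be set explicitly if a large backlog is expected during an HDFS outage. A sketch of the commonly adjusted properties (the values shown are illustrative, not from the original deployment): capacity bounds how many events can be queued on disk, and maxFileSize caps each data file before the channel rolls to a new one.

collector1.channels.c1.capacity = 1000000
collector1.channels.c1.transactionCapacity = 10000
collector1.channels.c1.checkpointInterval = 30000
collector1.channels.c1.maxFileSize = 2146435071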
Note: here we use collector2.sinks.k1.hdfs.filePrefix = %Y-%m-%d to set the prefix of the file names written into hadoop. If the incoming events carry no timestamp field in their headers, this setting will prevent the data from being written; in that case you must add collector2.sinks.k1.hdfs.useLocalTimeStamp = true so that the collector's current time is used to resolve %Y-%m-%d. Keep in mind that this time is not the real time at which the log was generated on the agent.
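If the agent-side receive time is closer to what you want than the collector's clock, a timestamp interceptor can be added to the agent's source so every event carries a timestamp header when it leaves the agent; the HDFS sink then resolves %Y-%m-%d from that header and useLocalTimeStamp is no longer required. A minimal sketch against the agent configuration above (the interceptor name i2 is arbitrary; if it is combined with the static interceptor i1 shown below, the interceptors line becomes a1.sources.r1.interceptors = i1 i2):

a1.sources.r1.interceptors = i2
a1.sources.r1.interceptors.i2.type = timestamp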
If you want different agents' data to be written to different Kafka topics, the collector1.sinks.k2.topic = mytopic setting in the collector's Kafka sink can be left out; instead, configure a static interceptor on each agent's source, for example:
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = topic
a1.sources.r1.interceptors.i1.value = mytopic
By giving each agent its own value here, every agent produces a different topic name and its data is written to the corresponding topic.
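To verify the whole pipeline end to end, a syslog-formatted line can be pushed to the agent's TCP port 5140; it should then appear both in HDFS under /quantone/flume and in the configured Kafka topic. A minimal sketch using nc (the message content is arbitrary):

echo "<13>Oct 11 22:14:15 myhost myapp: hello flume" | nc localhost 5140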