
Load Balancing with Rsyslog, HAProxy, and Flume

Notes on the key steps for load-balancing Flume with HAProxy.

Rsyslog Configuration

Application side:

module(load="omrelp")

$SystemLogRateLimitInterval 0
$WorkDirectory /tmp
$EscapeControlCharactersOnReceive on
$ActionQueueType LinkedList
$ActionQueueFileName da_queue
$ActionResumeRetryCount -1
$ActionQueueSaveOnShutdown on
$ActionQueueSize 100000000
$ActionQueueMaxFileSize 1000M
$ActionQueueMaxDiskSpace 100G

if($pri-text == "local5.info") then {
    action(type="omrelp" target="rsyslog-proxy" port="5140")
}
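The `$pri-text == "local5.info"` filter selects messages with facility local5 at severity info. As background, a syslog message's numeric PRI prefix is `facility * 8 + severity` (RFC 5424); a minimal Python sketch of that arithmetic:

```python
# Syslog PRI = facility * 8 + severity (RFC 5424).
# local5 has facility code 21; "info" is severity 6.
FACILITIES = {"local5": 21}
SEVERITIES = {"info": 6}

def pri(facility: str, severity: str) -> int:
    """Return the numeric PRI that prefixes a syslog message, e.g. <174>."""
    return FACILITIES[facility] * 8 + SEVERITIES[severity]

print(pri("local5", "info"))  # 174 -> local5.info messages start with "<174>"
```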

Forwarding side:
Be sure to include the RebindInterval="1000" parameter: after every 1000 messages, the current connection is closed and a new one is opened. Only then can HAProxy rotate through its backends in round-robin order.

$template billfile,"datadir/%$year%%$month%%$day%/%$hour%/%msg:R,ERE,1,ZERO:([0-9]{5,6})\|.*--end%"
$template flume,"<%pri%> %timereported:::date-rfc3339% %hostname% %msg:2:$:drop-last-lf%"
$template kafka,"%msg:2:$:drop-last-lf%"

if($pri-text == "local5.info") then {
    action(type="omfile" dynaFile="billfile" template="verbose" dirCreateMode="0755")
    action(type="omkafka" topic="kafka topic" broker="kafka broker list" template="kafka")
    action(type="omfwd" Target="haproxy" Port="4444" Protocol="tcp" Template="flume"
           queue.type="LinkedList" queue.filename="queueData" queue.spoolDirectory="/tmp"
           queue.size="100000000" queue.dequeuebatchsize="1000" queue.maxfilesize="100m"
           queue.saveonshutdown="on" queue.workerthreads="4" RebindInterval="1000")
}

HAProxy Configuration

global
    log         127.0.0.1 local2 info # HAProxy logs via syslog
    chroot      /data/haproxy/chroot
    pidfile     /data/haproxy/haproxy.pid
    maxconn     10000
    user        root

defaults
    mode        tcp
    log         global
    option      tcplog
    option      redispatch
    timeout connect 10000
    timeout client 300000
    timeout server 300000
    maxconn     60000
    retries     3

listen rsyslog_balance
   bind *:4444
   mode tcp
   balance roundrobin
   server tcp-1 flume1:4444 check port 4444 inter 5000 fall 5
   server tcp-2 flume2:4444 check port 4444 inter 5000 fall 5
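HAProxy's `balance roundrobin` picks a backend once per TCP connection, not per message, which is why the forwarding side needs RebindInterval: a long-lived connection pins all traffic to one Flume node. A minimal Python simulation of this effect (backend names are illustrative):

```python
from itertools import cycle

# roundrobin assigns a backend per *connection*; every message on that
# connection then goes to the same backend.
backends = cycle(["flume1", "flume2"])  # hypothetical backend pool

def route(total_msgs: int, rebind_interval: int) -> dict:
    """Count messages per backend when the client reconnects every
    rebind_interval messages (rsyslog's RebindInterval behavior)."""
    counts = {"flume1": 0, "flume2": 0}
    sent = 0
    while sent < total_msgs:
        backend = next(backends)               # new connection -> next backend
        batch = min(rebind_interval, total_msgs - sent)
        counts[backend] += batch               # whole batch rides one connection
        sent += batch
    return counts

print(route(4000, 1000))  # {'flume1': 2000, 'flume2': 2000} -> evenly split
```

Without rebinding (one connection for the whole stream), all 4000 messages would land on a single backend.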

HAProxy writes its log file through Rsyslog, so the Rsyslog configuration on the HAProxy server needs the following entries:

# keep local2 messages out of /var/log/messages
local2.none  /var/log/messages
# write all local2 messages to the dedicated log file
local2.* /path/to/haproxy/log

Flume Configuration


a1.sources  = syslogtcp
a1.channels = hdfschannel
a1.sinks    = hdfssink

# source
a1.sources.syslogtcp.channels = hdfschannel
a1.sources.syslogtcp.type     = syslogtcp
a1.sources.syslogtcp.bind     =  0.0.0.0
a1.sources.syslogtcp.port     = 4444
a1.sources.syslogtcp.eventSize = 10000

# sink
a1.sinks.hdfssink.channel = hdfschannel
a1.sinks.hdfssink.type = hdfs
a1.sinks.hdfssink.hdfs.path = hdfs://nameservice1:8020/data/flume/%Y%m%d/%H
a1.sinks.hdfssink.hdfs.fileType = DataStream
a1.sinks.hdfssink.hdfs.writeFormat = Text
a1.sinks.hdfssink.hdfs.useLocalTimeStamp = true
a1.sinks.hdfssink.hdfs.filePrefix = FlumeData.1
a1.sinks.hdfssink.hdfs.inUsePrefix = tmp.
a1.sinks.hdfssink.hdfs.inUseSuffix = .tmp
a1.sinks.hdfssink.hdfs.rollInterval = 60
a1.sinks.hdfssink.hdfs.rollSize = 0
a1.sinks.hdfssink.hdfs.rollCount = 0
a1.sinks.hdfssink.hdfs.round = true
a1.sinks.hdfssink.hdfs.roundValue = 1
a1.sinks.hdfssink.hdfs.roundUnit = minute
a1.sinks.hdfssink.hdfs.batchSize = 2000
a1.sinks.hdfssink.hdfs.callTimeout = 180000
a1.sinks.hdfssink.hdfs.retryInterval = 0
a1.sinks.hdfssink.hdfs.closeTries = 1

a1.channels.hdfschannel.type = memory
a1.channels.hdfschannel.capacity = 200000000
a1.channels.hdfschannel.transactionCapacity = 10000
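The HDFS sink buckets events into hourly directories via the `%Y%m%d/%H` escapes in `hdfs.path` (with `useLocalTimeStamp = true`, the sink stamps events on arrival). A sketch of how an event timestamp maps to its output directory, mirroring the path in the config above:

```python
from datetime import datetime

def hdfs_path(ts: datetime) -> str:
    """Map an event timestamp to the sink's %Y%m%d/%H bucket
    (prefix copied from hdfs.path in the config above)."""
    return ts.strftime("hdfs://nameservice1:8020/data/flume/%Y%m%d/%H")

print(hdfs_path(datetime(2024, 3, 5, 14, 30)))
# hdfs://nameservice1:8020/data/flume/20240305/14
```

With `rollInterval = 60` and size/count rolling disabled, each hourly directory accumulates one closed file roughly per minute.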
