Apache flume+Kafka獲取實時日誌資訊
Flume簡介以及安裝
-
Flume是一個分散式的對海量日誌進行採集,聚合和傳輸的系統。Flume系統分為三個元件,分別是source,sink,channel:source表明資料的來源,可能來自檔案,Avro等,channel作為source和sink的橋樑,作為資料的臨時儲存地,channal是一個完整的事務,這一點保證了資料在收發的時候的一致性,支援的型別有: JDBC channel , File System channel , Memort channel等;sink表明資料的去向,可以把資料再次轉發到HDFS,或者Kafka等。
-
本文使用的版本是1.8.0(目前最新)的版本,可以到
-
解壓、安裝、配置
sudo tar -zxvf apache-flume-1.8.0-bin.tar.gz
cd apache-flume-1.8.0-bin
sudo vim conf/kafka.conf #這個檔案剛開始並不存在,要新建
kafka.conf的具體內容:
# 分別對應三種基礎元件,起的別名 kafka是在啟動flume的時候,指定的agent的名字 kafka.sources = src kafka.sinks = sk kafka.channels = chl # 表明需要收集的資料來自avro,此處配置會啟動avro server kafka.sources.src.type = avro kafka.sources.src.bind = localhost kafka.sources.src.port = 44446 # Flume 收集的資料轉發到Kafka的關鍵配置 kafka.sinks.sk.type = org.apache.flume.sink.kafka.KafkaSink kafka.sinks.sk.kafka.bootstrap.servers = localhost:9092 #指定kafka叢集的地址 kafka.sinks.sk.partition.key=0 kafka.sinks.sk.partitioner.class=org.apache.flume.plugins.SinglePartition kafka.sinks.sk.serializer.class=kafka.serializer.StringEncoder kafka.sinks.sk.request.required.acks=0 kafka.sinks.sk.max.message.size=1000000 kafka.sinks.sk.producer.type=sync kafka.sinks.sk.topic=log #指定kafka的topic # Use a channel which buffers events in memory kafka.channels.chl.type = memory kafka.channels.chl.capacity = 1000 kafka.channels.chl.transactionCapacity = 100 # Bind the source and sink to the channel kafka.sources.src.channels = chl kafka.sinks.sk.channel = chl
Flume服務啟動:
bin/flume-ng agent --conf conf --conf-file conf/kafka.conf --name kafka -Dflume.root.logger=INFO,LOGFILE
ps:上述命令也可以通過nohup方式啟動
至此,flume已經啟動完成了!
接下來我們請出另一個主角 Kafka
同樣是下載,安裝,配置的步驟:
wget http://mirrors.shuosc.org/apache/kafka/1.0.0/kafka_2.12-1.0.0.tgz tar -zxf kafka_2.12-1.0.0.tgz cd kafka_2.12-1.0.0 vim config/server.properties
下面是config/server.properties的配置和說明:
#指定broker的id,數字,但不能過大
broker.id=0
#指定服務的監聽埠,預設是9092
port=9092
#這個地方需要特別注意,在程式碼中使用的時候,需要完全複製這個地方的配置,如localhost:9092,但是如果是分散式的,localhost顯然是不符合要求的,最好寫成當前機器的ipv4的地址,這裡寫localhost方便單機測試和開發
listeners=PLAINTEXT://localhost:9092
#同上,具體含義看官方原始檔中的說明
advertised.listeners=PLAINTEXT://localhost:9092
#指定zk的地址
zookeeper.connect=localhost:2181
配置基本上是可以了,然後開始run
bin/kafka-server-start config/server.properties
注意: 在啟動kafka之前需要先啟動zk,可以從官網專門下載一個zk,或者使用kafka自帶的zk,都可以。
Java程式碼實現日誌列印並被Flume採集
接下來就需要在程式碼實現日誌的列印,通過flume的採集,然後傳送到kafka,其實flume採集傳送到kafka通過上述配置就已經完成了,現在就做第一件事:
我個人採用spring boot進行實現的,在pom中進行如下配置:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<exclusions><!--把logback忽略,使用log4j-->
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
<!--flume對log4j的支援,使用avro的關鍵-->
<dependency>
<groupId>org.apache.flume.flume-ng-clients</groupId>
<artifactId>flume-ng-log4jappender</artifactId>
<version>1.5.0</version>
</dependency>
然後在src/main/resources下面,加入log配置檔案:log4j.properties
# set log levels
log4j.rootLogger=INFO, stdout, file, flume
log4j.logger.per.flume=INFO
#flume#
log4j.appender.flume=org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.layout=org.apache.log4j.PatternLayout
##重點在這裡!!這個地方就是在flume配置的avro的server地址,程式中產生的log都會通過avro的方式,被flume所採集
log4j.appender.flume.Hostname=localhost
log4j.appender.flume.Port=44446
#stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Threshold=INFO
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n
#file
log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
log4j.appender.file.Threshold=INFO
log4j.appender.file.File=/tmp/logs/real-log.log
log4j.appender.file.Append=true
log4j.appender.file.DatePattern='.'yyyy-MM-dd
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n
關於這個專案的簡單程式碼,已經在我的gitee上了,這個demo主要是通過列印日誌,flume 收集並publish 到 kafka指定的topic上,然後通過kafka consume接受到,後續會通過storm進行實時計算和處理