Collecting log4j Data Generated on Windows in Real Time with a Remote Flume Agent
1. Configure log4j.properties
Create a new log4j.properties under the resources directory in IDEA.
The configuration below mainly defines the log format for the console and for Flume; for the Log4jAppender settings, refer to the official Flume documentation. Note that UnsafeMode = true keeps the application from throwing an exception when the Flume agent is unreachable.
log4j.rootLogger=INFO,stdout,flume

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n

log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = 192.168.116.10
log4j.appender.flume.Port = 41414
log4j.appender.flume.UnsafeMode = true
log4j.appender.flume.layout=org.apache.log4j.PatternLayout
log4j.appender.flume.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n
2. Configure Dependencies
I have included the full set of dependencies from my Scala project here; strictly speaking, generating the logs only requires flume-ng-log4jappender (see the minimal sketch after the full pom below).
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.imooc.spark</groupId>
    <artifactId>sparktrain</artifactId>
    <version>1.0</version>
    <inceptionYear>2008</inceptionYear>
    <properties>
        <scala.version>2.11.4</scala.version>
        <kafka.version>1.0.0</kafka.version>
        <spark.version>2.4.0</spark.version>
        <hadoop.version>2.6.1</hadoop.version>
        <hbase.version>1.2.6</hbase.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>1.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.17.Final</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.module</groupId>
            <artifactId>jackson-module-scala_2.11</artifactId>
            <version>2.9.1</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.31</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-flume_2.11</artifactId>
            <version>2.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-flume-sink_2.11</artifactId>
            <version>2.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.21</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.16</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flume.flume-ng-clients</groupId>
            <artifactId>flume-ng-log4jappender</artifactId>
            <version>1.6.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
            <version>2.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.4.0</version>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <scalaVersion>${scala.version}</scalaVersion>
                    <args>
                        <arg>-target:jvm-1.5</arg>
                    </args>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-eclipse-plugin</artifactId>
                <configuration>
                    <downloadSources>true</downloadSources>
                    <buildcommands>
                        <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
                    </buildcommands>
                    <additionalProjectnatures>
                        <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
                    </additionalProjectnatures>
                    <classpathContainers>
                        <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
                        <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
                    </classpathContainers>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <reporting>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <configuration>
                    <scalaVersion>${scala.version}</scalaVersion>
                </configuration>
            </plugin>
        </plugins>
    </reporting>
</project>
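As noted above, if the application only needs to write logs and ship them to Flume, a minimal dependency set would look like the sketch below (both entries and their versions are taken from the full pom):

<dependencies>
    <!-- log4j itself, used by LoggerGenerator -->
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.16</version>
    </dependency>
    <!-- the log4j appender that forwards events to Flume's avro source -->
    <dependency>
        <groupId>org.apache.flume.flume-ng-clients</groupId>
        <artifactId>flume-ng-log4jappender</artifactId>
        <version>1.6.0</version>
    </dependency>
</dependencies>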
3. Simulate Log Generation
import org.apache.log4j.Logger;

public class LoggerGenerator {

    private static Logger logger = Logger.getLogger(LoggerGenerator.class.getName());

    public static void main(String[] args) throws Exception {
        int i = 0;
        // Emit one log line per second; with the log4j.properties above on the
        // classpath, each line goes to the console and, via the Log4jAppender,
        // to the Flume avro source.
        while (true) {
            Thread.sleep(1000);
            logger.info("now is " + i);
            i++;
        }
    }
}
4. Configure Flume
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Configure the source: an avro source that receives events from the Log4jAppender
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 41414
# Configure the sink
#a1.sinks.k1.type =logger
a1.sinks.k1.type = file_roll
a1.sinks.k1.sink.directory = /usr/tmp/stream
a1.sinks.k1.sink.rollInterval=300
a1.sinks.k1.sink.batchSize=100
a1.sinks.k1.sink.serializer=TEXT
a1.sinks.k1.sink.serializer.appendNewline = false
#-------------
# Configure the channel
a1.channels.c1.type = memory
# Wire the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
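The memory channel above runs with Flume's default sizing. If the generator ever outpaces the sink, you can size the channel explicitly; the values below are illustrative, not part of the original configuration:

a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100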
5. Start Flume
flume-ng agent -n a1 -c /usr/local/src/apache-flume-1.6.0-bin/conf -f /usr/local/src/apache-flume-1.6.0-bin/conf/streaming.conf -Dflume.root.logger=INFO,console
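Equivalently, the same command with flume-ng's long option names (paths unchanged):

flume-ng agent --name a1 --conf /usr/local/src/apache-flume-1.6.0-bin/conf --conf-file /usr/local/src/apache-flume-1.6.0-bin/conf/streaming.conf -Dflume.root.logger=INFO,console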
6. Run the Java Program
In IDEA, right-click LoggerGenerator and choose Run.
7. Test
New log files appear under the directory configured for the file_roll sink (/usr/tmp/stream); since rollInterval is 300, the sink rolls to a new file every 300 seconds. After you stop Flume, you can confirm the log lines have been written into those files.
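To verify on the Flume host, you might list the sink directory and inspect a rolled file (the path comes from the configuration above; the file names themselves are generated by Flume):

ls -l /usr/tmp/stream
tail -n 5 /usr/tmp/stream/*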