1. 程式人生 > 實用技巧 >Flume+Kafka+Sparkstreaming日誌分析

Flume+Kafka+Sparkstreaming日誌分析

2019獨角獸企業重金招聘Python工程師標準>>> hot3.png

最近要做一個日誌實時分析的應用,採用了flume+kafka+sparkstreaming框架,先搞了一個測試Demo,本文沒有分析其架構原理。

  簡介:flume是一個分散式,高可靠,可用的海量日誌聚合系統,kafka是一高吞吐量的分散式釋出訂閱系統,sparkstreaming是建立在spark上的實時計算框架,這這個Demo中,以上內容均為單機版偽分佈,flume的source為exec,agent的名稱為producer,sink為kafka。

  執行所需要的環境直接到官網上下載即可:

  我的環境是:flume1.6+kafka_2.10+spark1.2.0

  flume的配置:

  在conf下編輯配置檔案roomy.conf如下:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

#agent section

producer.sources = s

producer.channels = c

producer.sinks = r

#source section

producer.sources.s.type=exec

producer.sources.s.command=tail-F -n+1/Users/roomy/Desktop/Coding/scala/real_time_project/debug

.log#監聽日誌所在

producer.sources.s.channels = c

# Each sink's type must be defined

producer.sinks.r.type= org.apache.flume.plugins.KafkaSink

producer.sinks.r.metadata.broker.list=192.168.1.102:9092#這裡換成自己Kafka的地址

producer.sinks.r.partition.key=0

producer.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartition

producer.sinks.r.serializer.class=kafka.serializer.StringEncoder

producer.sinks.r.request.required.acks=0

producer.sinks.r.max.message.size=1000000

producer.sinks.r.producer.type=sync

producer.sinks.r.custom.encoding=UTF-8

  在flume資料夾下執行

1

bin/flume-ngagent --conf conf --conf-fileconf/roomy.conf --name producer -Dflume.root.logger=INFO,console

  flume的部分完成。

  在kafka目錄下執行:

1

bin/zookeeper-server-start.sh config/zookeeper.properties

  啟動zookeeper

  執行:

1

bin/kafka-server-start.sh config/server.properties

  啟動kafka,這裡無需做什麼額外配置。

  最後編寫spark streaming測試Demo程式

  直接新建SBT專案,build.sbt如下:

1

2

3

4

5

6

7

8

9

10

11

name :="sk"

version :="1.0"

scalaVersion :="2.10.4"

libraryDependencies +="org.apache.spark"%"spark-streaming_2.10"%"1.6.1"

libraryDependencies +="org.apache.spark"%"spark-streaming-kafka_2.10"%"1.6.1"

libraryDependencies +="log4j"%"log4j"%"1.2.17"

  需要注意的是,由於GFW,下載慢的要死,接下來就是測試程式

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

importkafka.serializer.StringDecoder

importorg.apache.spark.SparkConf

importorg.apache.spark.streaming.dstream.DStream

importorg.apache.spark.streaming.{Seconds, StreamingContext}

importorg.apache.spark.streaming.kafka.KafkaUtils

/**

* Created by roomy on 16/3/23.

*/

objectKafkaStreaming {

defmain(agrs:Array[String]):Unit={

valsparkConf=newSparkConf().setMaster("local[2]").setAppName("Streamingtest")

valssc=newStreamingContext(sparkConf, Seconds(20))

valtopic="test"

valtopicSet=topic.split(" ").toSet

//create direct kafka stream with brokers and topics

valkafkaParams=Map[String, String]("metadata.broker.list"->"localhost:9092")

valmessages=KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](

ssc, kafkaParams, topicSet

)

vallines=messages.map(_._2)

lines.print()

valwords:DStream[String]=lines.flatMap(_.split("\n"))

words.count().print()

//啟動

ssc.start()

ssc.awaitTermination()

}

}

  可以通過StreamContext的建構函式設定資料採集分析的間隔。

  程式會監聽/Users/roomy/Desktop/Coding/scala/real_time_project/debug.log中的變動,並以20秒一次的頻率總計增加行數輸出在控制檯。

  日誌沒有變動的時候如下:

  執行測試程式產生日誌:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

importorg.apache.log4j.Logger;

/**

* Created by roomy on 16/3/23.

* to generate some log to test

*/

publicclassLogGeneratorimplementsRunnable{

privatestaticLogger logger = Logger.getLogger(LogGenerator.class);

privateintno;

publicLogGenerator(intno){

this.no=no;

}

publicstaticvoidmain(String [] agrs)throwsInterruptedException {

for(inti=0;i<5;i++){

newThread(newLogGenerator(i)).start();

}

}

@Override

publicvoidrun() {

while(true){

logger.debug("this is a test information produced by roomy no:"+Thread.currentThread().getName());

try{

Thread.sleep((int)Math.random()*100);

}

catch(Exception e){

e.printStackTrace();

}

}

}

}

  控制檯輸出如下:

  streaming的輸出操作會把每個批次的前十個元素輸出如下:

  在這20秒內總共產生的日誌行數為:

  參考文件:

  https://flume.apache.org/FlumeUserGuide.html

  http://kafka.apache.org/documentation.html 

  Spark快速大資料分析

轉載於:https://my.oschina.net/hblt147/blog/1840271