1. 程式人生 > 實用技巧 >Flume+Kafka+Spark Streaming實現大資料實時流式資料採集

Flume+Kafka+Spark Streaming實現大資料實時流式資料採集

大資料實時流式資料處理是大資料應用中最為常見的場景,與我們的生活也息息相關,以手機流量實時統計來說,它總是能夠實時的統計出使用者的使用的流量,在第一時間通知使用者流量的使用情況,並且最為人性化的為使用者提供各種優惠的方案,如果採用離線處理,那麼等到使用者流量超標了才通知使用者,這樣會使得使用者體驗滿意度降低,這也是這幾年大資料實時流處理的進步,淡然還有很多應用場景。因此Spark Streaming應用而生,不過對於實時我們應該準確理解,需要明白的一點是Spark Streaming不是真正的實時處理,更應該成為準實時,因為它有延遲,而真正的實時處理Storm更為適合,最為典型場景的是淘寶雙十一大螢幕上盈利額度統計,在一般實時度要求不太嚴格的情況下,Spark Streaming+Flume+Kafka是大資料準實時資料採集的最為可靠並且也是最常用的方案,大資料實時流式資料採集的流程圖如下所示:

在本篇文章中使用Flume+Kafka+Spark Streaming具體實現大資料實時流式資料採集的架構圖如下:

轉發請標明原文地址:原文地址

對Flume,Spark Streaming,Kafka的配置如有任何問題請參考筆者前面的文章:

Flume跨伺服器採集資料

Spark Streaming整合Kafka的兩種方式

Kafka的簡單使用以及原理

開發環境、工具:

  • Linux作業系統,JDK環境,SCALA環境、CDH5版本軟體
  • Spark
  • Kafka_2.10-0.8.2.1
  • Flume-1.5.0-cdh5.3.6-bin
  • Zookeeper-3.4.5

下面我們就開始進行實戰配置:

Flume檔案配置

首先建立兩個配置檔案分別來啟動兩個Agent。

exec-memory-avro.conf:

# The configuration file needs to define the sources, 
# the channels and the sinks.
# Sources, channels and sinks are defined per agent, 
# in this case called 'agent'
f1.sources = r1
f1.channels = c1
f1.sinks = k1

#define sources
f1.sources.r1.type = exec
f1.sources.r1.command =tail -f /opt/datas/flume.log

#define channels
f1.channels.c1.type = memory
f1.channels.c1.capacity = 1000
f1.channels.c1.transactionCapacity = 100

#define sink
f1.sinks.k1.type = avro
f1.sinks.k1.hostname = hadoop-senior.shinelon.com
f1.sinks.k1.port =44444

#bind sources and sink to channel 
f1.sources.r1.channels = c1
f1.sinks.k1.channel = c1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

avro-memory-kafka.conf:

# The configuration file needs to define the sources, 
# the channels and the sinks.
# Sources, channels and sinks are defined per agent, 
# in this case called 'agent'
f2.sources = r2
f2.channels = c2
f2.sinks = k2

#define sources
f2.sources.r2.type = avro
f2.sources.r2.bind =hadoop-senior.shinelon.com
f2.sources.r2.port =44444

#define channels
f2.channels.c2.type = memory
f2.channels.c2.capacity = 1000
f2.channels.c2.transactionCapacity = 100

#define sink
f2.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
f2.sinks.k2.brokerList = hadoop-senior.shinelon.com:9092
f2.sinks.k2.topic =testSpark
f2.sinks.k2.batchSize=4
f2.sinks.k2.requiredAcks=1

#bind sources and sink to channel 
f2.sources.r2.channels = c2
f2.sinks.k2.channel = c2
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

上面的配置檔案關鍵需要注意kafka的配置,如有不懂也可參考Flume官方文件的說明。

接著我們啟動Flume,記得首先啟動avro-memory-kafka.conf的Agent:

bin/flume-ng agent 
--conf conf --name f2 \
--conf-file conf/avro-memory-kafka.conf \
-Dflume.root.logger=DEBUG,console

bin/flume-ng agent 
--conf conf --name f1 \
--conf-file conf/exec-memory-avro.conf \
-Dflume.root.logger=DEBUG,console
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

Kafka配置

注意:在啟動Kafka之前要啟動Zookeeper

下面就是kafka的配置:
server.properties:
主要注意下面幾個引數的配置,其他的引數預設就好。

broker.id=0
port=9092
host.name=hadoop-senior.shinelon.com
log.dirs=/opt/cdh-5.3.6/kafka_2.10-0.8.2.1/kafka_logs
zookeeper.connect=hadoop-senior.shinelon.com:2181
  • 1
  • 2
  • 3
  • 4
  • 5

啟動kafka(以後臺程序的方式啟動):

bin/kafka-server-start.sh -daemon config/server.properties &
  • 1

建立topic:
注意topic的名稱,需要與上面Flume中的配置一致,也要與下面Spark Streaming中設定的一致。

bin/kafka-topics.sh --create --zookeeper hadoop-senior.shinelon.com:2181 --replication-factor 1 --partitions 1 -- 
    topic testSpark
  • 1
  • 2

Spark Streaming配置

首先需要匯入Spark Streaming所需要的jar包並且啟動Spark:

bin/spark-shell --master local[2] --jars \
/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externallibs/spark-streaming-kafka_2.10-1.3.0.jar,/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externallibs/kafka_2.10-0.8.2.1.jar,/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externallibs/kafka-clients-0.8.2.1.jar,/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externallibs/zkclient-0.3.jar,/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externallibs/metrics-core-2.2.0.jar
  • 1
  • 2

接著編寫指令碼啟動Spark Streaming,這個指令碼使用Spark Streaming實現wordCount功能,程式碼如下:
SparkWordCount.scala:

import java.util.HashMap
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ 
import org.apache.spark.streaming.kafka._

val ssc = new StreamingContext(sc, Seconds(5))

val topicMap = Map("testSpark" -> 1)

// read data
val lines = KafkaUtils.createStream(ssc, "hadoop-senior.shinelon.com:2181", "testWordCountGroup", topicMap).map(_._2)

val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()

ssc.start()             // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

執行上面的指令碼就啟動了Spark Streaming(對應指令碼的路徑):

:load /opt/spark/SparkWordCount.scala
  • 1

這時就啟動好了Spark Streaming,至此所有的配置已經完成,所有的伺服器也已經啟動,現在進行測試,在上面Flume中exec中設定的檔案中寫入資料:

[shinelon@hadoop-senior datas]$ echo hadoop spark hadoop saprk hadoop>>flume.log
[shinelon@hadoop-senior datas]$ echo hadoop spark hadoop saprk hadoop>>flume.log
[shinelon@hadoop-senior datas]$ echo hadoop spark hadoop saprk hadoop>>flume.log
[shinelon@hadoop-senior datas]$ echo hadoop spark hadoop saprk hadoop>>flume.log
[shinelon@hadoop-senior datas]$ echo hadoop spark hadoop saprk hadoop>>flume.log
[shinelon@hadoop-senior datas]$ echo hadoop spark hadoop saprk hadoop>>flume.log
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

可以看見在Spark Streaming中採集到了資料並且進行了計數:

至此,我們就完成了Flume+Kafka+Spark Streaming的整合實現大資料實時流式資料採集,如有任何問題歡迎留言討論。