Flume + Kafka + Spark Streaming: Real-Time Streaming Data Collection for Big Data
Real-time stream processing is one of the most common big-data scenarios, and it touches our daily lives. Take real-time mobile-data usage as an example: the carrier can tally a user's traffic as it happens, notify the user of their usage immediately, and even proactively offer suitable discount plans. With offline (batch) processing, the user would only be notified after the quota had already been exceeded, which hurts the user experience; this is the kind of progress that real-time stream processing has brought in recent years, and of course there are many other application scenarios. Spark Streaming arose to serve this need, but the word "real-time" deserves a precise reading: Spark Streaming is not true real-time processing and is better described as near-real-time, because it processes data in micro-batches and therefore has some latency. For genuinely real-time processing, Storm is a better fit; the classic example is the revenue counter on the big screen during Taobao's Double Eleven sale. When the latency requirement is not that strict, Flume + Kafka + Spark Streaming is the most reliable and most widely used stack for near-real-time data collection. The overall data flow is shown below:
The architecture implemented in this article with Flume + Kafka + Spark Streaming is shown in the following diagram:
Please credit the original article when reposting.
If you run into any problems configuring Flume, Spark Streaming, or Kafka, please refer to my earlier articles:
Development environment and tools:
- Linux, JDK, Scala, and the CDH5 software stack
- Spark
- Kafka_2.10-0.8.2.1
- Flume-1.5.0-cdh5.3.6-bin
- Zookeeper-3.4.5
Now let's walk through the hands-on configuration:
Flume configuration
First, create two configuration files, one for each of the two agents we will start.
exec-memory-avro.conf:
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'agent'
f1.sources = r1
f1.channels = c1
f1.sinks = k1

# define sources
f1.sources.r1.type = exec
f1.sources.r1.command = tail -f /opt/datas/flume.log

# define channels
f1.channels.c1.type = memory
f1.channels.c1.capacity = 1000
f1.channels.c1.transactionCapacity = 100

# define sink
f1.sinks.k1.type = avro
f1.sinks.k1.hostname = hadoop-senior.shinelon.com
f1.sinks.k1.port = 44444

# bind sources and sink to channel
f1.sources.r1.channels = c1
f1.sinks.k1.channel = c1
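The exec source above simply runs `tail -f` on the log file and turns each appended line into one Flume event. A minimal local sketch of that behavior (using a temporary file as a stand-in for /opt/datas/flume.log):

```shell
# Simulate the exec source's view of the log: append events, read back the tail.
LOG=$(mktemp)                        # stand-in for /opt/datas/flume.log
echo "hadoop spark hadoop" >> "$LOG"
echo "kafka flume" >> "$LOG"
tail -n 2 "$LOG"                     # each of these lines would become one Flume event
```

Note that `tail -f` does not follow the file across log rotation; `tail -F` is usually preferred in production setups.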
avro-memory-kafka.conf:
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'agent'
f2.sources = r2
f2.channels = c2
f2.sinks = k2
#define sources
f2.sources.r2.type = avro
f2.sources.r2.bind = hadoop-senior.shinelon.com
f2.sources.r2.port = 44444
#define channels
f2.channels.c2.type = memory
f2.channels.c2.capacity = 1000
f2.channels.c2.transactionCapacity = 100
#define sink
f2.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
f2.sinks.k2.brokerList = hadoop-senior.shinelon.com:9092
f2.sinks.k2.topic = testSpark
f2.sinks.k2.batchSize = 4
f2.sinks.k2.requiredAcks = 1
#bind sources and sink to channel
f2.sources.r2.channels = c2
f2.sinks.k2.channel = c2
The part of these configuration files that deserves the most attention is the Kafka sink; consult the official Flume documentation if anything is unclear.
Next, start Flume. Be sure to start the avro-memory-kafka.conf agent first, so that its avro source is already listening on port 44444 when the first agent's avro sink connects to it:
bin/flume-ng agent \
--conf conf --name f2 \
--conf-file conf/avro-memory-kafka.conf \
-Dflume.root.logger=DEBUG,console

bin/flume-ng agent \
--conf conf --name f1 \
--conf-file conf/exec-memory-avro.conf \
-Dflume.root.logger=DEBUG,console
Kafka configuration
Note: Zookeeper must be started before Kafka, because Kafka registers its broker metadata in Zookeeper.
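With the standalone Zookeeper 3.4.5 used here, starting and checking it looks like this (paths are relative to the Zookeeper home directory):

```shell
bin/zkServer.sh start     # start the standalone Zookeeper server
bin/zkServer.sh status    # should report Mode: standalone once it is up
```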
The Kafka configuration is as follows:
server.properties:
The main parameters to check are listed below; the defaults are fine for everything else.
broker.id=0
port=9092
host.name=hadoop-senior.shinelon.com
log.dirs=/opt/cdh-5.3.6/kafka_2.10-0.8.2.1/kafka_logs
zookeeper.connect=hadoop-senior.shinelon.com:2181
Start Kafka as a background process (the -daemon flag already detaches it, so a trailing & is unnecessary):
bin/kafka-server-start.sh -daemon config/server.properties
Create the topic:
Note the topic name: it must match the topic configured in the Flume Kafka sink above and the one used in the Spark Streaming code below.
bin/kafka-topics.sh --create --zookeeper hadoop-senior.shinelon.com:2181 --replication-factor 1 --partitions 1 --topic testSpark
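Before wiring in Spark Streaming, you can confirm that events are flowing from Flume into the topic using Kafka's own command-line tools (hostnames as configured above):

```shell
# list topics registered in Zookeeper; testSpark should appear
bin/kafka-topics.sh --list --zookeeper hadoop-senior.shinelon.com:2181

# consume the topic from the beginning; lines appended to /opt/datas/flume.log
# should show up here once both Flume agents are running
bin/kafka-console-consumer.sh --zookeeper hadoop-senior.shinelon.com:2181 \
    --topic testSpark --from-beginning
```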
Spark Streaming configuration
First, start spark-shell with the jars that the Spark Streaming Kafka integration requires:
bin/spark-shell --master local[2] --jars \
/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externallibs/spark-streaming-kafka_2.10-1.3.0.jar,/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externallibs/kafka_2.10-0.8.2.1.jar,/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externallibs/kafka-clients-0.8.2.1.jar,/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externallibs/zkclient-0.3.jar,/opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/externallibs/metrics-core-2.2.0.jar
Next, write a script that starts Spark Streaming; it implements a word count over the incoming stream:
SparkWordCount.scala:
import java.util.HashMap
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka._
val ssc = new StreamingContext(sc, Seconds(5))
val topicMap = Map("testSpark" -> 1)
// read data
val lines = KafkaUtils.createStream(ssc, "hadoop-senior.shinelon.com:2181", "testWordCountGroup", topicMap).map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
Run the script from within spark-shell to start the streaming job (adjust the path to wherever you saved the script):
:load /opt/spark/SparkWordCount.scala
Spark Streaming is now running, all services are up, and the configuration is complete. To test the pipeline, write some data to the file that the Flume exec source tails:
[shinelon@hadoop-senior datas]$ echo hadoop spark hadoop saprk hadoop>>flume.log
[shinelon@hadoop-senior datas]$ echo hadoop spark hadoop saprk hadoop>>flume.log
[shinelon@hadoop-senior datas]$ echo hadoop spark hadoop saprk hadoop>>flume.log
[shinelon@hadoop-senior datas]$ echo hadoop spark hadoop saprk hadoop>>flume.log
[shinelon@hadoop-senior datas]$ echo hadoop spark hadoop saprk hadoop>>flume.log
[shinelon@hadoop-senior datas]$ echo hadoop spark hadoop saprk hadoop>>flume.log
You can see that Spark Streaming picked up the data and counted the words:
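Each 5-second batch prints its counts to the spark-shell console. With the lines echoed above, the output looks roughly like the following (the timestamp and exact counts are illustrative only; the numbers depend on how many echoed lines land in the same batch):

```
-------------------------------------------
Time: 1527820800000 ms
-------------------------------------------
(hadoop,18)
(spark,6)
(saprk,6)
```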
This completes the Flume + Kafka + Spark Streaming integration for real-time streaming data collection. If you have any questions, feel free to leave a comment.