Spark Streaming 2.3.2 Integration with Flume
Preface: The official Spark Streaming + Flume integration documentation provides two approaches: push and pull.
Flume is one of Spark Streaming's advanced data sources (link).
Official Spark Streaming + Flume integration guide (link).
If you are not familiar with Flume, here is my basic Flume tutorial (link); you are welcome to visit.
The example code for this article is in my Gitee repository (link).
1. Overview
Apache Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. Here we explain how to configure Flume and Spark Streaming so that Spark Streaming receives data from Flume. There are two approaches.
Note: Flume support is deprecated as of Spark 2.3.0.
2. Flume-style Push-based Approach
Flume is designed to push data between Flume agents. In this approach, Spark Streaming essentially sets up a receiver that acts as an Avro agent for Flume, to which Flume can push data.
Because this is a push model, the Spark Streaming application must be started first, so that the receiver is scheduled and listening on the chosen port before Flume starts pushing data to it.
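For reference, the Spark side of this integration depends on the spark-streaming-flume module. Below is a minimal Maven dependency sketch; the coordinates mirror the --packages option used later in this article, and the Scala/Spark versions should be adjusted to match your own build:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-flume_2.11</artifactId>
    <version>2.3.2</version>
</dependency>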
Local development and testing
1) Flume conf file (link). The configuration here is for local testing; how to run it against the server is covered later.
cd $FLUME_HOME/conf
vim flume-push-streaming.conf
flume-push-streaming.sources = netcat-source
flume-push-streaming.sinks = avro-sink
flume-push-streaming.channels = memory-channel
flume-push-streaming.sources.netcat-source.type = netcat
flume-push-streaming.sources.netcat-source.bind = hadoop000
flume-push-streaming.sources.netcat-source.port = 44444
flume-push-streaming.sinks.avro-sink.type = avro
flume-push-streaming.sinks.avro-sink.hostname = 192.168.31.31
flume-push-streaming.sinks.avro-sink.port = 44445
flume-push-streaming.channels.memory-channel.type = memory
flume-push-streaming.sources.netcat-source.channels = memory-channel
flume-push-streaming.sinks.avro-sink.channel = memory-channel
Note: hadoop000 is the hostname of my Linux machine, and 192.168.31.31 is the IP of my Windows machine.
2) Code (link)
import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FlumePushWordCountTest {
  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.err.println("Usage: FlumePushWordCountTest <hostname> <port>")
      System.exit(1)
    }
    val Array(hostname, port) = args
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("FlumePushWordCountTest")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.sparkContext.setLogLevel("ERROR")
    // Push-based receiver: Spark Streaming listens on <hostname>:<port> for Avro events pushed by Flume
    val flumeStream = FlumeUtils.createStream(ssc, hostname, port.toInt)
    flumeStream.map(x => new String(x.event.getBody.array()).trim)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()
    ssc.start()
    ssc.awaitTermination()
  }
}
3) Run the Spark Streaming application locally with the arguments 0.0.0.0 and 44445, because the conf above sinks to port 44445 on my Windows machine.
4) Then start Flume on the Linux server. If you are not familiar with Flume, see my basic Flume tutorial (link).
flume-ng agent \
--name flume-push-streaming \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/flume-push-streaming.conf \
-Dflume.root.logger=INFO,console
5) Run telnet hadoop000 44444, type some data, and check the word-count output in the Spark console (a sample is sketched below).
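As a rough illustration (timestamps and counts depend on what you actually type), entering a line such as hello hello spark in the telnet session should make the Spark Streaming console print word counts that look roughly like this:
-------------------------------------------
Time: 1538065250000 ms
-------------------------------------------
(hello,2)
(spark,1)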
Server environment
1) Deploy and run in the server environment (start the jar first, then start Flume, following the same workflow as the local run above).
Prerequisite: the setup above was for local testing, so the sink pointed at my Windows IP. For the server run, the sink in the conf file must be changed:
flume-push-streaming.sinks.avro-sink.hostname = hadoop000
2) Package: mvn clean package -DskipTests
3) Upload the jar to the server.
4) Submit the jar:
./bin/spark-submit \
--class com.imooc.spark.streaming.flume.FlumePushWordCountTest \
--name FlumePushWordCountTest \
--master local[2] \
--packages org.apache.spark:spark-streaming-flume_2.11:2.3.2 \
/root/lib/spark-sql-1.0-jar-with-dependencies.jar \
hadoop000 44445
Because the Flume dependency was not bundled into the jar during packaging, specifying it with --packages is enough; Spark downloads the dependency automatically at submit time, so make sure the machine has network access. Also double-check the jar path.
Tip: the maven-assembly-plugin can be used to bundle the dependencies you want into the jar; a sketch follows.
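Below is a minimal maven-assembly-plugin configuration sketch; the jar-with-dependencies descriptor matches the jar name used in the spark-submit command above, but treat it as an illustrative pom.xml fragment rather than the exact build used here:
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
        </plugin>
    </plugins>
</build>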
3. Pull-based Approach using a Custom Sink
Unlike the first approach, Flume does not push data directly into Spark Streaming. Instead, Flume pushes data into a custom sink where it stays buffered, and Spark Streaming uses a reliable Flume receiver and transactions to pull the data from that sink. A transaction succeeds only after the data has been received and replicated by Spark Streaming (in other words, we fetch the data ourselves).
Compared with the push approach, this one offers stronger reliability and fault-tolerance guarantees.
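Note that with this approach the Flume agent itself needs the Spark sink on its classpath. According to the official integration guide, the following artifacts must be added to Flume's plugin/classpath directory (versions here are illustrative; match them to your Spark and Scala versions):
org.apache.spark:spark-streaming-flume-sink_2.11:2.3.2
org.scala-lang:scala-library:2.11.8
org.apache.commons:commons-lang3:3.5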
1) Flume conf file (link). The configuration here is for local testing; running it on the server works exactly the same way as in the first approach.
cd $FLUME_HOME/conf
vim flume-pull-streaming.conf
flume-pull-streaming.sources = netcat-source
flume-pull-streaming.sinks = spark-sink
flume-pull-streaming.channels = memory-channel
flume-pull-streaming.sources.netcat-source.type = netcat
flume-pull-streaming.sources.netcat-source.bind = hadoop000
flume-pull-streaming.sources.netcat-source.port = 44444
flume-pull-streaming.sinks.spark-sink.type = org.apache.spark.streaming.flume.sink.SparkSink
flume-pull-streaming.sinks.spark-sink.hostname = hadoop000
flume-pull-streaming.sinks.spark-sink.port = 44445
flume-pull-streaming.channels.memory-channel.type = memory
flume-pull-streaming.sources.netcat-source.channels = memory-channel
flume-pull-streaming.sinks.spark-sink.channel = memory-channel
Note: hadoop000 is the hostname of my Linux machine.
2) Code (link)
import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FlumePullWordCountTest {
  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.err.println("Usage: FlumePullWordCountTest <hostname> <port>")
      System.exit(1)
    }
    val Array(hostname, port) = args
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("FlumePullWordCountTest")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.sparkContext.setLogLevel("ERROR")
    // Pull-based (polling) receiver: Spark Streaming pulls buffered events from the SparkSink at <hostname>:<port>
    val flumeStream = FlumeUtils.createPollingStream(ssc, hostname, port.toInt)
    flumeStream.map(x => new String(x.event.getBody.array()).trim)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()
    ssc.start()
    ssc.awaitTermination()
  }
}
3) Start Flume first, then start the Spark Streaming application.
4) Run Flume:
flume-ng agent \
--name flume-pull-streaming \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/flume-pull-streaming.conf \
-Dflume.root.logger=INFO,console
5) Run the Spark Streaming application locally with the arguments 192.168.31.30 and 44445, which are the IP of my Linux machine and the SparkSink port.
6) Run telnet hadoop000 44444, type some data, and check the results.