SparkStreaming (11): Flume Pull Mode as an Advanced Data Source (Production)
阿新 · Published: 2018-11-08
1. Environment
(1) Production environment
Flume 1.6.0
Spark 2.1.0
(2) Download the matching dependencies
Note: all of these dependency JARs must be placed on Flume's classpath, otherwise the Flume agent will fail at runtime. (I have been bitten by this before!)
(i) Custom sink JAR: groupId = org.apache.spark, artifactId = spark-streaming-flume-sink_2.11, version = 2.1.0
(ii) Scala library JAR: groupId = org.scala-lang, artifactId = scala-library, version = 2.11.7
(iii) Commons Lang 3 JAR: groupId = org.apache.commons, artifactId = commons-lang3, version = 3.5
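For example, one way to get these onto the classpath is to copy the three JARs into Flume's lib directory. A minimal sketch, assuming $FLUME_HOME points at your Flume 1.6.0 install and the JARs are in the current directory:

# assumed paths; adjust for your installation
cp spark-streaming-flume-sink_2.11-2.1.0.jar $FLUME_HOME/lib/
cp scala-library-2.11.7.jar $FLUME_HOME/lib/
cp commons-lang3-3.5.jar $FLUME_HOME/lib/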
2. Flume configuration file flume_pull_streaming.conf
simple-agent.sources = netcat-source
simple-agent.sinks = spark-sink
simple-agent.channels = memory-channel

simple-agent.sources.netcat-source.type = netcat
simple-agent.sources.netcat-source.bind = bigdata.ibeifeng.com
simple-agent.sources.netcat-source.port = 44444

simple-agent.sinks.spark-sink.type = org.apache.spark.streaming.flume.sink.SparkSink
simple-agent.sinks.spark-sink.hostname = bigdata.ibeifeng.com
simple-agent.sinks.spark-sink.port = 41414

simple-agent.channels.memory-channel.type = memory

simple-agent.sources.netcat-source.channels = memory-channel
simple-agent.sinks.spark-sink.channel = memory-channel
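For production use you would normally also size the memory channel explicitly, since the defaults are small. A sketch with illustrative values (tune for your own workload):

# illustrative values only, not from the original setup
simple-agent.channels.memory-channel.capacity = 10000
simple-agent.channels.memory-channel.transactionCapacity = 1000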
3. Scala code
package Spark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Spark Streaming integration with Flume, approach 2: pull-based (SparkSink)
 */
object FlumePullWordCount_product_server {
  def main(args: Array[String]): Unit = {
    // production usage: hostname and port come from the command line
    if (args.length != 2) {
      System.err.println("Usage: FlumePullWordCount_product_server <hostname> <port>")
      System.exit(1)
    }
    val Array(hostname, port) = args

    val sparkConf = new SparkConf()
    // .setMaster("local[2]").setAppName("FlumePullWordCount_product") // local testing only
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // pull events from the Flume SparkSink
    val flumeStream = FlumeUtils.createPollingStream(ssc, hostname, port.toInt)

    // classic word count over the event bodies
    flumeStream.map(x => new String(x.event.getBody.array()).trim)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}
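Since the pull approach can read from more than one SparkSink, FlumeUtils.createPollingStream also has an overload that takes a list of addresses and a storage level. A minimal sketch, with hypothetical hostnames:

import java.net.InetSocketAddress
import org.apache.spark.storage.StorageLevel

// hypothetical: two Flume agents, each running a SparkSink on port 41414
val addresses = Seq(
  new InetSocketAddress("flume-host-1", 41414),
  new InetSocketAddress("flume-host-2", 41414)
)
val flumeStream = FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK_SER_2)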
4. Testing
(1) Package the code into a JAR
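A sketch of this packaging step, assuming a Maven project whose artifact matches the JAR name used in step (5):

# assumed Maven project; adjust the artifact name for your build
mvn clean package -DskipTests
cp target/scalaProjectMaven.jar /opt/datas/lib/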
(2) Start the Flume agent
bin/flume-ng agent \
--name simple-agent \
--conf conf \
--conf-file conf/flume_pull_streaming.conf \
-Dflume.root.logger=INFO,console
(3) Connect with telnet
telnet bigdata.ibeifeng.com 44444
(4) Start HDFS (if it is not running, the job will fail with errors)
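A sketch of starting HDFS, assuming a standard Hadoop layout with $HADOOP_HOME set:

# starts the NameNode and DataNodes
$HADOOP_HOME/sbin/start-dfs.sh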
(5) Submit the Spark job
bin/spark-submit \
--class Spark.FlumePullWordCount_product_server \
--master local[2] \
--packages org.apache.spark:spark-streaming-flume_2.11:2.1.0 \
/opt/datas/lib/scalaProjectMaven.jar \
bigdata.ibeifeng.com 41414
(6) Type test input into the telnet session
OK
s d f s
OK
sd fd f
OK
(Result: success!)
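For reference, if the line "s d f s" lands in a single 5-second batch, the job's console output should look roughly like this (the timestamp is illustrative):

-------------------------------------------
Time: 1541650000000 ms
-------------------------------------------
(d,1)
(f,1)
(s,2)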