Spark Streaming使用Flume作為資料來源
1、Spark Streaming 2.3.1適配 Flume 1.6.0,在Spark Streaming 2.3.0之後對flume的支援已被標記為過時。主要由於flume直接對接Spark Streaming 會造成Spark Streaming壓力過大,特別是高峰期的時候(在之間加一層訊息佇列會好得多)。但由於很多公司可能仍然在用,故簡單做一下介紹。
2、有兩種方式可以讓Spark Streaming整合Flume並從Flume接受資料。以下分別介紹這兩種方式。
一、基於Flume的Push模式(Flume-style Push-based Approach)
這種方式Spark Streaming會建立一個Receiver,這個Receiver起到一個相當於Flume的Avro Agent的作用,Flume可以將資料推送到這個Receiver。以下是詳細的配置步驟。
1、一般要求
在你的叢集中選擇一臺機器,這臺機器需要滿足一下條件:
A.當Spark Streaming+Flume的應用程式執行時,有一個Spark的Worker節點執行在這臺機器上。
B.Flume通過配置後可以將資料推送到這臺機器的埠上。
2、配置Flume
通過以下的配置檔案可以將資料傳送到Avro Sink。
agent.sinks = avroSink agent.sinks.avroSink.type = avro agent.sinks.avroSink.channel = memoryChannel agent.sinks.avroSink.hostname = <所選機器的IP> agent.sinks.avroSink.port = <所選機器的埠>
3、配置Spark Streaming應用程式
A.新增依賴
<dependency>
<groupId>org.apache.spark </groupId>
<artifactId>spark-streaming-flume_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
B.在Streaming應用程式的程式碼中,匯入一個FlumeUtils類並建立input DStream。
import org.apache.spark.streaming.flume._ val flumeStream = FlumeUtils.createStream(streamingContext, [所選機器ip], [所選機器埠])
4、測試
A.直接執行程式碼
package com.ruozedata.streaming
import org.apache.spark.SparkConfimport org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
object FlumePushApp {
def main(args: Array[String]) {
val Array(hostname, port) = args
val sparkConf = new SparkConf()
.setAppName("FlumePushApp")
.setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(10))
val lines = FlumeUtils.createStream(ssc,hostname,port.toInt)
/*由於createStream返回的DStream型別為SparkFlumeEvent,而不是String,故此時split方法無法使用
*為了能夠使用split,我們執行了以下的map操作
*/
lines.map(x => new String(x.event.getBody.array()).trim)
.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()
ssc.start()
ssc.awaitTermination()
}
}
//以上程式碼執行會出錯,因為args沒有傳引數。
傳參流程:右上角FlumePushApp-->Edit Configurations-->program arguments-->填寫滿足條件的引數
接著:
B.打成jar包上傳執行
先啟動spark-submit
./spark-submit --master local[2] \
--class com.ruozedata.streaming.FlumePullApp \
--packages org.apache.spark:spark-streaming-flume_2.11:2.2.0 \ //相當於新增依賴,需要叢集能夠訪問外網
--name FlumePushApp \
/home/hadoop/lib/train-scala-1.0.jar \
localhost 41414
再啟動flume
./flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/nc-memory-flume.conf \
-Dflume.root.logger=INFO,console
在另一臺客戶端執行 telnet localhost 44444,此時在該客戶端輸入資料,另一客戶端會實時打印出處理結果
//mvn clean packages 打包只包含原始碼不包含依賴包
//以上解決方式需要叢集能夠訪問外網,不能訪問外網時可參照以下步驟
a。把不所有需要打包進來的依賴全部新增<scope>provided</scope>
b。在pom.xml檔案中新增plugin
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass></mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
c。編譯時使用 mvn assembly:assembly來打包
二、基於自定義sink的pull模式(Pull-based Approach using a Custom Sink)
1、一般要求
不同於Flume直接將資料推送到Spark Streaming中,這種方法自定義一個滿足以下條件的Flume Sink:
- Flume將資料推送到sink中,並且資料保持buffered狀態
- Spark Streaming使用一個可靠的Flume接收器(reliable Flume receiver)和轉換器(transaction)從sink拉取資料.只要當資料被接收並且被Spark Streaming備份後,轉換器才執行成功.
2、新增依賴
groupId = org.apache.spark
artifactId = spark-streaming-flume-sink_2.11
version = 2.3.1
groupId = org.scala-lang
artifactId = scala-library
version = 2.11.8
groupId = org.apache.commons
artifactId = commons-lang3
version = 3.5
3、配置flume
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 41414
4、測試
package com.ruozedata.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
object FlumePullApp {
def main(args: Array[String]) {
val Array(hostname, port) = args
val sparkConf = new SparkConf()
.setAppName("FlumePullApp")
.setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(10))
val lines = FlumeUtils.createPollingStream(ssc, hostname,port.toInt)
lines.map(x => new String(x.event.getBody.array()).trim)
.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()
ssc.start()
ssc.awaitTermination()
}
}
5、啟動
先啟動Flume
./flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/nc-memory-flume2.conf \
-Dflume.root.logger=INFO,console
然後再另一客戶端啟動 telnet localhost 44444
最後啟動Spark Streaming
./spark-submit --master local[2] \
--class com.ruozedata.streaming.FlumePullApp \
--packages org.apache.spark:spark-streaming-flume_2.11:2.2.0 \
--name FlumePushApp \
/home/hadoop/lib/train-scala-1.0.jar \
localhost 41414
此時在telnet端輸入資料,立馬出處理結果。
第二種方式更好
最後來兩個與題目無關的Spark Streaming小例子
1、黑名單過濾
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ListBuffer/**
* Created by ruozedata on 2018/4/15.
*/
object FilterApp {
def main(args: Array[String]) {
val sparkConf = new SparkConf()
.setAppName("FilterApp")
.setMaster("local[2]")
val sc = new SparkContext(sparkConf)
// (騷馬,true)
val blackTuple = new ListBuffer[(String,Boolean)]
blackTuple.append(("sm",true)) //true為了方便join
val blacksRDD = sc.parallelize(blackTuple) //將陣列轉成RDD
// 準備測試資料: log
val input = new ListBuffer[(String,String)]
input.append(("su","20180808,su,M,20"))
input.append(("kk","20180808,kk,M,20"))
input.append(("sm","20180808,sm,M,20"))
val inputRDD = sc.parallelize(input)
// TODO... 過濾掉黑名單使用者
val joinRDD = inputRDD.leftOuterJoin(blacksRDD)
joinRDD.filter(x => {
x._2._2.getOrElse(false) != true
}).map(_._2._1).foreach(println) //x._2._2為true或none,x._2._1為日誌本身
sc.stop()
}
}
2、改造例子1,將其使用Spark Streaming來完成
package com.ruozedata.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object StreamingFilterApp {
def main(args: Array[String]) {
val sparkConf = new SparkConf()
.setAppName("SocketWordCountApp")
.setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(10))
val blacks = List("sm","su")
val blacksRDD = ssc.sparkContext.parallelize(blacks).map(x=>(x,true))
val lines = ssc.socketTextStream("hadoop000",9997)
// (su, log of su)
val clickLogDstream = lines.map(x => (x.split(",")(1), x)).transform(rdd => {
rdd.leftOuterJoin(blacksRDD)
.filter(x => {
// _2 : (string,option) _2:option
x._2._2.getOrElse(false) != true
}).map(_._2._1) //x._2._2為true或none,x._2._1為日誌本身
})
clickLogDstream.print()
ssc.start()
ssc.awaitTermination()
}
}