Spark Streaming and Flume Integration in Practice
Spark Streaming can be connected to Flume in two ways:
- Poll: Spark Streaming pulls data from Flume
- Push: Flume pushes messages to Spark Streaming
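As a minimal sketch of the difference (the host names and object name below are placeholders, not values from this tutorial), the two modes map to two different FlumeUtils entry points:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FlumeModes {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(
      new SparkConf().setAppName("FlumeModes").setMaster("local[2]"), Seconds(5))
    // Poll: Spark connects to Flume's SparkSink and pulls events in batches
    val polled = FlumeUtils.createPollingStream(ssc, "flume-host", 8888)
    // Push: this application starts a receiver that Flume's avro sink pushes events to
    val pushed = FlumeUtils.createStream(ssc, "spark-app-host", 8888)
    // transformations and ssc.start() omitted; see the full programs below
  }
}

The steps below first walk through the poll mode, then the push mode.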
1. Install Flume 1.6 or later
2. Download the dependency jar
Place spark-streaming-flume-sink_2.11-2.0.2.jar into Flume's lib directory.
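Note that for this custom sink to load inside Flume, the official Spark Streaming + Flume integration guide also lists the scala-library and commons-lang3 jars as required on Flume's classpath; if Flume fails to start the SparkSink with a ClassNotFoundException, check that those jars are present in lib as well.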
3. Generate the data
Prepare the data file data.txt under the /root/data directory on the server.
vi data.txt

hadoop spark hive spark hadoop sqoop flume redis flume hadoop solr kafka solr hadoop
4. Configure the Flume agent
vi flume-poll.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1
#source
a1.sources.r1.channels = c1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /root/data
a1.sources.r1.fileHeader = true
#channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity = 5000
#sinks
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname = hdp-node-01
a1.sinks.k1.port = 8888
a1.sinks.k1.batchSize = 2000
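With this configuration, the spooldir source watches /root/data, events flow through the memory channel, and the SparkSink buffers them while listening on port 8888 of the Flume host (hdp-node-01 here). Spark Streaming later connects to that host and port and pulls events in batches, so the hostname must point at the Flume server, not the Spark application.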
5. Add the Maven dependency
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-flume_2.10</artifactId>
    <version>2.0.2</version>
</dependency>
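Make sure the Scala suffix of the artifact (_2.10 vs _2.11) matches the Scala version your project is compiled with; the _2.10 artifact shown here is only correct for a Scala 2.10 build. The sink jar dropped into Flume's lib above is the _2.11 build, which is fine regardless of the application's Scala version, because it runs inside Flume's JVM rather than in your application.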
6. Code
package cn.cheng.spark

import java.net.InetSocketAddress
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Spark Streaming integrated with Flume in pull (Poll) mode
 */
object SparkStreaming_Flume_Poll {
  // newValues: all the 1s for the same word among the (word, 1) pairs of the current batch
  // runningCount: the accumulated historical count for the same key
  def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
    val newCount = runningCount.getOrElse(0) + newValues.sum
    Some(newCount)
  }

  def main(args: Array[String]): Unit = {
    // configure SparkConf
    val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreaming_Flume_Poll").setMaster("local[2]")
    // build the SparkContext
    val sc: SparkContext = new SparkContext(sparkConf)
    // build the StreamingContext with the batch interval
    val scc: StreamingContext = new StreamingContext(sc, Seconds(5))
    // set the checkpoint directory, required by updateStateByKey
    scc.checkpoint("./")
    // addresses of the Flume agents; several can be listed
    val address = Seq(new InetSocketAddress("192.168.200.160", 8888))
    // pull data from Flume
    val flumeStream: ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.createPollingStream(scc, address, StorageLevel.MEMORY_AND_DISK)
    // the payload lives in the event body; convert it to a String
    val lineStream: DStream[String] = flumeStream.map(x => new String(x.event.getBody.array()))
    // word count with state accumulated across batches
    val result: DStream[(String, Int)] = lineStream.flatMap(_.split(" ")).map((_, 1)).updateStateByKey(updateFunction)
    result.print()
    scc.start()
    scc.awaitTermination()
  }
}
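As the comment in the code notes, the address sequence can list several Flume agents. A hypothetical sketch, reusing the imports and scc from the listing above (the second agent's IP is an assumed example, and each agent would need the same SparkSink configuration):

val addresses = Seq(
  new InetSocketAddress("192.168.200.160", 8888),
  new InetSocketAddress("192.168.200.161", 8888) // assumed second agent
)
val multiStream = FlumeUtils.createPollingStream(scc, addresses, StorageLevel.MEMORY_AND_DISK)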
7. Start Flume
flume-ng agent -n a1 -c /opt/bigdata/flume/conf -f /opt/bigdata/flume/conf/flume-poll.conf -Dflume.root.logger=INFO,console
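Here -n names the agent (it must match the a1 prefix used in the config), -c points at Flume's conf directory, -f selects the collection config written above, and -Dflume.root.logger=INFO,console sends Flume's log output to the console.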
8. Start the Spark Streaming application
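Since the master is hard-coded to local[2], the program can be run directly from the IDE; to run it on a cluster you would remove setMaster, package the jar, and launch it with spark-submit instead.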
9. Check the results
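If everything is wired up correctly, the printed batches should show the accumulated word counts from data.txt, i.e. (hadoop,4), (spark,2), (flume,2), (solr,2), (hive,1), (sqoop,1), (redis,1) and (kafka,1).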
Push: Flume pushes messages to Spark Streaming
1. Configure the Flume agent
vi flume-push.conf
#push mode
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#source
a1.sources.r1.channels = c1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /root/data
a1.sources.r1.fileHeader = true
#channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity = 5000
#sinks
a1.sinks.k1.channel = c1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 172.16.43.63
a1.sinks.k1.port = 8888
a1.sinks.k1.batchSize = 2000
Note: the hostname and port specified in the config file are the IP address and port of the server where the Spark application runs.
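Also note the start order: in push mode the avro sink connects out to a receiver inside the Spark application, so the application must already be running when Flume starts, which is why the steps below launch Spark Streaming first.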
2. Code
package cn.cheng.spark
import java.net.InetSocketAddress
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Spark Streaming integrated with Flume in push mode
 */
object SparkStreaming_Flume_Push {
  // newValues: all the 1s for the same word among the (word, 1) pairs of the current batch
  // runningCount: the accumulated historical count for the same key
  def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
    val newCount = runningCount.getOrElse(0) + newValues.sum
    Some(newCount)
  }

  def main(args: Array[String]): Unit = {
    // configure SparkConf
    val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreaming_Flume_Push").setMaster("local[2]")
    // build the SparkContext
    val sc: SparkContext = new SparkContext(sparkConf)
    // build the StreamingContext with the batch interval
    val scc: StreamingContext = new StreamingContext(sc, Seconds(5))
    // set the log level
    sc.setLogLevel("WARN")
    // set the checkpoint directory, required by updateStateByKey
    scc.checkpoint("./")
    // receive data pushed from Flume;
    // the IP address is the server where this application runs and must match the Flume config
    val flumeStream: ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.createStream(scc, "172.16.43.63", 8888, StorageLevel.MEMORY_AND_DISK)
    // the payload lives in the event body; convert it to a String
    val lineStream: DStream[String] = flumeStream.map(x => new String(x.event.getBody.array()))
    // word count with state accumulated across batches
    val result: DStream[(String, Int)] = lineStream.flatMap(_.split(" ")).map((_, 1)).updateStateByKey(updateFunction)
    result.print()
    scc.start()
    scc.awaitTermination()
  }
}
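A single push receiver is bound to one host and port. If higher ingest throughput is needed, a common pattern is to start several receivers on consecutive ports, each fed by its own avro sink, and union them. A hypothetical sketch, reusing the imports and scc from the listing above:

val numReceivers = 2 // assumed; one avro sink per port must exist on the Flume side
val streams = (0 until numReceivers).map { i =>
  FlumeUtils.createStream(scc, "172.16.43.63", 8888 + i, StorageLevel.MEMORY_AND_DISK)
}
val unioned = scc.union(streams)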
3. Start the Spark Streaming application
4. Generate the data
cp data.txt data2.txt
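The spooldir source treats each new file name as fresh input and renames files it has finished ingesting (with a .COMPLETED suffix by default), so copying the data to a new file name is enough to make Flume push it again.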
5. Start Flume
flume-ng agent -n a1 -c /opt/bigdata/flume/conf -f /opt/bigdata/flume/conf/flume-push.conf -Dflume.root.logger=INFO,console