Flume and Spark Streaming configuration explained
Apache Flume is a distributed service for efficiently collecting, aggregating, and moving large amounts of log data. This post explains how to configure Flume and Spark Streaming so that Spark Streaming receives its data from Flume: Flume pushes the data it collects directly to Spark Streaming for processing. The Spark Streaming side of the job looks like this:
package com.pinganfu.flumespark

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

object SparkFlumeEvent {
  def main(args: Array[String]) {
    val batchInterval = Milliseconds(5000)
    val sparkConf = new SparkConf().setAppName("flumetospark").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, batchInterval)
    // Receive the events that Flume's Avro sink pushes to localhost:33333
    val stream = FlumeUtils.createStream(ssc, "localhost", 33333, StorageLevel.MEMORY_AND_DISK)
    // Print how many Flume events arrived in each 5-second batch
    stream.count().map(cnt => "Received " + cnt + " flume events.").print()
    ssc.start()
    ssc.awaitTermination()
  }
}
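The steps below paste this code into spark-shell, but it can also be compiled as a standalone application. A minimal build.sbt sketch is shown here; the coordinates and versions are assumptions chosen to match the jars used later in this post, so adjust them to your own installation:

// build.sbt (illustrative sketch; versions are assumptions matching the jars used below)
name := "flumetospark"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"            % "1.0.0" % "provided",
  "org.apache.spark" %% "spark-streaming"       % "1.0.0" % "provided",
  "org.apache.spark" %% "spark-streaming-flume" % "1.0.0"
)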
Flume configuration
a1.channels = c1
a1.sinks = k1
a1.sources = r1

# Avro sink: pushes events to the Spark Streaming receiver on port 33333
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 33333

# Netcat source: accepts lines sent via telnet on port 44444
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.channels = c1

# In-memory channel connecting the source to the sink
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
Start three terminal windows
First, start spark-shell (bringing in the two dependency jars):
bin/spark-shell --jars lib/spark-streaming-flume_2.10-1.0.0.jar,lib/flume-ng-sdk-1.6.0.jar
Then paste the code above into the shell. The Scala REPL does not accept the package declaration, so drop that first line; once the object is defined, run SparkFlumeEvent.main(Array()) to start the job.
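Alternatively, since spark-shell already provides a SparkContext as sc, the same job can be written without the package/object wrapper and pasted straight in. This is an illustrative sketch rather than the original code; the hostname and port match the Flume Avro sink configured above:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

// Reuse the shell's SparkContext (sc) instead of creating a new SparkConf
val ssc = new StreamingContext(sc, Milliseconds(5000))
// Listen for events pushed by Flume's Avro sink on localhost:33333
val stream = FlumeUtils.createStream(ssc, "localhost", 33333, StorageLevel.MEMORY_AND_DISK)
stream.count().map(cnt => "Received " + cnt + " flume events.").print()
ssc.start()
// awaitTermination() is omitted here because it would block the shell prompt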
Next, start Flume, with the configuration above saved as conf/flume.conf:
bin/flume-ng agent --conf conf --conf-file conf/flume.conf --name a1 -Dflume.root.logger=INFO,console
Then, in the third window, connect to the netcat source:
telnet localhost 44444
Type a few lines to send data through Flume.
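For example, typing two lines in the telnet window (the netcat source answers each line with OK) should produce output like the following in the spark-shell window. The timestamp and count here are only illustrative; they depend on what arrives within each 5-second batch.

In the telnet window:

hello flume
OK
hello spark
OK

In the spark-shell window:

-------------------------------------------
Time: 1400000000000 ms
-------------------------------------------
Received 2 flume events.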