Spark Study Notes (15): Spark Streaming Integration with Flume
阿新 · Published 2018-11-06
1 Flume configuration files
Configure JAVA_HOME in flume-env.sh.
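For example, add a line like the following to flume-env.sh (the JDK path is only an illustration; point it at your actual install location):
export JAVA_HOME=/usr/local/jdk1.8.0_161   # hypothetical JDK location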
1.1 flume-pull.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /export/data/flume
a1.sources.r1.fileHeader = true

# Describe the sink
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname = node1
a1.sinks.k1.port = 8888

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
1.2 flume-push.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /export/data/flume
a1.sources.r1.fileHeader = true

# Describe the sink
a1.sinks.k1.type = avro
# this is the receiver side (the machine running the Spark Streaming receiver)
a1.sinks.k1.hostname = 192.168.30.1
a1.sinks.k1.port = 8888

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2 Push example
2.1 pom
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-flume_2.10</artifactId>
    <version>1.6.3</version>
</dependency>
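The Flume artifact alone is usually not enough to compile the examples below; the Spark core and streaming artifacts are typically declared as well (a sketch, assuming the same 1.6.3 / Scala 2.10 versions as the cluster):
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.6.3</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>1.6.3</version>
</dependency>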
2.2 Source code
package streamingAndflume

import mystreaming.LoggerLevels
import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FlumePushCount {
  def main(args: Array[String]): Unit = {
    LoggerLevels.setStreamingLogLevels()
    val conf = new SparkConf().setAppName("FlumePush").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))
    // Flume pushes events to Spark; this IP is the address of the local machine running the receiver
    val flumeStream = FlumeUtils.createStream(ssc, "192.168.30.1", 8888)
    // The real payload of a Flume event is obtained via event.getBody()
    val words = flumeStream.flatMap(x => new String(x.event.getBody().array()).split(" ")).map((_, 1))
    val results = words.reduceByKey(_ + _)
    results.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
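The LoggerLevels helper imported above is not shown in this post. A minimal sketch of what it presumably does (an assumption; it simply quiets Spark's INFO logging so the word counts stay visible):
package mystreaming

import org.apache.log4j.{Level, Logger}

// Sketch of the helper used by the examples (assumption: the original class is not shown in the post)
object LoggerLevels {
  def setStreamingLogLevels(): Unit = {
    // Raise the log level so streaming output is not drowned out by INFO messages
    Logger.getRootLogger.setLevel(Level.WARN)
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  }
}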
2.3 Start Flume
Put some data files under /export/data/flume for the spooldir source to pick up.
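For example (file name and contents are arbitrary; the spooldir source picks up any new file in the directory):
echo "hello spark hello flume" > /export/data/flume/test.log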
From the Flume install directory (apache-flume-1.6.0-bin):
bin/flume-ng agent -n a1 -c conf -f conf/flume-push.conf
3 Poll approach
The corresponding jars must be added to Flume's lib directory; see the list sketched below.
Reference: https://spark.apache.org/docs/latest/streaming-flume-integration.html
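According to that guide, for Spark 1.6.3 with Scala 2.10 the jars to copy into Flume's lib directory are roughly the following (versions are an assumption; match them to your own Spark and Scala versions):
spark-streaming-flume-sink_2.10-1.6.3.jar
scala-library-2.10.5.jar
commons-lang3-3.3.2.jar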
3.1 Poll source code
package streamingAndflume

import java.net.InetSocketAddress

import mystreaming.LoggerLevels
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FlumePollCount {
  def main(args: Array[String]): Unit = {
    LoggerLevels.setStreamingLogLevels()
    val conf = new SparkConf().setAppName("FlumePoll").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))
    // Pull data from Flume: the addresses of the agents' SparkSinks (several can be listed)
    val address = Seq(new InetSocketAddress("node1", 8888))
    val flumeStream = FlumeUtils.createPollingStream(ssc, address, StorageLevel.MEMORY_AND_DISK)
    // The real payload of a Flume event is obtained via event.getBody()
    val words = flumeStream.flatMap(x => new String(x.event.getBody().array()).split(" ")).map((_, 1))
    val results = words.reduceByKey(_ + _)
    results.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
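Since createPollingStream takes a sequence of addresses, one stream can pull from several SparkSink agents at once. A small sketch reusing the ssc and imports above (node2 is a hypothetical second agent):
// Poll from two Flume agents, each exposing a SparkSink on port 8888 (node2 is hypothetical)
val addresses = Seq(
  new InetSocketAddress("node1", 8888),
  new InetSocketAddress("node2", 8888)
)
val multiStream = FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK)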
3.2 Start Flume first, then start the program
From the Flume install directory (apache-flume-1.6.0-bin):
bin/flume-ng agent -n a1 -c conf -f conf/flume-pull.conf
4 Running the push approach on a cluster
Start the Spark cluster:
/home/hadoop/apps/spark-1.6.3-bin-hadoop2.6/sbin/start-all.sh
4.1 Source code
The source is packaged into a jar with Maven (a sample packaging command follows the code below).
package streamingAndflume

import mystreaming.LoggerLevels
import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FlumePushCount {
  def main(args: Array[String]): Unit = {
    LoggerLevels.setStreamingLogLevels()
    val host = args(0)
    val port = args(1).toInt
    val conf = new SparkConf().setAppName("FlumePush")
    val ssc = new StreamingContext(conf, Seconds(5))
    // Flume pushes events to Spark; host and port are taken from the command-line arguments
    val flumeStream = FlumeUtils.createStream(ssc, host, port)
    // The real payload of a Flume event is obtained via event.getBody()
    val words = flumeStream.flatMap(x => new String(x.event.getBody().array()).split(" ")).map((_, 1))
    val results = words.reduceByKey(_ + _)
    results.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
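A minimal sketch of the packaging step (assuming a standard Maven layout; the jar name depends on your artifactId and version, and here it is copied to the /home/hadoop/push.jar path used by spark-submit below):
mvn clean package
cp target/<your-artifact>.jar /home/hadoop/push.jar   # <your-artifact> is a placeholder
Note that spark-streaming-flume_2.10 is not part of the Spark distribution, so it and its transitive dependencies must either be bundled into the application jar (for example with the maven-shade plugin) or supplied at submit time via --jars or --packages.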
4.2 Submit the jar
Modify flume-push.conf:
a1.sinks.k1.hostname = node2
/home/hadoop/apps/spark-1.6.3-bin-hadoop2.6/bin/spark-submit --master spark://node1:7077 --class streamingAndflume.FlumePushCount /home/hadoop/push.jar node2 8888
4.3 Start Flume
From the Flume install directory (apache-flume-1.6.0-bin):
bin/flume-ng agent -n a1 -c conf -f conf/flume-push.conf