<Spark Streaming><Flume><Integration>
Overview
- Flume: a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data.
- We set up a Flume + Spark Streaming pipeline that receives data from Flume and processes it.
- There are two approaches: the Flume-style push-based approach, and a pull-based approach using a custom sink.
Approach 1: Flume-style Push-based Approach
- Flume is designed to push data between Flume agents. In this approach, Spark Streaming essentially sets up a receiver that acts like an Avro agent for Flume, to which Flume can push the data.
General Requirements
- When your Flume + Spark Streaming application is launched, one of the Spark workers must run on the chosen machine.
- Flume can then be configured to push data to a port on that machine.
- Due to this push model, the streaming application needs to be up, with the receiver scheduled and listening on the chosen port, before Flume can push data to it.
Configuring Flume
- Configure the Flume agent to push data via an Avro sink to the chosen machine and port:
    agent.sinks = avroSink
    agent.sinks.avroSink.type = avro
    agent.sinks.avroSink.channel = memoryChannel
    agent.sinks.avroSink.hostname = <chosen machine's hostname>
    agent.sinks.avroSink.port = <chosen port on the machine>
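For context, a runnable agent also needs a source and a channel bound to this Avro sink. Below is a minimal sketch only; the netcat source, its port, and the channel capacity are illustrative values and not part of the original configuration:

    # Hypothetical complete agent definition (source and channel values are examples)
    agent.sources = netcatSrc
    agent.channels = memoryChannel
    agent.sinks = avroSink

    # Example source: reads newline-separated events from a local netcat connection
    agent.sources.netcatSrc.type = netcat
    agent.sources.netcatSrc.bind = localhost
    agent.sources.netcatSrc.port = 44444
    agent.sources.netcatSrc.channels = memoryChannel

    # Buffer events in memory between the source and the Avro sink
    agent.channels.memoryChannel.type = memory
    agent.channels.memoryChannel.capacity = 10000

    # The Avro sink from above, pointing at the Spark Streaming receiver
    agent.sinks.avroSink.type = avro
    agent.sinks.avroSink.channel = memoryChannel
    agent.sinks.avroSink.hostname = <chosen machine's hostname>
    agent.sinks.avroSink.port = <chosen port on the machine>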
Configuring Spark Streaming Application
- Linking: add the following dependency to your Maven project:
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-flume_2.10</artifactId>
      <version>2.1.0</version>
    </dependency>
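For quick experiments with spark-shell or spark-submit (a deployment-style assumption, not something from the original post), the same coordinates can be pulled in at launch time instead of being packaged through Maven:

    spark-submit --packages org.apache.spark:spark-streaming-flume_2.10:2.1.0 ...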
- Programming: import FlumeUtils and create the input DStream:
    import org.apache.spark.streaming.flume._

    val flumeStream = FlumeUtils.createStream(streamingContext, [chosen machine's hostname], [chosen port])
- Note: the hostname should be the same as the one used by the resource manager in the cluster, so that resource allocation can match the names and launch the receiver on the correct machine.
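The elements of the DStream returned by createStream are SparkFlumeEvent objects rather than plain strings. A minimal sketch of extracting each event body as text (assuming the payload is UTF-8; the variable names are illustrative):

    import java.nio.charset.StandardCharsets

    val lines = flumeStream.map { sparkEvent =>
      val body = sparkEvent.event.getBody          // the Avro event payload as a java.nio.ByteBuffer
      val bytes = new Array[Byte](body.remaining())
      body.get(bytes)                              // copy the payload out of the buffer
      new String(bytes, StandardCharsets.UTF_8)    // assumes the payload is UTF-8 text
    }
    lines.print()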
- A simple Spark Streaming demo that counts the number of Flume events received in each batch:
    object FlumeEventCount {
      def main(args: Array[String]) {
        if (args.length < 2) {
          System.err.println("Usage: FlumeEventCount <host> <port>")
          System.exit(1)
        }

        StreamingExamples.setStreamingLogLevels()

        val Array(host, IntParam(port)) = args
        val batchInterval = Milliseconds(2000)

        // Create the context and set the batch size
        val sparkConf = new SparkConf().setAppName("FlumeEventCount")
        val ssc = new StreamingContext(sparkConf, batchInterval)

        // Create a flume stream
        val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)

        // Print out the count of events received from this server in each batch
        stream.count().map(cnt => "Received " + cnt + " flume events.").print()

        ssc.start()
        ssc.awaitTermination()
      }
    }
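As a rough end-to-end run (the jar name, the Flume configuration file name, and the use of the bundled example class are assumptions, not from the original post), the streaming job is submitted first so that the receiver is scheduled and listening before the agent starts pushing:

    # 1. Submit the streaming job; the receiver must be up before Flume pushes
    spark-submit \
      --class org.apache.spark.examples.streaming.FlumeEventCount \
      --packages org.apache.spark:spark-streaming-flume_2.10:2.1.0 \
      your-application.jar <chosen machine's hostname> <chosen port>

    # 2. Start the Flume agent configured with the Avro sink above
    flume-ng agent --conf conf --conf-file flume-avro-sink.conf --name agent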