
Integrating Spark and Flume-ng

Spark Streaming can be integrated with Kafka, Flume, Twitter, ZeroMQ, TCP sockets, and other sources; after the data is processed by Spark, the results can be written out to filesystems, databases, and live dashboards.


[Figure: data flow from input sources (Kafka, Flume, Twitter, ZeroMQ, TCP sockets) through Spark Streaming to filesystems, databases, and live dashboards]

The figure above shows where each component sits in the pipeline. This article focuses on the Spark Streaming side of the development, because the Flume-ng side needs no special handling and works exactly as it does in any ordinary Flume-ng setup. Every Spark Streaming program starts from a JavaStreamingContext, for example:

// Entry point: create a streaming context with a 1-second batch interval.
// [sparkHome] and [jars] are optional and only needed when submitting to a cluster.
JavaStreamingContext jssc =
    new JavaStreamingContext(master, appName,
                             new Duration(1000),
                             [sparkHome], [jars]);

// Create a DStream that receives Flume events pushed to host:port.
JavaDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(jssc, host, port);
Finally, call the JavaStreamingContext's start method to launch the program:
jssc.start();
jssc.awaitTermination();
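Note that FlumeUtils is not part of Spark core; it comes from the external spark-streaming-flume module, so that artifact has to be on the project's classpath. With sbt the dependencies might look roughly like the sketch below (the version shown is only a placeholder; use the one that matches your Spark installation):

// build.sbt (sketch) — keep the versions in line with your Spark release
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming"       % "1.0.0",
  "org.apache.spark" %% "spark-streaming-flume" % "1.0.0"
)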
The complete program is shown below:
package scala;

import org.apache.spark.api.java.function.Function;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.flume.FlumeUtils;
import org.apache.spark.streaming.flume.SparkFlumeEvent;

public class JavaFlumeEventCount {

    public static void JavaFlumeEventTest(String master, String host, int port) {
        Duration batchInterval = new Duration(2000);
        JavaStreamingContext ssc = new JavaStreamingContext(master,
                "FlumeEventCount", batchInterval,
                System.getenv("SPARK_HOME"),
                JavaStreamingContext.jarOfClass(JavaFlumeEventCount.class));
        StorageLevel storageLevel = StorageLevel.MEMORY_ONLY();

        // Start an Avro receiver on host:port; Flume pushes events to it via an avro sink
        JavaDStream<SparkFlumeEvent> flumeStream =
                FlumeUtils.createStream(ssc, host, port, storageLevel);

        // Count the events received in each batch and print the result
        flumeStream.count().map(new Function<java.lang.Long, String>() {
            @Override
            public String call(java.lang.Long in) {
                return "Received " + in + " flume events.";
            }
        }).print();

        ssc.start();
        ssc.awaitTermination();
    }
}
Then start Flume and push data to this host and port; the Spark side will receive it.
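Nothing Spark-specific is needed on the Flume side: the agent only has to have an avro sink whose hostname and port point at the address that FlumeUtils.createStream is listening on. A minimal agent configuration might look like the sketch below (the agent, source, and channel names, the netcat source, and the example ports are only illustrative):

# Example Flume agent: any source will do; only the avro sink matters for Spark
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Illustrative source; replace with whatever actually produces your events
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

a1.channels.c1.type = memory

# The avro sink must point at the host:port passed to FlumeUtils.createStream
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = spark-receiver-host
a1.sinks.k1.port = 33333

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1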

Below is a Scala program with the same functionality:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.flume._

object FlumeEventCount {

  def ScalaFlumeEventTest(master: String, host: String, port: Int) {
    val batchInterval = Milliseconds(2000)
    val ssc = new StreamingContext(master, "FlumeEventCount", batchInterval,
      System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))

    // Start an Avro receiver on host:port and count the events in each batch
    val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY)
    stream.count().map(cnt => "Received " + cnt + " flume events.").print()

    ssc.start()
    ssc.awaitTermination()
  }
}
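Both versions only count events. To inspect the payload, note that each SparkFlumeEvent wraps the underlying AvroFlumeEvent, whose body is a java.nio.ByteBuffer. A small sketch (the UTF-8 decoding is just for illustration) that could replace the count().map(...).print() line in the Scala example:

// Decode each event body as a UTF-8 string and print a few per batch
stream.map { e =>
  val body = e.event.getBody            // e.event is the wrapped AvroFlumeEvent
  val bytes = new Array[Byte](body.remaining())
  body.get(bytes)
  new String(bytes, "UTF-8")
}.print()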
All of the programs above were run in Spark Standalone mode. They can also be run on YARN, but that requires a few modifications.