
Integrating Spark with Flume-ng

This article shows how to send data from Flume-ng into Spark so that Spark can perform real-time analysis and computation on it. Both a Java and a Scala version of the program are used for testing.

The integration of Spark with Flume-ng belongs to the Spark Streaming part of Spark. Before explaining how to use Spark Streaming, let's first look at what Spark Streaming is. The official Spark documentation describes it as follows:

Spark Streaming is an extension of the core Spark API that enables high-throughput, fault-tolerant stream processing of live data streams. Data can be ingested from many sources like Kafka, Flume, Twitter, ZeroMQ or plain old TCP sockets and be processed using complex algorithms expressed with high-level functions like map, reduce, join and window. Finally, processed data can be pushed out to filesystems, databases, and live dashboards.

From this description we can see that Spark can integrate with Kafka, Flume, Twitter, ZeroMQ, TCP sockets and so on; after being processed by Spark, the data can be written out to filesystems, databases, and live dashboards. The official documentation illustrates this with a diagram:

That diagram makes it clear where each component sits. This article focuses mainly on developing the Spark Streaming side, because the Flume-ng side needs no special handling; it works exactly the same way as any other interaction with Flume-ng. Every Spark Streaming program (in Java) starts from a JavaStreamingContext, for example:

JavaStreamingContext jssc = 
    new JavaStreamingContext(master, appName, 
                             new Duration(1000), 
                             [sparkHome], [jars]);
JavaDStream<SparkFlumeEvent> flumeStream = 
                             FlumeUtils.createStream(jssc, host, port);

Finally, call the JavaStreamingContext's start method to launch the program:

jssc.start();
jssc.awaitTermination();

The complete Java code is as follows:

package scala;

import org.apache.spark.api.java.function.Function;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.flume.FlumeUtils;
import org.apache.spark.streaming.flume.SparkFlumeEvent;

public class JavaFlumeEventCount {

    public static void JavaFlumeEventTest(String master, String host, int port) {
        // Process the incoming events in 2-second batches
        Duration batchInterval = new Duration(2000);

        JavaStreamingContext ssc = new JavaStreamingContext(master,
                "FlumeEventCount", batchInterval,
                System.getenv("SPARK_HOME"),
                JavaStreamingContext.jarOfClass(JavaFlumeEventCount.class));
        StorageLevel storageLevel = StorageLevel.MEMORY_ONLY();

        // Start an Avro receiver on host:port; Flume's avro sink pushes events to it
        JavaDStream<SparkFlumeEvent> flumeStream =
                FlumeUtils.createStream(ssc, host, port, storageLevel);

        // Count the events received in each batch and print the result
        flumeStream.count().map(new Function<java.lang.Long, String>() {
            @Override
            public String call(java.lang.Long in) {
                return "Received " + in + " flume events.";
            }
        }).print();

        ssc.start();
        ssc.awaitTermination();
    }
}
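
The program above only reports how many events arrived in each batch. If you also want to look at the content of each event, the body of a SparkFlumeEvent can be read through the AvroFlumeEvent it wraps. Below is a minimal sketch of that idea; it could go inside JavaFlumeEventTest after flumeStream is created, and it additionally requires java.nio.ByteBuffer and java.nio.charset.StandardCharsets to be imported:

// Decode each event body as UTF-8 text; SparkFlumeEvent wraps the original AvroFlumeEvent
JavaDStream<String> bodies = flumeStream.map(new Function<SparkFlumeEvent, String>() {
    @Override
    public String call(SparkFlumeEvent flumeEvent) {
        ByteBuffer body = flumeEvent.event().getBody();   // payload as sent by the Flume source
        byte[] bytes = new byte[body.remaining()];
        body.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }
});
bodies.print();   // print a sample of the decoded bodies for every batch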

Then start Flume and have it send data to this host and port; the Spark side will receive the data.
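
If you only want to verify that the pipeline works before wiring up a full Flume agent, the receiver created by FlumeUtils.createStream speaks the same Avro protocol as Flume's avro sink, so Flume's own RPC client can push a test event to it. The sketch below is only an illustration under a few assumptions: the receiver is listening on localhost:12345, flume-ng-sdk is on the classpath, and the class name FlumeTestSender is just a placeholder:

import java.nio.charset.StandardCharsets;

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

public class FlumeTestSender {
    public static void main(String[] args) throws EventDeliveryException {
        // Connect to the host/port where the Spark Streaming Flume receiver is listening
        RpcClient client = RpcClientFactory.getDefaultInstance("localhost", 12345);
        try {
            // Build a simple text event and push it to the receiver
            Event event = EventBuilder.withBody("hello spark streaming",
                    StandardCharsets.UTF_8);
            client.append(event);
        } finally {
            client.close();
        }
    }
}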

Below is the Scala version of the program:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

object ScalaFlumeEventCount {

  def ScalaFlumeEventTest(master: String, host: String, port: Int) {
    val batchInterval = Milliseconds(2000)

    // Same arguments as the Java version: master, app name, batch interval, Spark home, jars
    val ssc = new StreamingContext(master, "FlumeEventCount", batchInterval,
      System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))

    // Start the Flume receiver and count the events in each batch
    val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY)

    stream.count().map(cnt => "Received " + cnt + " flume events.").print()
    ssc.start()
    ssc.awaitTermination()
  }
}

The programs above were all run in Spark Standalone mode. They can also be run on YARN, but that requires some changes; for details on running on YARN, see the official documentation.

Reference: https://www.iteblog.com/archives/1063.html