1. 程式人生 > >Spark Streaming開發入門——WordCount(Java&Scala)

Spark Streaming開發入門——WordCount(Java&Scala)

一、Java方式開發

1、開發前準備

假定您以搭建好了Spark叢集。

2、開發環境採用eclipse maven工程,需要新增Spark Streaming依賴。

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.10</artifactId>
      <version>1.6.0</version>
    </dependency>

3、Spark streaming 基於Spark Core進行計算,需要注意事項:

1.local模式的話,local後必須為大於等於2的數字【即至少2條執行緒】。因為receiver 佔了一個。
因為,SparkStreaming 在執行的時候,至少需要一條執行緒不斷迴圈接收資料,而且至少有一條執行緒處理接收的資料。
如果只有一條執行緒的話,接受的資料不能被處理。

溫馨提示:

對於叢集而言,每隔exccutor一般肯定不只一個Thread,那對於處理Spark Streaming應用程式而言,每個executor一般分配多少core比較合適?根據我們過去的經驗,5個左右的core是最佳的(段子:分配為奇數個core的表現最佳,例如:分配3個、5個、7個core等)

接下來,讓我們開始動手寫寫Java程式碼吧!

第一步:

SparkConf conf = new SparkConf().setMaster("spark://Master:7077").setAppName("wordCountOnline");

第二步:

我們採用基於配置檔案的方式建立SparkStreamingContext物件

    /**
     * 第二步:建立SparkStreamingContext,這個是SparkStreaming應用程式所有功能的起始點和程式排程的核心
     * SparkStreamingContext的構建可以基於SparkConf引數,也可基於持久化的SparkStreamingContext的內容來回復過來
     * (典型的場景是Driver崩潰後重新啟動,由於Spark Streaming具有連續7*24小時不間斷執行的特徵,
     * 所有需要再Drver重新啟動後繼續上的狀態,此時的狀態恢復需要基於曾經的Checkpoint)
     * 
     * 
     * 2.在一個SparkStreaming應用程式中可以有多個SparkStreamingContext物件,使用下一個SparkStreaming程式之前
     * 需要把前面正在執行的SparkStreamingContext物件關閉掉,由此,我們獲得一個啟發:
     * 
     * SparkStreaming框架也 就是Spark Core上的一個應用程式。
     * 只不過需要Spark工程師寫業務邏輯程式碼。
     * 所以要掌握好Spark,學好SparkStreaming 就行了。
     * 
     * java虛擬機器在os角度上看就是一個簡單的c。c++應用程式。
     * 
     * 這裡可以用工廠方法建立ssc
     * 
     * 
     */
JavaStreamingContext jsc = new JavaStreamingContext(conf,Durations.seconds(5));

第三步,建立Spark Streaming輸入資料來源:

我們將資料來源配置為本地埠9999(注意埠要求沒有被佔用):

    /**
     * 第三步:建立Spark Streaming 輸入資料來源:input Stream
     * 1.資料輸入來源可以基於File、HDFS、Flume、Kafka、Socket
     * 
     * 2.在這裡我們指定資料來源於網路Socket埠,
     * Spark Streaming連線上該埠並在執行的時候一直監聽該埠的資料(當然該埠服務首先必須存在,並且在後續會根據業務需要不斷的有資料產生)。
     * 
     * 有資料和沒有資料 在處理邏輯上沒有影響(在程式看來都是資料)
     * 
     * 3.如果經常在每間隔5秒鐘 沒有資料的話,不斷的啟動空的job其實是會造成排程資源的浪費。因為 並沒有資料需要發生計算。
     * 所以企業級生產環境的程式碼在具體提交Job前會判斷是否有資料,如果沒有的話,就不再提交Job;
     * 
     * 
     */
    JavaReceiverInputDStream<String>  lines = jsc.socketTextStream("Master", 9999);

第四步:我們就像對RDD程式設計一樣,基於DStream進行程式設計,原因是DStream是RDD產生的模板,在Spark Streaming發生計算前,其實質是把每個Batch的DStream的操作翻譯成為了RDD操作。DStream對RDD進行了一次抽象。如同DataFrame對RDD進行一次抽象。JavaRDD —-> JavaDStream

1、flatMap操作:

    JavaDStream<String> words =  lines.flatMap(new FlatMapFunction<String, String>() {

        @Override
        public Iterable<String> call(String line) throws Exception {
            // TODO Auto-generated method stub
            return Arrays.asList(line.split(" "));
        }
    });

2、 mapToPair操作:

    JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {

        /**
         * 
         */
        private static final long serialVersionUID = 1L;

        @Override
        public Tuple2<String, Integer> call(String word) throws Exception {
            // TODO Auto-generated method stub
            return new Tuple2<String,Integer>(word,1);
        }
    });

3、reduceByKey操作:

    JavaPairDStream<String, Integer>  wordsCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {

        @Override
        public Integer call(Integer v1, Integer v2) throws Exception {
            // TODO Auto-generated method stub
            return v1 + v2;
        }
    });

4、print等操作:

    /**
     * 此處的print並不會直接觸發Job的執行,因為現在的一切都是在Spark Streaming框架的控制之下的。
     * 具體是否真正觸發Job的執行是基於設定的Duration時間間隔的觸發的。
     * 
     * 
     * Spark應用程式要想執行具體的Job對DStream就必須有output Stream操作。
     * ouput Stream 有很多型別的函式觸發,類print、savaAsTextFile、saveAsHadoopFiles等,最為重要的是foreachRDD,因為Spark Streamimg處理的結果一般都會
     * 放在Redis、DB、DashBoard等上面,foreachRDD主要
     * 就是用來 完成這些功能的,而且可以隨意的自定義具體資料放在哪裡。
     * 
     */
    wordsCount.print();

    jsc.start();

    jsc.awaitTermination();
    jsc.close();
溫馨提示:

除了print()方法將處理後的資料輸出之外,還有其他的方法也非常重要,在開發中需要重點掌握,比如SaveAsTextFile,SaveAsHadoopFile等,最為重要的是foreachRDD方法,這個方法可以將資料寫入Redis,DB,DashBoard等,甚至可以隨意的定義資料放在哪裡,功能非常強大。

二、Scala方式開發

import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.storage.StorageLevel

/**
 * @author lxh
 */
object WordCountOnline {

  def main(args: Array[String]): Unit = {

    /**
   * 建立SparkConf
   */
  val conf =  new SparkConf().setMaster("spark://Master:7077").setAppName("wordcountonline")

  val ssc  = new StreamingContext(conf,Seconds(1))

  val lines = ssc.socketTextStream("Master", 9999, StorageLevel.MEMORY_AND_DISK)

  /**
   * 按空格分割
   */
  val words = lines.flatMap { line => line.split(" ") }

  /**
   * 把單個的word變成tuple
   */
  val wordCount  = words.map { word => (word,1) }

  /**
   * (key1,1) (key1,1)
   * key相同的累加。
   */
  wordCount.reduceByKey(_+_)

  wordCount.print()
  ssc.start()

  /**
   * 等待程式結束
   */
  ssc.awaitTermination()
  ssc.stop(true)

  }
}