1. 程式人生 > >spark記錄(0)SparkStreaming算子操作

spark記錄(0)SparkStreaming算子操作

top 單詞 operation upd cor ins 參數 arc 奇數

1 foreachRDD

  • output operation算子,必須對抽取出來的RDD執行action類算子,代碼才能執行。
  • 代碼:見上個隨筆例子

2 transform

  • transformation類算子
  • 可以通過transform算子,對Dstream做RDD到RDD的任意操作。
  • 代碼:
/**
 * 過濾黑名單
 * transform操作
 * DStream可以通過transform做RDD到RDD的任意操作。
 * @author root
 *
 */
public class TransformOperator {
    public
static void main(String[] args) { SparkConf conf = new SparkConf(); conf.setMaster("local[2]").setAppName("transform"); JavaStreamingContext jsc = new JavaStreamingContext(conf,Durations.seconds(5)); //黑名單 List<String> list = Arrays.asList("zhangsan");
final Broadcast<List<String>> bcBlackList = jsc.sparkContext().broadcast(list); //接受socket數據源 JavaReceiverInputDStream<String> nameList = jsc.socketTextStream("node5", 9999); JavaPairDStream<String, String> pairNameList = nameList.mapToPair(
new PairFunction<String, String, String>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, String> call(String s) throws Exception { return new Tuple2<String, String>(s.split(" ")[1], s); } }); /** * transform 可以拿到DStream中的RDD,做RDD到RDD之間的轉換,不需要Action算子觸發,需要返回RDD類型。 * 註意:transform call方法內,拿到RDD 算子外的代碼 在Driver端執行,也可以做到動態改變廣播變量。 */ JavaDStream<String> transFormResult = pairNameList.transform(new Function<JavaPairRDD<String,String>, JavaRDD<String>>() { private static final long serialVersionUID = 1L; @Override public JavaRDD<String> call(JavaPairRDD<String, String> nameRDD) throws Exception { JavaPairRDD<String, String> filter = nameRDD.filter(new Function<Tuple2<String,String>, Boolean>() { private static final long serialVersionUID = 1L; @Override public Boolean call(Tuple2<String, String> tuple) throws Exception { return !bcBlackList.value().contains(tuple._1); } }); JavaRDD<String> map = filter.map(new Function<Tuple2<String,String>, String>() { private static final long serialVersionUID = 1L; @Override public String call(Tuple2<String, String> tuple) throws Exception { return tuple._2; } }); //返回過濾好的結果 return map; } }); transFormResult.print(); jsc.start(); jsc.awaitTermination(); jsc.stop(); } }

3 updateStateByKey

  • transformation算子
  • updateStateByKey作用:

    1) 為SparkStreaming中每一個Key維護一份state狀態,state類型可以是任意類型的,可以是一個自定義的對象,更新函數也可以是自定義的。

    2) 通過更新函數對該key的狀態不斷更新,對於每個新的batch而言,SparkStreaming會在使用updateStateByKey的時候為已經存在的key進行state的狀態更新。

  • 使用到updateStateByKey要開啟checkpoint機制和功能。
  • 多久會將內存中的數據寫入到磁盤一份?

    如果batchInterval設置的時間小於10秒,那麽10秒寫入磁盤一份。如果batchInterval設置的時間大於10秒,那麽就會batchInterval時間間隔寫入磁盤一份。

  • 代碼
public class UpdateStateByKeyDemo {

public static void main(String[] args) {

/*

* 第一步:配置SparkConf:

* 1,至少2條線程:因為Spark Streaming應用程序在運行的時候,至少有一條

* 線程用於不斷的循環接收數據,並且至少有一條線程用於處理接受的數據(否則的話無法

* 有線程用於處理數據,隨著時間的推移,內存和磁盤都會不堪重負);

* 2,對於集群而言,每個Executor一般肯定不止一個Thread,那對於處理Spark Streaming的

* 應用程序而言,每個Executor一般分配多少Core比較合適?根據我們過去的經驗,5個左右的

* Core是最佳的(一個段子分配為奇數個Core表現最佳,例如3個、5個、7個Core等);

*/

SparkConf conf = new SparkConf().setMaster("local[2]").

setAppName("UpdateStateByKeyDemo");

/*

* 第二步:創建SparkStreamingContext:

* 1,這個是SparkStreaming應用程序所有功能的起始點和程序調度的核心

* SparkStreamingContext的構建可以基於SparkConf參數,也可基於持久化的SparkStreamingContext的內容

* 來恢復過來(典型的場景是Driver崩潰後重新啟動,由於Spark Streaming具有連續7*24小時不間斷運行的特征,

* 所有需要在Driver重新啟動後繼續上衣系的狀態,此時的狀態恢復需要基於曾經的Checkpoint);

* 2,在一個Spark Streaming應用程序中可以創建若幹個SparkStreamingContext對象,使用下一個SparkStreamingContext

* 之前需要把前面正在運行的SparkStreamingContext對象關閉掉,由此,我們獲得一個重大的啟發SparkStreaming框架也只是

* Spark Core上的一個應用程序而已,只不過Spark Streaming框架箱運行的話需要Spark工程師寫業務邏輯處理代碼;

*/

JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));

//報錯解決辦法做checkpoint,開啟checkpoint機制,把checkpoint中的數據放在這裏設置的目錄中,

//生產環境下一般放在HDFS中

jsc.checkpoint("/usr/local/tmp/checkpoint");

/*

* 第三步:創建Spark Streaming輸入數據來源input Stream:

* 1,數據輸入來源可以基於File、HDFS、Flume、Kafka、Socket等

* 2, 在這裏我們指定數據來源於網絡Socket端口,Spark Streaming連接上該端口並在運行的時候一直監聽該端口

* 的數據(當然該端口服務首先必須存在),並且在後續會根據業務需要不斷的有數據產生(當然對於Spark Streaming

* 應用程序的運行而言,有無數據其處理流程都是一樣的); 

* 3,如果經常在每間隔5秒鐘沒有數據的話不斷的啟動空的Job其實是會造成調度資源的浪費,因為並沒有數據需要發生計算,所以

* 實例的企業級生成環境的代碼在具體提交Job前會判斷是否有數據,如果沒有的話就不再提交Job;

*/

JavaReceiverInputDStream lines = jsc.socketTextStream("hadoop100", 9999);

/*

* 第四步:接下來就像對於RDD編程一樣基於DStream進行編程!!!原因是DStream是RDD產生的模板(或者說類),在Spark Streaming具體

* 發生計算前,其實質是把每個Batch的DStream的操作翻譯成為對RDD的操作!!!

*對初始的DStream進行Transformation級別的處理,例如map、filter等高階函數等的編程,來進行具體的數據計算

    * 第4.1步:講每一行的字符串拆分成單個的單詞

    */

JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { //如果是Scala,由於SAM轉換,所以可以寫成val words = lines.flatMap { line => line.split(" ")}

@Override

public Iterable<String> call(String line) throws Exception {

return Arrays.asList(line.split(" "));

}

});

/*

      * 第四步:對初始的DStream進行Transformation級別的處理,例如map、filter等高階函數等的編程,來進行具體的數據計算

      * 第4.2步:在單詞拆分的基礎上對每個單詞實例計數為1,也就是word => (word, 1)

      */

JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {

@Override

public Tuple2<String, Integer> call(String word) throws Exception {

return new Tuple2<String, Integer>(word, 1);

}

});

/*

      * 第四步:對初始的DStream進行Transformation級別的處理,例如map、filter等高階函數等的編程,來進行具體的數據計算

      *第4.3步:在這裏是通過updateStateByKey來以Batch Interval為單位來對歷史狀態進行更新,

      * 這是功能上的一個非常大的改進,否則的話需要完成同樣的目的,就可能需要把數據保存在Redis、

      * Tagyon或者HDFS或者HBase或者數據庫中來不斷的完成同樣一個key的State更新,如果你對性能有極為苛刻的要求,

      * 且數據量特別大的話,可以考慮把數據放在分布式的Redis或者Tachyon內存文件系統中;

      * 當然從Spark1.6.x開始可以嘗試使用mapWithState,Spark2.X後mapWithState應該非常穩定了。

 */

JavaPairDStream<String, Integer> wordsCount = pairs.updateStateByKey(new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() { //對相同的Key,進行Value的累計(包括Local和Reducer級別同時Reduce)

@Override

public Optional<Integer> call(List<Integer> values, Optional<Integer> state)

throws Exception {

Integer updatedValue = 0 ;

if(state.isPresent()){

updatedValue = state.get();

}

for(Integer value: values){

updatedValue += value;

}

return Optional.of(updatedValue);

}

});

/*

*此處的print並不會直接出發Job的執行,因為現在的一切都是在Spark Streaming框架的控制之下的,對於Spark Streaming

*而言具體是否觸發真正的Job運行是基於設置的Duration時間間隔的

*諸位一定要註意的是Spark Streaming應用程序要想執行具體的Job,對Dtream就必須有output Stream操作,

*output Stream有很多類型的函數觸發,類print、saveAsTextFile、saveAsHadoopFiles等,最為重要的一個

*方法是foraeachRDD,因為Spark Streaming處理的結果一般都會放在Redis、DB、DashBoard等上面,foreachRDD

*主要就是用用來完成這些功能的,而且可以隨意的自定義具體數據到底放在哪裏!!!

*/

wordsCount.print();

/*

* Spark Streaming執行引擎也就是Driver開始運行,Driver啟動的時候是位於一條新的線程中的,當然其內部有消息循環體,用於

* 接受應用程序本身或者Executor中的消息;

*/

jsc.start();

jsc.awaitTermination();

jsc.close();

}

4 窗口操作

  • 窗口操作理解圖:

技術分享圖片

假設每隔5s 1個batch,上圖中窗口長度為15s,窗口滑動間隔10s。

  • 窗口長度和滑動間隔必須是batchInterval的整數倍。如果不是整數倍會檢測報錯。
  • 優化後的window窗口操作示意圖:

技術分享圖片

  • 優化後的window操作要保存狀態所以要設置checkpoint路徑,沒有優化的window操作可以不設置checkpoint路徑。
  • 代碼:
/**
 * 基於滑動窗口的熱點搜索詞實時統計
 * @author root
 *
 */
public class WindowOperator {
    
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setMaster("local[2]")
                .setAppName("WindowHotWord"); 
        
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
        /**
         * 設置日誌級別為WARN
         *
         */
        jssc.sparkContext().setLogLevel("WARN");
        /**
         * 註意:
         *  沒有優化的窗口函數可以不設置checkpoint目錄
         *  優化的窗口函數必須設置checkpoint目錄         
         */
//           jssc.checkpoint("hdfs://node1:9000/spark/checkpoint");
           jssc.checkpoint("./checkpoint");
        JavaReceiverInputDStream<String> searchLogsDStream = jssc.socketTextStream("node5", 9999);
        JavaDStream<String> window = searchLogsDStream.window(Durations.seconds(15), Durations.seconds(5));
        //word    1
        JavaDStream<String> searchWordsDStream = searchLogsDStream.flatMap(new FlatMapFunction<String, String>() {

            /**
             * 
             */
            private static final long serialVersionUID = 1L;

            @Override
            public Iterable<String> call(String t) throws Exception {
                return Arrays.asList(t.split(" "));
            }
        });
        
        // 將搜索詞映射為(searchWord, 1)的tuple格式
        JavaPairDStream<String, Integer> searchWordPairDStream = searchWordsDStream.mapToPair(
                
                new PairFunction<String, String, Integer>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, Integer> call(String searchWord)
                            throws Exception {
                        return new Tuple2<String, Integer>(searchWord, 1);
                    }
                    
                });
        /**
         * 每隔10秒,計算最近60秒內的數據,那麽這個窗口大小就是60秒,裏面有12個rdd,在沒有計算之前,這些rdd是不會進行計算的。
         * 那麽在計算的時候會將這12個rdd聚合起來,然後一起執行reduceByKeyAndWindow操作 ,
         * reduceByKeyAndWindow是針對窗口操作的而不是針對DStream操作的。
         */
//            JavaPairDStream<String, Integer> searchWordCountsDStream = 
//                
//                searchWordPairDStream.reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() {
//
//                    private static final long serialVersionUID = 1L;
//
//                    @Override
//                    public Integer call(Integer v1, Integer v2) throws Exception {
//                        return v1 + v2;
//                    }
//        }, Durations.seconds(15), Durations.seconds(5)); 
        
        
        /**
         * window窗口操作優化:
         */
         JavaPairDStream<String, Integer> searchWordCountsDStream = 
        
         searchWordPairDStream.reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
            
        },new Function2<Integer, Integer, Integer>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 - v2;
            }
            
        }, Durations.seconds(15), Durations.seconds(5));    

          searchWordCountsDStream.print();
        
        jssc.start();     
        jssc.awaitTermination();
        jssc.close();
    }

}

spark記錄(0)SparkStreaming算子操作