1. 程式人生 > >[大資料]連載No16之 SparkSql函式+SparkStreaming運算元

[大資料]連載No16之 SparkSql函式+SparkStreaming運算元

本次總結圖如下


SparkSql可以自定義函式、聚合函式、開窗函式
作用說明:自定義一個函式,並且註冊本身,這樣就能在SQL語句中使用

 使用方式sqlContext.udf().register(函式名,函式(輸入,輸出),返回型別))

程式碼

public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("udf").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new 
SQLContext(sc); List<String> strs = Arrays.asList("yarn", "Marry", "Jack", "To m", "tom"); JavaRDD<String> rdd = sc.parallelize(strs); JavaRDD<Row> rowRdd = rdd.map(new Function<String, Row>() { @Override public Row call(String s) throws Exception { return
RowFactory.create(s); } }); List<StructField> structTypes = new ArrayList<>(); structTypes.add(DataTypes.createStructField("name", DataTypes.StringType, true)); StructType structType = DataTypes.createStructType(structTypes); DataFrame df = sqlContext.createDataFrame(rowRdd, structType);
df.show(); /** *自定義一個函式,並且註冊本身 */ sqlContext.udf().register("strLen", new UDF1<String, Integer>() { @Override public Integer call(String s) throws Exception { return s.length(); } }, DataTypes.IntegerType); df.registerTempTable("nameTB"); sqlContext.sql("select name,strLen(name) from nameTB").show(); sc.stop(); }
SparkStreaming:流式處理框架
SparkStreaming vs Storm
1、Storm是純實時(來一條處理一條)的處理框架,SparkStreamIng是準實時(間隔一秒)的流式處理框架,sotrm處理資料吞吐量不如sparkStreaming高
    Storm接收來的資料是一條一條傳輸到節點執行,SpakStreaming會攥一段的資料,然後傳輸到節點執行
2、Storm對於事物的支援度要比SparkStreaming要好,對於流式處理框架,事物指的是:資料剛好被執行一次
3、SparkStreaming中可以使用sql來處理資料,Storm不行
    SparkStreaming適合做複雜業務邏輯,Storm適合做簡單的彙總型的計算

SparkStreaming運算元
foreachRDD , transform ,UpdateStateByKey(累加) ,reduceBykeyAndWindow(視窗)
1、foreachRDD和transform能夠從DSStream中將RDD抽取出來
2、foreach是一個out operator運算元,transform是一個transform運算元
3、updateStateByKey為已經存在的key進行state的狀態更新
4、reduceBykeyAndWindow:基於滑動視窗的熱點搜尋詞實時統計,會處理一段時間之類的資料,處理的間隔時長資料+處理的間隔時間

   重複計算邏輯和圖

重複計算問題解決邏輯圖


 注意點:
1、local的模擬執行緒數不許大於2、因為一條執行緒被receiver(接受資料的執行緒佔用了),另外一個執行緒是job執行
2、Durations時間的設定,就是我們能接受的延遲度、時間間隔要根據叢集資源、job執行時間設定,
3、業務邏輯完成,需要有一個output operator類運算元(類似action類運算元)
4、javaStreamingContext.start()
5、javaStreamingContext.stop() 無參的sotp方法會將sparkContext一同關閉 ,停止後不能再呼叫start,不關閉使用stop(false)

程式碼實戰:transform 和 updateStateByKey

public static void main(String[] args) throws Exception {
    SparkConf conf = new SparkConf().setAppName("tst")
            /**
             * 如果攜程local,則只會啟動一個執行緒,
             * 對於sparkStrem,預設會有一個執行緒, 會一直監聽server傳送的資料,那麼會這用這一個執行緒,
             * 每隔5秒發啟動一個job執行緒處理請求過來的資料,無法執行,所以需要設定為local[2],另啟動一個執行緒
             */
.setMaster("local[2]");
JavaStreamingContext streamingContext = new JavaStreamingContext(conf, Durations.seconds(5));
JavaReceiverInputDStream<String> str = streamingContext.socketTextStream("localhost", 9999);
JavaPairDStream<String, Integer> rdd = str.transformToPair(new Function<JavaRDD<String>, JavaPairRDD<String, Integer>>() {
        @Override
public JavaPairRDD<String, Integer> call(JavaRDD<String> stringJavaRDD) throws Exception {
            return stringJavaRDD.flatMap(new FlatMapFunction<String, String>() {
                @Override
public Iterable<String> call(String s) throws Exception {
                    return Arrays.asList(s.split(" "));
}
            }).mapToPair(new PairFunction<String, String, Integer>() {
                @Override
public Tuple2<String, Integer> call(String s) throws Exception {
                    return new Tuple2<>(s,1);
}
            }).reduceByKey(new Function2<Integer, Integer, Integer>() {
                @Override
public Integer call(Integer v1, Integer v2) throws Exception {
                    return v1+v2;
}
            });
}
    });
/**
     * 多久會將記憶體中的資料(每一個key所對應的狀態)寫入到磁碟上一份呢?
     *     如果你的batch interval小於10s  那麼10s會將記憶體中的資料寫入到磁碟一份
     *     如果bacth interval 大於10s,那麼就以bacth interval為準
     */
streamingContext.checkpoint("sscheckpoint01");
JavaPairDStream<String, Integer>  keysDS=rdd.updateStateByKey
            /**
             * par1: 本次通過key分組後的值集合
             * par2: 上一次記錄的值
             * par3: 返回的值
             */
(new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
        @Override
public Optional<Integer> call
/**
         *par1: 本次通過key分組後的值集合
         * par2: 上一次記錄的值
         */
(List<Integer> par1, Optional<Integer> par2) throws Exception {
            Integer result=par2.isPresent() ? par2.get():0;
            for(Integer i : par1)
                result =result+ i;
            return Optional.of(result);
}
    });
str.print();
System.out.println("------------------------------------");
rdd.print();
System.out.println("------------------------------------");
keysDS.print();
streamingContext.start();
streamingContext.awaitTermination();
}
程式碼實戰reduceByWindow運算元
public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setAppName("tes").setMaster("local[2]");
JavaStreamingContext spj = new JavaStreamingContext(conf, Durations.seconds(5));
spj.checkpoint("sscheckpoint01");
JavaReceiverInputDStream<String> receiverInputDStream = spj.socketTextStream("localhost", 9999);
JavaDStream<String> transform = receiverInputDStream.transform(new Function<JavaRDD<String>, JavaRDD<String>>() {
            @Override
public JavaRDD<String> call(JavaRDD<String> stringJavaRDD) throws Exception {
                return stringJavaRDD;
}
        });
/**
         * 每隔10秒,計算最近60秒內的資料,那麼這個視窗大小就是60秒,裡面有12個rdd,在沒有計算之前,這些rdd是不會進行計算的。
         * 那麼在計算的時候會將這12個rdd聚合起來,然後一起執行reduceByKeyAndWindow 操作
         * ,reduceByKeyAndWindow是針對視窗操作的而不是針對DStream操作的。
         */
//        JavaDStream<String> stringJavaDStream = transform.reduceByWindow(new Function2<String, String, String>() {
//                                                                             @Override
//                                                                             public String call(String s, String s2) throws Exception {
//                                                                                 return s + "_" + s2;
//                                                                             }
//                                                                         },
//                /**
//                 * 視窗長度 , 視窗滑動時間
//                 */
//                Durations.seconds(15), Durations.seconds(10));
        //優化後的程式碼
JavaDStream<String> stringJavaDStream = transform.reduceByWindow(new Function2<String, String, String>() {
                                                                             @Override
/**
                                                                              * 將講個時間資料累加
                                                                              */
public String call(String s, String s2) throws Exception {
                                                                                 return s + "_" + s2;
}
                                                                         }, new Function2<String, String, String>() {
                                                                             @Override
/**
                                                                              * 使用優化程式碼,避免重複計算,現將上一次的計算結果和本次累加,
                                                                              * 然後去掉上一次結算結果頭號計算
                                                                              */
public String call(String s, String s2) throws Exception {
                                                                                 return s.replace(s2,"");
}
                                                                         },
/**
                 * 視窗長度 , 視窗滑動時間
                 */
Durations.seconds(15), Durations.seconds(10));
stringJavaDStream.print();
spj.start();
spj.awaitTermination();
}