[大資料]連載No16之 SparkSql函式+SparkStreaming運算元
阿新 • • 發佈:2019-01-04
本次總結圖如下
SparkSql可以自定義函式、聚合函式、開窗函式
作用說明:自定義一個函式,並且註冊本身,這樣就能在SQL語句中使用
使用方式sqlContext.udf().register(函式名,函式(輸入,輸出),返回型別))
程式碼
public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("udf").setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); SQLContext sqlContext = newSparkStreaming:流式處理框架SQLContext(sc); List<String> strs = Arrays.asList("yarn", "Marry", "Jack", "To m", "tom"); JavaRDD<String> rdd = sc.parallelize(strs); JavaRDD<Row> rowRdd = rdd.map(new Function<String, Row>() { @Override public Row call(String s) throws Exception { returnRowFactory.create(s); } }); List<StructField> structTypes = new ArrayList<>(); structTypes.add(DataTypes.createStructField("name", DataTypes.StringType, true)); StructType structType = DataTypes.createStructType(structTypes); DataFrame df = sqlContext.createDataFrame(rowRdd, structType);df.show(); /** *自定義一個函式,並且註冊本身 */ sqlContext.udf().register("strLen", new UDF1<String, Integer>() { @Override public Integer call(String s) throws Exception { return s.length(); } }, DataTypes.IntegerType); df.registerTempTable("nameTB"); sqlContext.sql("select name,strLen(name) from nameTB").show(); sc.stop(); }
SparkStreaming vs Storm
1、Storm是純實時(來一條處理一條)的處理框架,SparkStreamIng是準實時(間隔一秒)的流式處理框架,sotrm處理資料吞吐量不如sparkStreaming高
Storm接收來的資料是一條一條傳輸到節點執行,SpakStreaming會攥一段的資料,然後傳輸到節點執行
2、Storm對於事物的支援度要比SparkStreaming要好,對於流式處理框架,事物指的是:資料剛好被執行一次
3、SparkStreaming中可以使用sql來處理資料,Storm不行
SparkStreaming適合做複雜業務邏輯,Storm適合做簡單的彙總型的計算
SparkStreaming運算元
foreachRDD , transform ,UpdateStateByKey(累加) ,reduceBykeyAndWindow(視窗)
1、foreachRDD和transform能夠從DSStream中將RDD抽取出來
2、foreach是一個out operator運算元,transform是一個transform運算元
3、updateStateByKey為已經存在的key進行state的狀態更新
4、reduceBykeyAndWindow:基於滑動視窗的熱點搜尋詞實時統計,會處理一段時間之類的資料,處理的間隔時長資料+處理的間隔時間
重複計算邏輯和圖
重複計算問題解決邏輯圖
注意點:
1、local的模擬執行緒數不許大於2、因為一條執行緒被receiver(接受資料的執行緒佔用了),另外一個執行緒是job執行
2、Durations時間的設定,就是我們能接受的延遲度、時間間隔要根據叢集資源、job執行時間設定,
3、業務邏輯完成,需要有一個output operator類運算元(類似action類運算元)
4、javaStreamingContext.start()
5、javaStreamingContext.stop() 無參的sotp方法會將sparkContext一同關閉 ,停止後不能再呼叫start,不關閉使用stop(false)
程式碼實戰:transform 和 updateStateByKey
public static void main(String[] args) throws Exception { SparkConf conf = new SparkConf().setAppName("tst") /** * 如果攜程local,則只會啟動一個執行緒, * 對於sparkStrem,預設會有一個執行緒, 會一直監聽server傳送的資料,那麼會這用這一個執行緒, * 每隔5秒發啟動一個job執行緒處理請求過來的資料,無法執行,所以需要設定為local[2],另啟動一個執行緒 */ .setMaster("local[2]"); JavaStreamingContext streamingContext = new JavaStreamingContext(conf, Durations.seconds(5)); JavaReceiverInputDStream<String> str = streamingContext.socketTextStream("localhost", 9999); JavaPairDStream<String, Integer> rdd = str.transformToPair(new Function<JavaRDD<String>, JavaPairRDD<String, Integer>>() { @Override public JavaPairRDD<String, Integer> call(JavaRDD<String> stringJavaRDD) throws Exception { return stringJavaRDD.flatMap(new FlatMapFunction<String, String>() { @Override public Iterable<String> call(String s) throws Exception { return Arrays.asList(s.split(" ")); } }).mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String s) throws Exception { return new Tuple2<>(s,1); } }).reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1+v2; } }); } }); /** * 多久會將記憶體中的資料(每一個key所對應的狀態)寫入到磁碟上一份呢? * 如果你的batch interval小於10s 那麼10s會將記憶體中的資料寫入到磁碟一份 * 如果bacth interval 大於10s,那麼就以bacth interval為準 */ streamingContext.checkpoint("sscheckpoint01"); JavaPairDStream<String, Integer> keysDS=rdd.updateStateByKey /** * par1: 本次通過key分組後的值集合 * par2: 上一次記錄的值 * par3: 返回的值 */ (new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() { @Override public Optional<Integer> call /** *par1: 本次通過key分組後的值集合 * par2: 上一次記錄的值 */ (List<Integer> par1, Optional<Integer> par2) throws Exception { Integer result=par2.isPresent() ? par2.get():0; for(Integer i : par1) result =result+ i; return Optional.of(result); } }); str.print(); System.out.println("------------------------------------"); rdd.print(); System.out.println("------------------------------------"); keysDS.print(); streamingContext.start(); streamingContext.awaitTermination(); }程式碼實戰reduceByWindow運算元
public static void main(String[] args) throws Exception { SparkConf conf = new SparkConf().setAppName("tes").setMaster("local[2]"); JavaStreamingContext spj = new JavaStreamingContext(conf, Durations.seconds(5)); spj.checkpoint("sscheckpoint01"); JavaReceiverInputDStream<String> receiverInputDStream = spj.socketTextStream("localhost", 9999); JavaDStream<String> transform = receiverInputDStream.transform(new Function<JavaRDD<String>, JavaRDD<String>>() { @Override public JavaRDD<String> call(JavaRDD<String> stringJavaRDD) throws Exception { return stringJavaRDD; } }); /** * 每隔10秒,計算最近60秒內的資料,那麼這個視窗大小就是60秒,裡面有12個rdd,在沒有計算之前,這些rdd是不會進行計算的。 * 那麼在計算的時候會將這12個rdd聚合起來,然後一起執行reduceByKeyAndWindow 操作 * ,reduceByKeyAndWindow是針對視窗操作的而不是針對DStream操作的。 */ // JavaDStream<String> stringJavaDStream = transform.reduceByWindow(new Function2<String, String, String>() { // @Override // public String call(String s, String s2) throws Exception { // return s + "_" + s2; // } // }, // /** // * 視窗長度 , 視窗滑動時間 // */ // Durations.seconds(15), Durations.seconds(10)); //優化後的程式碼 JavaDStream<String> stringJavaDStream = transform.reduceByWindow(new Function2<String, String, String>() { @Override /** * 將講個時間資料累加 */ public String call(String s, String s2) throws Exception { return s + "_" + s2; } }, new Function2<String, String, String>() { @Override /** * 使用優化程式碼,避免重複計算,現將上一次的計算結果和本次累加, * 然後去掉上一次結算結果頭號計算 */ public String call(String s, String s2) throws Exception { return s.replace(s2,""); } }, /** * 視窗長度 , 視窗滑動時間 */ Durations.seconds(15), Durations.seconds(10)); stringJavaDStream.print(); spj.start(); spj.awaitTermination(); }