spark記錄(4)spark算子之Action
阿新 • • 發佈:2019-02-27
lac atm ide replace action ret 加載 再次 col
Action類算子也是一類算子(函數)叫做行動算子,如foreach,collect,count等。Transformations類算子是延遲執行,Action類算子是觸發執行。一個application應用程序中有幾個Action類算子執行,就有幾個job運行。
(1)reduce
reduce其實是講RDD中的所有元素進行合並,當運行call方法時,會傳入兩個參數,在call方法中將兩個參數合並後返回,而這個返回值回合一個新的RDD中的元素再次傳入call方法中,繼續合並,直到合並到只剩下一個元素時。
代碼
public static void reduce() { JavaRDD<String> rdd = jsc.textFile("words"); String reduce = rdd.flatMap(new FlatMapFunction<String, String>() { private static final long serialVersionUID = 1L; @Override public Iterable<String> call(String a) throws Exception {return Arrays.asList(a.split(" ")); } }).reduce(new Function2<String, String, String>() { private static final long serialVersionUID = 1L; @Override public String call(String a, String b) throws Exception {return a+"-->"+b; } }); System.out.println(reduce); }
結果:
(2)collect()
將計算的結果作為集合拉回到driver端,一般在使用過濾算子或者一些能返回少量數據集的算子後,將結果回收到Driver端打印顯示。
分布式環境下盡量規避,如有其他需要,手動編寫代碼實現相應功能就好。
詳情請參考:https://blog.csdn.net/Fortuna_i/article/details/80851775
代碼:
public static void collect() { JavaRDD<String> rdd = jsc.textFile("words"); List<String> collect = rdd.flatMap(new FlatMapFunction<String, String>() { private static final long serialVersionUID = 1L; @Override public Iterable<String> call(String a) throws Exception { return Arrays.asList(a.split(" ")); } }).collect(); for (String string : collect) { System.out.println(string); } }
結果:
(3)take
返回一個包含數據集前n個元素的數組(從0下標到n-1下標的元素),不排序。
代碼:
public static void take() { JavaRDD<String> rdd = jsc.textFile("words"); List<String> take = rdd.flatMap(new FlatMapFunction<String, String>() { private static final long serialVersionUID = 1L; @Override public Iterable<String> call(String a) throws Exception { return Arrays.asList(a.split(" ")); } }).take(5); for (String string : take) { System.out.println(string); } }
結果:
(4)first
返回數據集的第一個元素(底層即是take(1))
代碼:
public static void first() { JavaRDD<String> rdd = jsc.textFile("words"); String first = rdd.flatMap(new FlatMapFunction<String, String>() { private static final long serialVersionUID = 1L; @Override public Iterable<String> call(String a) throws Exception { return Arrays.asList(a.split(" ")); } }).first(); System.out.println(first); }
結果:
(5)takeSample(withReplacement, num, [seed])
對於一個數據集進行隨機抽樣,返回一個包含num個隨機抽樣元素的數組,withReplacement表示是否有放回抽樣,參數seed指定生成隨機數的種子。
該方法僅在預期結果數組很小的情況下使用,因為所有數據都被加載到driver端的內存中。
代碼:
結果:
(6)count
返回數據集中元素個數,默認Long類型。
代碼:
public static void count() { JavaRDD<String> rdd = jsc.textFile("words"); long count = rdd.flatMap(new FlatMapFunction<String, String>() { private static final long serialVersionUID = 1L; @Override public Iterable<String> call(String a) throws Exception { return Arrays.asList(a.split(" ")); } }).count(); System.out.println(count); }
結果:
spark記錄(4)spark算子之Action