sparkCore Api常用運算元使用
阿新 • • 發佈:2018-12-05
package sparkjava; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.*; import scala.Tuple2; import java.util.Arrays; import java.util.Iterator; import java.util.List; /** * 熟悉api */ public class TestSparkApi { // API介紹 /* 1.1 transform l map(func):對呼叫map的RDD資料集中的每個element都使用func,然後返回一個新的RDD,這個返回的資料集是分散式的資料集 l filter(func) : 對呼叫filter的RDD資料集中的每個元素都使用func,然後返回一個包含使func為true的元素構成的RDD l flatMap(func):和map差不多,但是flatMap生成的是多個結果 l mapPartitions(func):和map很像,但是map是每個element,而mapPartitions是每個partition l mapPartitionsWithSplit(func):和mapPartitions很像,但是func作用的是其中一個split上,所以func中應該有index l sample(withReplacement,faction,seed):抽樣 l union(otherDataset):返回一個新的dataset,包含源dataset和給定dataset的元素的集合 l distinct([numTasks]):返回一個新的dataset,這個dataset含有的是源dataset中的distinct的element l groupByKey(numTasks):返回(K,Seq[V]),也就是hadoop中reduce函式接受的key-valuelist l reduceByKey(func,[numTasks]):就是用一個給定的reduce func再作用在groupByKey產生的(K,Seq[V]),比如求和,求平均數 l sortByKey([ascending],[numTasks]):按照key來進行排序,是升序還是降序,ascending是boolean型別 1.2 action l reduce(func):說白了就是聚集,但是傳入的函式是兩個引數輸入返回一個值,這個函式必須是滿足交換律和結合律的 l collect():一般在filter或者足夠小的結果的時候,再用collect封裝返回一個數組 l count():返回的是dataset中的element的個數 l first():返回的是dataset中的第一個元素 l take(n):返回前n個elements l takeSample(withReplacement,num,seed):抽樣返回一個dataset中的num個元素,隨機種子seed l saveAsTextFile(path):把dataset寫到一個text file中,或者hdfs,或者hdfs支援的檔案系統中,spark把每條記錄都轉換為一行記錄,然後寫到file中 l saveAsSequenceFile(path):只能用在key-value對上,然後生成SequenceFile寫到本地或者hadoop檔案系統 l countByKey():返回的是key對應的個數的一個map,作用於一個RDD l foreach(func):對dataset中的每個元素都使用func*/ /*資料情況 a 1 b 2 c 3 d 4 e 5*/ public static void main(String[] args) { // String filepath=args[0]; SparkConf conf = new SparkConf(); conf.setAppName("transformAndAction"); conf.setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD<String> rdd = sc.textFile("D://sparkAction.txt"); // --transform // testSparkCoreApiMap(rdd); //本案例實現,列印所有資料 // testSparkCoreApiFilter(rdd); // testSparkCoreApiFlatMap(rdd); //本案例實現:列印所有的字元 // testSparkCoreApiUnion(rdd);//合併兩個RDD // testSparkCoreApiDistinct(rdd); // testSparkCoreApiMaptoPair(rdd);//把RDD對映為鍵值對型別的資料 // testSparkCoreApiGroupByKey(rdd);//對鍵值對型別的資料進行按鍵值合併 // testSparkCoreApiReduceByKey(rdd);//對鍵值對進行按鍵相同的對值進行操作 // --action testSparkCoreApiReduce(rdd);//對RDD進行遞迴呼叫 } /** * Map主要是對資料進行處理,不進行資料集的增減 * <p> * 本案例實現,列印所有資料 * * @param rdd */ private static void testSparkCoreApiMap(JavaRDD<String> rdd) { JavaRDD<String> logData1 = rdd.map(new Function<String, String>() { public String call(String s) { return s; } }); List list = logData1.collect(); list.forEach(x->System.out.println(x)); /*for (int i = 0; i < list.size(); i++) { System.out.println(list.get(i)); }*/ } /* * filter主要是過濾資料的功能 * 本案例實現:過濾含有a的那行資料 */ private static void testSparkCoreApiFilter(JavaRDD<String> rdd){ JavaRDD<String> logData1 = rdd.filter(new Function<String, Boolean>() { @Override public Boolean call(String s) throws Exception { return (s.split(" "))[0].equals("a"); } }); // JavaRDD<String> logData1=rdd.filter(new Function<String,Boolean>(){ // public Boolean call(String s){ // // return (s.split(" "))[0].equals("a"); // } // // }); List list = logData1.collect(); list.forEach(System.out::println); // for (int i = 0; i < list.size(); i++) { // System.out.println(list.get(i)); // // // } } /* * flatMap 使用者行轉列 * 本案例實現:列印所有的字元 */ private static void testSparkCoreApiFlatMap(JavaRDD<String> rdd){ JavaRDD<String> words = rdd.flatMap(new FlatMapFunction<String, String>() { @Override public Iterator<String> call(String s) throws Exception { return Arrays.asList(s.split(" ")).iterator(); } }); // JavaRDD<String> words=rdd.flatMap( // new FlatMapFunction<String, String>() { // public Iterable<String> call(String s) throws Exception { // return Arrays.asList(s.split(" ")); // } // } // ); List list = words.collect(); list.forEach(x-> System.out.println(x)); // for (int i = 0; i < list.size(); i++) { // System.out.println(list.get(i)); // } } /** * testSparkCoreApiUnion * 合併兩個RDD * @param rdd */ private static void testSparkCoreApiUnion(JavaRDD<String> rdd){ JavaRDD<String> unionRdd=rdd.union(rdd); // unionRdd.foreach(new VoidFunction<String>(){ // public void call(String lines){ // System.out.println(lines); // } // }); unionRdd.foreach(new VoidFunction<String>() { @Override public void call(String lines) throws Exception { System.out.println(lines); } }); } /** * testSparkCoreApiDistinct Test * 對RDD去重 * @param rdd */ private static void testSparkCoreApiDistinct(JavaRDD<String> rdd){ JavaRDD<String> unionRdd=rdd.union(rdd).distinct(); unionRdd.foreach(new VoidFunction<String>(){ public void call(String lines){ System.out.println(lines); } }); } /** * testSparkCoreApiMaptoPair Test * 把RDD對映為鍵值對型別的資料 * @param rdd */ private static void testSparkCoreApiMaptoPair(JavaRDD<String> rdd){ JavaPairRDD<String,Integer> pairRDD = rdd.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String s) throws Exception { String[] st = s.split(" "); return new Tuple2(st[0],st[1]); //自動推導型別 } }); // JavaPairRDD<String, Integer> pairRDD=rdd.mapToPair(new PairFunction<String,String,Integer>(){ // @Override // public Tuple2<String, Integer> call(String t) throws Exception { // String[] st=t.split(" "); // return new Tuple2(st[0], st[1]); // } // // }); pairRDD.foreach(new VoidFunction<Tuple2<String, Integer>>(){ @Override public void call(Tuple2<String, Integer> t) throws Exception { System.out.println(t._2()+"=="+t._1()); } }); } /** * testSparkCoreApiGroupByKey Test * 對鍵值對型別的資料進行按鍵值合併 * @param rdd */ private static void testSparkCoreApiGroupByKey(JavaRDD<String> rdd){ JavaPairRDD<String, Integer> pairRdd=rdd.mapToPair(new PairFunction<String,String,Integer>(){ @Override public Tuple2<String, Integer> call(String t) throws Exception { String[] st=t.split(" "); return new Tuple2(st[0], Integer.valueOf(st[1])); } }); JavaPairRDD<String, Iterable<Integer>> pairrdd2= pairRdd.union(pairRdd).groupByKey().sortByKey();//合併後並按照大小排序 pairrdd2.foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>(){ @Override public void call(Tuple2<String, Iterable<Integer>> t) throws Exception { Iterable<Integer> iter = t._2(); for (Integer integer : iter) { System.out.println(integer); } } }); } /** * testSparkCoreApiReduceByKey * 對鍵值對進行按鍵相同的對值進行操作 * @param rdd */ private static void testSparkCoreApiReduceByKey(JavaRDD<String> rdd){ //轉換成鍵值對 JavaPairRDD<String, Integer> pairRdd=rdd.mapToPair(new PairFunction<String,String,Integer>(){ @Override public Tuple2<String, Integer> call(String t) throws Exception { String[] st=t.split(" "); return new Tuple2(st[0], Integer.valueOf(st[1])); } }); //相同鍵位 相加 並排序 JavaPairRDD<String, Integer> pairrdd2 =pairRdd.union(pairRdd).reduceByKey( new Function2<Integer,Integer,Integer>(){ @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1+v2; } } ).sortByKey() ; //rdd輸出 pairrdd2.foreach(new VoidFunction<Tuple2<String, Integer>>(){ @Override public void call(Tuple2<String, Integer> t) throws Exception { System.out.println(t._2()); } }); } /** * testSparkCoreApiReduce * 對RDD進行遞迴呼叫 * @param rdd */ private static void testSparkCoreApiReduce(JavaRDD<String> rdd){ //由於原資料是String,需要轉為Integer才能進行reduce遞迴 JavaRDD<Integer> rdd1=rdd.map(new Function<String,Integer>(){ @Override public Integer call(String v1) throws Exception { // TODO Auto-generated method stub return Integer.valueOf(v1.split(" ")[1]); } }); Integer a= rdd1.reduce(new Function2<Integer,Integer,Integer>(){ @Override public Integer call(Integer v1,Integer v2) throws Exception { return v1+v2; } }); System.out.println(a); } }