
Transformation and action operations in Spark 2

Spark supports two kinds of RDD operations: transformations and actions.

A transformation creates a new RDD from an existing RDD, while an action performs a final operation on an RDD, such as iterating over it or saving it to a file, and returns the result to the Driver program.

Transformations are lazy: if a Spark program only defines transformations, none of them are executed, even when the program runs.

An action triggers the execution of a Spark job, which in turn executes all of the transformations that precede it.
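
For example, in the minimal sketch below (the variable names are illustrative and it assumes the same imports used in the examples that follow), the map call alone launches no job; only the count() action does.

SparkConf conf = new SparkConf().setAppName("lazyDemo").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);

//Defining the transformation does not execute it
JavaRDD<Integer> doubled = sc.parallelize(Arrays.asList(1, 2, 3)).map(new Function<Integer, Integer>() {
	private static final long serialVersionUID = 1L;
	@Override
	public Integer call(Integer v1) throws Exception {
		return v1 * 2;
	}
});

//The count() action triggers a job, which runs the map defined above
long n = doubled.count();
System.out.println(n);
sc.close();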

 

Common transformations:

map: iterates over every element of the RDD, applies a user-defined function to each one to produce a new element, and the new elements make up a new RDD.

/**
 * Multiply every element of the collection by 2
 */
private static void map(){
		
		SparkConf conf = new SparkConf().setAppName("map").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		List<Integer> numbers = Arrays.asList(1,2,3,4,5);
		//Parallelize the collection to create the initial RDD
		JavaRDD<Integer> numberRDD = sc.parallelize(numbers);
		
		//Use the map operator to multiply every element of the collection by 2
		//The second generic parameter of Function is the type of the new element that is returned; the return type of call() must match it
		//call() does the computation and returns the new element; the new elements make up the new RDD
		JavaRDD<Integer> multipleNumberRDD = numberRDD.map(new Function<Integer, Integer>() {
			private static final long serialVersionUID = 1L;
			@Override
			public Integer call(Integer v1) throws Exception {
				return v1 * 2;
			}
		});
		
		//Print the new RDD
		multipleNumberRDD.foreach(new VoidFunction<Integer>() {
			private static final long serialVersionUID = 1L;
			@Override
			public void call(Integer t) throws Exception {
				System.out.println(t);
			}
		});
		sc.close();
	}

filter: applies a user-defined predicate to every element of the RDD; elements for which it returns true are kept, and those for which it returns false are dropped.

	/**
	 * Keep only the even numbers in the collection
	 */
	private static void filter(){
		SparkConf conf = new SparkConf().setAppName("filter").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		List<Integer> numbers = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
		//Parallelize the collection to create the initial RDD
		JavaRDD<Integer> numberRDD = sc.parallelize(numbers);
		
		//Use the filter operator to keep only the even numbers in the collection
		//call() returns a boolean: return true to keep the element in the new RDD, false to drop it
		JavaRDD<Integer> evenNumberRDD = numberRDD.filter(new Function<Integer, Boolean>() {
			private static final long serialVersionUID = 1L;
			@Override
			public Boolean call(Integer v1) throws Exception {
				return v1 % 2 == 0;
			}
		});
		
		//Print the new RDD
		evenNumberRDD.foreach(new VoidFunction<Integer>() {
			private static final long serialVersionUID = 1L;
			@Override
			public void call(Integer t) throws Exception {
				System.out.println(t);
			}
		});
		sc.close();
	}

flatMap: similar to map, but each element can produce one or more new elements.

	/**
	 * Split lines into words
	 */
	private static void flatMap(){
		SparkConf conf = new SparkConf().setAppName("flatMap").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		List<String> lineList = Arrays.asList("hello you","hello me","hello world");
		JavaRDD<String> lines = sc.parallelize(lineList);
		
		//Split each line of text into multiple words
		//flatMap takes a FlatMapFunction; its second generic parameter is the type of the new elements that are returned
		//call() returns an Iterator<U>, where U matches flatMap's second generic parameter
		//flatMap receives each element of the original RDD, processes it in call(), and returns multiple elements
		//The multiple elements are wrapped in an Iterator; a collection such as an ArrayList can back it
		//The new RDD holds all of the new elements, so it is typically larger than the original RDD
		JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
			private static final long serialVersionUID = 1L;
			@Override
			public Iterator<String> call(String t) throws Exception {
				return Arrays.asList(t.split(" ")).iterator();
			}
		});
		
		words.foreach(new VoidFunction<String>() {
			private static final long serialVersionUID = 1L;
			@Override
			public void call(String v1) throws Exception {
				System.out.println(v1);
			}
		});
		sc.close();
	}

groupByKey: groups elements by key; each key maps to an Iterable<value>.

	/**
	 * Group the scores by class
	 */
	private static void groupByKey(){
		
		SparkConf conf = new SparkConf().setAppName("froupByKey").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		List<Tuple2<String,Integer>> scoreList = Arrays.asList(
				new Tuple2<String, Integer>("class01",80),
				new Tuple2<String, Integer>("class02",81),
				new Tuple2<String, Integer>("class01",82),
				new Tuple2<String, Integer>("class02",83));
		
		//Parallelize the collection to create a JavaPairRDD
		JavaPairRDD<String,Integer> scores = sc.parallelizePairs(scoreList);
		
		//Use groupByKey to group the scores by class
		//groupByKey returns a JavaPairRDD whose first generic type is unchanged and whose second becomes an Iterable,
		//i.e. elements are grouped by key, and a key's possibly many values are gathered into an Iterable
		JavaPairRDD<String, Iterable<Integer>> groupedScore = scores.groupByKey();
		
		groupedScore.foreach(new VoidFunction<Tuple2<String,Iterable<Integer>>>() {
			private static final long serialVersionUID = 1L;
			@Override
			public void call(Tuple2<String, Iterable<Integer>> t) throws Exception {
				System.out.println(t._1);
				Iterator<Integer> ite = t._2.iterator();
				while(ite.hasNext()){
					System.out.println(ite.next());
				}
				System.out.println("**********************************");
			}
		});
		sc.close();
	}

reduceByKey: applies a reduce operation to the values of each key.

	/**
	 * Compute the total score of each class
	 */
	private static void reduceByKey(){
		
		SparkConf conf = new SparkConf().setAppName("reduceByKey").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		List<Tuple2<String,Integer>> scoreList = Arrays.asList(
				new Tuple2<String, Integer>("class01",80),
				new Tuple2<String, Integer>("class02",81),
				new Tuple2<String, Integer>("class01",82),
				new Tuple2<String, Integer>("class02",83));
		
		JavaPairRDD<String, Integer> scores = sc.parallelizePairs(scoreList);
		
		//reduceByKey takes a Function2, which has three generic parameters
		//The first two generic types are the type of the values in the original RDD
		//The third generic type is the return type of each reduce step; normally it is the same as the original value type
		JavaPairRDD<String, Integer> totalScores = scores.reduceByKey(new Function2<Integer, Integer, Integer>() {
			private static final long serialVersionUID = 1L;
			@Override
			public Integer call(Integer v1, Integer v2) throws Exception {
				return v1 + v2;
			}
		});
		
		totalScores.foreach(new VoidFunction<Tuple2<String,Integer>>() {
			private static final long serialVersionUID = 1L;
			@Override
			public void call(Tuple2<String, Integer> t) throws Exception {
				System.out.println(t._1 + ":" + t._2);
			}
		});
		sc.close();
	}

sortByKey: sorts the elements of a pair RDD by key.

	/**
	 * Sort student scores
	 */
	private static void sortByKey(){
		
		SparkConf conf = new SparkConf().setAppName("sortByKey").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		List<Tuple2<Integer, String>> scoreList = Arrays.asList(
				new Tuple2<Integer, String>(65,"leo1"),
				new Tuple2<Integer, String>(45,"leo2"),
				new Tuple2<Integer, String>(85,"leo3"),
				new Tuple2<Integer, String>(67,"leo4")
				); 
		
		JavaPairRDD<Integer, String> scores = sc.parallelizePairs(scoreList);
		
		//Use sortByKey to sort by key; with no argument it sorts ascending, passing false sorts descending
		JavaPairRDD<Integer, String> sortedScores = scores.sortByKey(false);//false means descending
		
		sortedScores.foreach(new VoidFunction<Tuple2<Integer,String>>() {
			private static final long serialVersionUID = 1L;
			@Override
			public void call(Tuple2<Integer, String> t) throws Exception {
				System.out.println(t._1 + ":" + t._2);
			}
		});
		sc.close();
	}

join: joins two RDDs of <key,value> pairs by key; for each matching key, the two values are combined into a Tuple2 in the resulting pair RDD.

	/**
	 * Print each student's score
	 */
	private static void join(){
		
		SparkConf conf = new SparkConf().setAppName("join").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		List<Tuple2<Integer, String>> studentList = Arrays.asList(
				new Tuple2<Integer, String>(1,"leo1"),
				new Tuple2<Integer, String>(2,"leo2"),
				new Tuple2<Integer, String>(3,"leo3"),
				new Tuple2<Integer, String>(4,"leo4")
				);
		List<Tuple2<Integer, Integer>> scoreList = Arrays.asList(
				new Tuple2<Integer, Integer>(1,100),
				new Tuple2<Integer, Integer>(2,88),
				new Tuple2<Integer, Integer>(3,75),
				new Tuple2<Integer, Integer>(4,97)
				);
		
		//Parallelize the two collections into pair RDDs
		JavaPairRDD<Integer, String> students = sc.parallelizePairs(studentList);
		JavaPairRDD<Integer, Integer> scores = sc.parallelizePairs(scoreList);
		
		//Use the join operator to join the two RDDs by key
		//join returns a JavaPairRDD whose first generic type is the key type of the two input JavaPairRDDs,
		//since the join is performed on the key
		//The second generic type is Tuple2<v1, v2>, whose two generics are the value types of the original RDDs
		JavaPairRDD<Integer, Tuple2<String, Integer>> studentScore = students.join(scores);
		
		studentScore.foreach(new VoidFunction<Tuple2<Integer,Tuple2<String,Integer>>>() {
			private static final long serialVersionUID = 1L;
			@Override
			public void call(Tuple2<Integer, Tuple2<String, Integer>> t) throws Exception {
				System.out.println("student id: " + t._1);
				System.out.println("student name: " + t._2._1);
				System.out.println("student score: " + t._2._2);
				System.out.println("*****************************");
			}
		});
		sc.close();
	}

cogroup: similar to join, but for each key, all of the values from each RDD are gathered into an Iterable, and the key maps to a Tuple2 of those Iterables.

	/**
	 * Print each student's scores
	 */
	private static void cogroup(){
		
		SparkConf conf = new SparkConf().setAppName("cogroup").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		List<Tuple2<Integer, String>> studentList = Arrays.asList(
				new Tuple2<Integer, String>(1,"leo1"),
				new Tuple2<Integer, String>(2,"leo2"),
				new Tuple2<Integer, String>(3,"leo3"),
				new Tuple2<Integer, String>(4,"leo4")
				);
		List<Tuple2<Integer, Integer>> scoreList = Arrays.asList(
				new Tuple2<Integer, Integer>(1,100),
				new Tuple2<Integer, Integer>(2,88),
				new Tuple2<Integer, Integer>(3,75),
				new Tuple2<Integer, Integer>(4,97),
				new Tuple2<Integer, Integer>(1,80),
				new Tuple2<Integer, Integer>(2,68),
				new Tuple2<Integer, Integer>(3,95),
				new Tuple2<Integer, Integer>(4,57)
				);
		
		//Parallelize the two collections into pair RDDs
		JavaPairRDD<Integer, String> students = sc.parallelizePairs(studentList);
		JavaPairRDD<Integer, Integer> scores = sc.parallelizePairs(scoreList);
		
		//cogroup differs from join:
		//for each key, all of the values that join on that key are gathered into a single Iterable
		JavaPairRDD<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> studentScore = students.cogroup(scores);
		
		studentScore.foreach(new VoidFunction<Tuple2<Integer,Tuple2<Iterable<String>,Iterable<Integer>>>>() {
			private static final long serialVersionUID = 1L;
			@Override
			public void call(Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> t) throws Exception {
				System.out.println("student id: " + t._1);
				System.out.println("student name: " + t._2._1);
				System.out.println("student score: " + t._2._2);
				System.out.println("*****************************");
			}
		});
		sc.close();
	}

 

Common actions:

reduce: aggregates all elements of the RDD into a single value.

	/**
	 * Use reduce to sum the elements
	 */
	private static void reduce(){
		
		SparkConf conf = new SparkConf().setAppName("reduce").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		List<Integer> numberList = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
		JavaRDD<Integer> numbers = sc.parallelize(numberList);
		
		//Use reduce to sum the numbers in the collection
		//How reduce works:
		//first, elements one and two are passed to call() and a result is computed,
		//then that result and the next element are passed to call(), and so on
		//so reduce is aggregation: it folds many elements into one
		int count = numbers.reduce(new Function2<Integer, Integer, Integer>() {
			private static final long serialVersionUID = 1L;
			@Override
			public Integer call(Integer v1, Integer v2) throws Exception {
				return v1 + v2;
			}
		});
		System.out.println(count);
		sc.close();
	}

collect: fetches all elements of the RDD back to the local driver program.

    private static void collect(){
		
		SparkConf conf = new SparkConf().setAppName("collect").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		List<Integer> numberList = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
		JavaRDD<Integer> numbers = sc.parallelize(numberList);
		
		JavaRDD<Integer> doubleNumbers = numbers.map(new Function<Integer, Integer>() {
			private static final long serialVersionUID = 1L;
			@Override
			public Integer call(Integer t) throws Exception {
				return t * 2;
			}
		});
		
		//Instead of using the foreach action to iterate over the RDD's elements on the remote cluster,
		//collect pulls the data of the distributed doubleNumbers RDD back to the driver
		//This performs poorly, and with a large dataset it can cause an out-of-memory error, so foreach is usually preferred
		List<Integer> doubleNumberList = doubleNumbers.collect();
		for(Integer num:doubleNumberList){
			System.out.println(num);
		}
		sc.close();
	}

count: returns the number of elements in the RDD.

        private static void count(){
		
		SparkConf conf = new SparkConf().setAppName("count").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		List<Integer> numberList = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
		JavaRDD<Integer> numbers = sc.parallelize(numberList);
		
		//Use count to find how many elements the RDD has
		long count = numbers.count();
		System.out.println(count);
		sc.close();
	}

take(n): returns the first n elements of the RDD.

        private static void take(){
		
		SparkConf conf = new SparkConf().setAppName("take").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		List<Integer> numberList = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
		JavaRDD<Integer> numbers = sc.parallelize(numberList);
		
		//take, like collect, fetches RDD data from the remote cluster
		//but collect fetches all of the data, while take fetches only the first n elements
		List<Integer> top3Number = numbers.take(3);
		for(Integer num : top3Number){
			System.out.println(num);
		}
		sc.close();
	}

saveAsTextFile: saves the RDD's elements to a file, calling toString() on each element.

        private static void saveAsTextFile(){
		
		SparkConf conf = new SparkConf().setAppName("saveAsTextFile");
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		List<Integer> numberList = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
		JavaRDD<Integer> numbers = sc.parallelize(numberList);
		
		JavaRDD<Integer> doubleNumbers = numbers.map(new Function<Integer, Integer>() {
			private static final long serialVersionUID = 1L;
			@Override
			public Integer call(Integer t) throws Exception {
				return t * 2;
			}
		});
		
		//Save the RDD's data directly to HDFS
		//(saveAsTextFile treats the path as a directory and writes part-* files into it)
		doubleNumbers.saveAsTextFile("/data/double_number.txt");
		sc.close();
	}

countByKey: counts the number of elements for each key.

	private static void countByKey(){
		
		SparkConf conf = new SparkConf().setAppName("countByKey").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		List<Tuple2<String, String>> studentList = Arrays.asList(
				new Tuple2<String, String>("class02","leo1"),
				new Tuple2<String, String>("class01","leo2"),
				new Tuple2<String, String>("class02","leo3"),
				new Tuple2<String, String>("class01","leo4")
		);
		//Parallelize the collection to create a JavaPairRDD
		JavaPairRDD<String, String> students = sc.parallelizePairs(studentList);
		
		//Count the number of students in each class, i.e. the number of elements for each key
		Map<String, Long> studentCounts = students.countByKey();
		
		for(Map.Entry<String, Long> studentCount : studentCounts.entrySet()){
			System.out.println(studentCount.getKey() + ":" + studentCount.getValue());
		}
		sc.close();
	}

foreach: iterates over every element of the RDD, applying a user-defined function to each one; a standalone sketch follows below.
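
The earlier examples already use foreach for printing. A standalone sketch in the same style (the method name foreachDemo is illustrative) might look like this; note that the println calls run on the executors, so on a real cluster the output appears in the executor logs rather than on the driver.

	/**
	 * Print every element of the RDD
	 */
	private static void foreachDemo(){
		
		SparkConf conf = new SparkConf().setAppName("foreach").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		List<Integer> numberList = Arrays.asList(1,2,3,4,5);
		JavaRDD<Integer> numbers = sc.parallelize(numberList);
		
		//foreach runs the supplied function on each element, on the executors
		numbers.foreach(new VoidFunction<Integer>() {
			private static final long serialVersionUID = 1L;
			@Override
			public void call(Integer t) throws Exception {
				System.out.println(t);
			}
		});
		sc.close();
	}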