Spark 2 transformation and action operations
Spark supports two kinds of RDD operations: transformations and actions.
A transformation creates a new RDD from an existing one, while an action performs a final computation on an RDD, such as traversing it or saving it to a file, and returns the result to the Driver program.
Transformations are lazy: if a Spark program only defines transformations, running the program executes none of them.
An action triggers the execution of a Spark job, which in turn executes all the transformations that precede it.
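A minimal sketch of this laziness, in the same snippet style as the examples below (the data, app name, and method name are illustrative; imports are omitted as throughout):
/**
 * Sketch of lazy evaluation: the map transformation is only recorded
 * when defined; nothing executes until the count action triggers a job.
 */
private static void lazyDemo(){
    SparkConf conf = new SparkConf().setAppName("lazyDemo").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaRDD<Integer> numbers = sc.parallelize(Arrays.asList(1,2,3));
    // Defining the transformation does not run it; Spark only records the lineage
    JavaRDD<Integer> doubled = numbers.map(new Function<Integer, Integer>() {
        private static final long serialVersionUID = 1L;
        @Override
        public Integer call(Integer v1) throws Exception {
            return v1 * 2; // executed only once an action runs
        }
    });
    // Only this action triggers a Spark job, which runs the map above
    System.out.println(doubled.count()); // prints 3
    sc.close();
}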
Common transformations:
map: applies a user-defined function to each element of the RDD, producing one new element per input element; the new elements form a new RDD.
/**
 * Multiply every element of the collection by 2
 */
private static void map(){
    SparkConf conf = new SparkConf().setAppName("map").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);
    List<Integer> numbers = Arrays.asList(1,2,3,4,5);
    // Parallelize the collection to create the initial RDD
    JavaRDD<Integer> numberRDD = sc.parallelize(numbers);
    // Use the map operator to multiply every element by 2.
    // Function's second type parameter must be set yourself: it is the type of the
    // new elements, and the return type of call must match it.
    // call performs the computation and returns the new element; the new elements form a new RDD.
    JavaRDD<Integer> multipleNumberRDD = numberRDD.map(new Function<Integer, Integer>() {
        private static final long serialVersionUID = 1L;
        @Override
        public Integer call(Integer v1) throws Exception {
            return v1 * 2;
        }
    });
    // Print the new RDD
    multipleNumberRDD.foreach(new VoidFunction<Integer>() {
        private static final long serialVersionUID = 1L;
        @Override
        public void call(Integer t) throws Exception {
            System.out.println(t);
        }
    });
    sc.close();
}
filter: applies a user-defined predicate to each element of the RDD; elements for which it returns true are kept, the rest are dropped.
/**
 * Keep only the even numbers in the collection
 */
private static void filter(){
    SparkConf conf = new SparkConf().setAppName("filter").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);
    List<Integer> numbers = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
    // Parallelize the collection to create the initial RDD
    JavaRDD<Integer> numberRDD = sc.parallelize(numbers);
    // Use the filter operator to keep only the even numbers.
    // call returns a boolean: return true to keep the element in the new RDD, false to drop it.
    JavaRDD<Integer> evenNumberRDD = numberRDD.filter(new Function<Integer, Boolean>() {
        private static final long serialVersionUID = 1L;
        @Override
        public Boolean call(Integer v1) throws Exception {
            return v1 % 2 == 0;
        }
    });
    // Print the new RDD
    evenNumberRDD.foreach(new VoidFunction<Integer>() {
        private static final long serialVersionUID = 1L;
        @Override
        public void call(Integer t) throws Exception {
            System.out.println(t);
        }
    });
    sc.close();
}
flatMap: like map, but each input element can produce zero or more new elements.
/**
 * Split lines into words
 */
private static void flatMap(){
    SparkConf conf = new SparkConf().setAppName("flatMap").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);
    List<String> lineList = Arrays.asList("hello you","hello me","hello world");
    JavaRDD<String> lines = sc.parallelize(lineList);
    // Split each line of text into multiple words.
    // flatMap takes a FlatMapFunction; you must set its second type parameter yourself,
    // which is the type of the new elements.
    // call returns an Iterator<U>, where U matches flatMap's second type parameter.
    // flatMap receives each element of the original RDD, processes it in call, and returns
    // multiple elements wrapped in an Iterator (an ArrayList or similar collection works).
    // The new RDD holds all the new elements, so it is larger than the original RDD.
    JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
        private static final long serialVersionUID = 1L;
        @Override
        public Iterator<String> call(String t) throws Exception {
            return Arrays.asList(t.split(" ")).iterator();
        }
    });
    words.foreach(new VoidFunction<String>() {
        private static final long serialVersionUID = 1L;
        @Override
        public void call(String v1) throws Exception {
            System.out.println(v1);
        }
    });
    sc.close();
}
groupByKey: groups the elements of a pair RDD by key; each key maps to an Iterable<value>.
/**
 * Group the scores of each class
 */
private static void groupByKey(){
    SparkConf conf = new SparkConf().setAppName("groupByKey").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);
    List<Tuple2<String,Integer>> scoreList = Arrays.asList(
            new Tuple2<String, Integer>("class01",80),
            new Tuple2<String, Integer>("class02",81),
            new Tuple2<String, Integer>("class01",82),
            new Tuple2<String, Integer>("class02",83));
    // Parallelize the collection to create a JavaPairRDD
    JavaPairRDD<String,Integer> scores = sc.parallelizePairs(scoreList);
    // Use groupByKey to group the scores by class.
    // groupByKey returns a JavaPairRDD whose first type parameter is unchanged and whose
    // second becomes an Iterable: the elements are grouped by key, and since each key may
    // have multiple values, those values are collected into an Iterable.
    JavaPairRDD<String, Iterable<Integer>> groupedScore = scores.groupByKey();
    groupedScore.foreach(new VoidFunction<Tuple2<String,Iterable<Integer>>>() {
        private static final long serialVersionUID = 1L;
        @Override
        public void call(Tuple2<String, Iterable<Integer>> t) throws Exception {
            System.out.println(t._1);
            Iterator<Integer> ite = t._2.iterator();
            while(ite.hasNext()){
                System.out.println(ite.next());
            }
            System.out.println("**********************************");
        }
    });
    sc.close();
}
reduceByKey: reduces the values of each key with a user-defined function.
/**
 * Compute the total score of each class
 */
private static void reduceByKey(){
    SparkConf conf = new SparkConf().setAppName("reduceByKey").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);
    List<Tuple2<String,Integer>> scoreList = Arrays.asList(
            new Tuple2<String, Integer>("class01",80),
            new Tuple2<String, Integer>("class02",81),
            new Tuple2<String, Integer>("class01",82),
            new Tuple2<String, Integer>("class02",83));
    JavaPairRDD<String, Integer> scores = sc.parallelizePairs(scoreList);
    // reduceByKey takes a Function2, which has three type parameters:
    // the first two are the value type of the original RDD's elements;
    // the third is the return type of each reduce step, which is the same
    // as the original RDD's value type.
    JavaPairRDD<String, Integer> totalScores = scores.reduceByKey(new Function2<Integer, Integer, Integer>() {
        private static final long serialVersionUID = 1L;
        @Override
        public Integer call(Integer v1, Integer v2) throws Exception {
            return v1 + v2;
        }
    });
    totalScores.foreach(new VoidFunction<Tuple2<String,Integer>>() {
        private static final long serialVersionUID = 1L;
        @Override
        public void call(Tuple2<String, Integer> t) throws Exception {
            System.out.println(t._1 + ":" + t._2);
        }
    });
    sc.close();
}
sortByKey: sorts the elements of a pair RDD by key.
/**
 * Sort student scores
 */
private static void sortByKey(){
    SparkConf conf = new SparkConf().setAppName("sortByKey").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);
    List<Tuple2<Integer, String>> scoreList = Arrays.asList(
            new Tuple2<Integer, String>(65,"leo1"),
            new Tuple2<Integer, String>(45,"leo2"),
            new Tuple2<Integer, String>(85,"leo3"),
            new Tuple2<Integer, String>(67,"leo4")
    );
    JavaPairRDD<Integer, String> scores = sc.parallelizePairs(scoreList);
    // Sort with sortByKey: no argument sorts ascending, false sorts descending
    JavaPairRDD<Integer, String> sortedScores = scores.sortByKey(false);
    sortedScores.foreach(new VoidFunction<Tuple2<Integer,String>>() {
        private static final long serialVersionUID = 1L;
        @Override
        public void call(Tuple2<Integer, String> t) throws Exception {
            System.out.println(t._1 + ":" + t._2);
        }
    });
    sc.close();
}
join: joins two RDDs of <key, value> pairs by key; each pair of values that share a key is combined into a Tuple2 in the resulting RDD.
/**
 * Print each student's score
 */
private static void join(){
    SparkConf conf = new SparkConf().setAppName("join").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);
    List<Tuple2<Integer, String>> studentList = Arrays.asList(
            new Tuple2<Integer, String>(1,"leo1"),
            new Tuple2<Integer, String>(2,"leo2"),
            new Tuple2<Integer, String>(3,"leo3"),
            new Tuple2<Integer, String>(4,"leo4")
    );
    List<Tuple2<Integer, Integer>> scoreList = Arrays.asList(
            new Tuple2<Integer, Integer>(1,100),
            new Tuple2<Integer, Integer>(2,88),
            new Tuple2<Integer, Integer>(3,75),
            new Tuple2<Integer, Integer>(4,97)
    );
    // Parallelize the two collections into two RDDs
    JavaPairRDD<Integer, String> students = sc.parallelizePairs(studentList);
    JavaPairRDD<Integer, Integer> scores = sc.parallelizePairs(scoreList);
    // Use the join operator to relate the two RDDs.
    // join matches elements by key and returns a JavaPairRDD whose first type parameter
    // is the key type of the two input RDDs (since the join is performed on the key),
    // and whose second type parameter is Tuple2<v1, v2>, the Tuple2's type parameters
    // being the value types of the original RDDs.
    JavaPairRDD<Integer, Tuple2<String, Integer>> studentScore = students.join(scores);
    studentScore.foreach(new VoidFunction<Tuple2<Integer,Tuple2<String,Integer>>>() {
        private static final long serialVersionUID = 1L;
        @Override
        public void call(Tuple2<Integer, Tuple2<String, Integer>> t) throws Exception {
            System.out.println("student id: " + t._1);
            System.out.println("student name: " + t._2._1);
            System.out.println("student score: " + t._2._2);
            System.out.println("*****************************");
        }
    });
    sc.close();
}
cogroup: like join, but each key maps to a pair of Iterables, each collecting all of that key's values from one of the input RDDs.
/**
 * Print each student's scores
 */
private static void cogroup(){
    SparkConf conf = new SparkConf().setAppName("cogroup").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);
    List<Tuple2<Integer, String>> studentList = Arrays.asList(
            new Tuple2<Integer, String>(1,"leo1"),
            new Tuple2<Integer, String>(2,"leo2"),
            new Tuple2<Integer, String>(3,"leo3"),
            new Tuple2<Integer, String>(4,"leo4")
    );
    List<Tuple2<Integer, Integer>> scoreList = Arrays.asList(
            new Tuple2<Integer, Integer>(1,100),
            new Tuple2<Integer, Integer>(2,88),
            new Tuple2<Integer, Integer>(3,75),
            new Tuple2<Integer, Integer>(4,97),
            new Tuple2<Integer, Integer>(1,80),
            new Tuple2<Integer, Integer>(2,68),
            new Tuple2<Integer, Integer>(3,95),
            new Tuple2<Integer, Integer>(4,57)
    );
    // Parallelize the two collections into two RDDs
    JavaPairRDD<Integer, String> students = sc.parallelizePairs(studentList);
    JavaPairRDD<Integer, Integer> scores = sc.parallelizePairs(scoreList);
    // cogroup differs from join: for each key, all of that key's values
    // from an input RDD are collected into a single Iterable.
    JavaPairRDD<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> studentScore = students.cogroup(scores);
    studentScore.foreach(new VoidFunction<Tuple2<Integer,Tuple2<Iterable<String>,Iterable<Integer>>>>() {
        private static final long serialVersionUID = 1L;
        @Override
        public void call(Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> t) throws Exception {
            System.out.println("student id: " + t._1);
            System.out.println("student name: " + t._2._1);
            System.out.println("student score: " + t._2._2);
            System.out.println("*****************************");
        }
    });
    sc.close();
}
Common actions:
reduce: aggregates all elements of the RDD into a single value.
/**
 * Sum the numbers with reduce
 */
private static void reduce(){
    SparkConf conf = new SparkConf().setAppName("reduce").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);
    List<Integer> numberList = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
    JavaRDD<Integer> numbers = sc.parallelize(numberList);
    // Use reduce to sum the numbers in the collection.
    // How reduce works:
    // the first and second elements are passed to call() to compute a result,
    // then that result and the next element are passed to call(), and so on.
    // reduce is thus an aggregation: it folds many elements into one.
    int count = numbers.reduce(new Function2<Integer, Integer, Integer>() {
        private static final long serialVersionUID = 1L;
        @Override
        public Integer call(Integer v1, Integer v2) throws Exception {
            return v1 + v2;
        }
    });
    System.out.println(count);
    sc.close();
}
collect: fetches all elements of the RDD back to the local driver.
private static void collect(){
    SparkConf conf = new SparkConf().setAppName("collect").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);
    List<Integer> numberList = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
    JavaRDD<Integer> numbers = sc.parallelize(numberList);
    JavaRDD<Integer> doubleNumbers = numbers.map(new Function<Integer, Integer>() {
        private static final long serialVersionUID = 1L;
        @Override
        public Integer call(Integer t) throws Exception {
            return t * 2;
        }
    });
    // Instead of the foreach action, which traverses the RDD's elements on the remote cluster,
    // collect pulls the data of the distributed doubleNumbers RDD down to the driver.
    // This performs poorly, and with a large dataset it can cause an out-of-memory error,
    // so collect is discouraged; prefer foreach.
    List<Integer> doubleNumberList = doubleNumbers.collect();
    for(Integer num:doubleNumberList){
        System.out.println(num);
    }
    sc.close();
}
count: returns the total number of elements in the RDD.
private static void count(){
    SparkConf conf = new SparkConf().setAppName("count").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);
    List<Integer> numberList = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
    JavaRDD<Integer> numbers = sc.parallelize(numberList);
    // Use count to get the number of elements in the RDD
    long count = numbers.count();
    System.out.println(count);
    sc.close();
}
take(n): returns the first n elements of the RDD.
private static void take(){
    SparkConf conf = new SparkConf().setAppName("take").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);
    List<Integer> numberList = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
    JavaRDD<Integer> numbers = sc.parallelize(numberList);
    // take, like collect, fetches RDD data from the remote cluster,
    // but while collect fetches all of the RDD's data, take fetches only the first n elements
    List<Integer> top3Number = numbers.take(3);
    for(Integer num : top3Number){
        System.out.println(num);
    }
    sc.close();
}
saveAsTextFile: saves the RDD's elements to a file, calling toString on each element.
private static void saveAsTextFile(){
    SparkConf conf = new SparkConf().setAppName("saveAsTextFile");
    JavaSparkContext sc = new JavaSparkContext(conf);
    List<Integer> numberList = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
    JavaRDD<Integer> numbers = sc.parallelize(numberList);
    JavaRDD<Integer> doubleNumbers = numbers.map(new Function<Integer, Integer>() {
        private static final long serialVersionUID = 1L;
        @Override
        public Integer call(Integer t) throws Exception {
            return t * 2;
        }
    });
    // Save the RDD's data directly to an HDFS file
    // (the given path becomes a directory holding one part file per partition)
    doubleNumbers.saveAsTextFile("/data/double_number.txt");
    sc.close();
}
countByKey: counts the number of elements for each key.
private static void countByKey(){
    SparkConf conf = new SparkConf().setAppName("countByKey").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);
    List<Tuple2<String, String>> studentList = Arrays.asList(
            new Tuple2<String, String>("class02","leo1"),
            new Tuple2<String, String>("class01","leo2"),
            new Tuple2<String, String>("class02","leo3"),
            new Tuple2<String, String>("class01","leo4")
    );
    // Parallelize the collection to create a JavaPairRDD
    JavaPairRDD<String, String> students = sc.parallelizePairs(studentList);
    // Count the students in each class, i.e. the number of elements per key
    Map<String, Long> studentCounts = students.countByKey();
    for(Map.Entry<String, Long> studentCount : studentCounts.entrySet()){
        System.out.println(studentCount.getKey() + ":" + studentCount.getValue());
    }
    sc.close();
}
foreach: traverses each element of the RDD, applying a user-defined function to it; see the sketch below.
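The examples above already use foreach to print results; as a standalone sketch in the same style (the data and app name are illustrative), note that the function runs on the executors, so on a real cluster the printed output appears in the executor logs rather than on the driver:
/**
 * Sketch of foreach: an action that triggers a job and applies
 * the function to every element of the RDD.
 */
private static void foreach(){
    SparkConf conf = new SparkConf().setAppName("foreach").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);
    List<Integer> numberList = Arrays.asList(1,2,3,4,5);
    JavaRDD<Integer> numbers = sc.parallelize(numberList);
    // foreach is an action: it returns nothing and is used purely for its side effects
    numbers.foreach(new VoidFunction<Integer>() {
        private static final long serialVersionUID = 1L;
        @Override
        public void call(Integer t) throws Exception {
            System.out.println(t);
        }
    });
    sc.close();
}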