A Complete Guide to Spark Core Operators (Java, Scala, Python)
阿新 • Published 2019-01-28
/**
 * Imports for the Java version
 */
import org.apache.spark.Partitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.*;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
/**
* Imports for the Scala version
*/
import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import scala.collection.mutable.ArrayBuffer
---------------------------Preliminaries--------------------------------------
public class TransformationOperator {
    /**
     * Shared Java setup
     */
    public static SparkConf conf = new SparkConf().setMaster("local").setAppName("test");
    public static JavaSparkContext sc = new JavaSparkContext(conf);

    public static void println(String str) {
        System.out.println(str);
    }
    // operator methods go here
    // main()
object TransformationOperator {
  /**
   * Shared Scala setup
   */
  val conf: SparkConf = new SparkConf()
  conf.setMaster("local")
  conf.setAppName("TransformationOperator")
  val sc: SparkContext = new SparkContext(conf)
  val list: List[String] = List("張無忌", "趙敏", "周芷若")
  val rdd: RDD[String] = sc.parallelize(list)
  // operator methods go here
  // main()
---------------------------Transformation Operators--------------------------------------
map()
/**
* map(): apply a function to every element, one at a time
*/
public static void map() {
final List<String> list = Arrays.asList("張無忌", "趙敏", "周芷若");
//create an RDD by parallelizing a local collection
final JavaRDD<String> rdd = sc.parallelize(list);
final JavaRDD<String> nameRDD = rdd.map(new Function<String, String>() {
@Override
public String call(String name) throws Exception {
return "Hello " + name;
}
});
nameRDD.foreach(new VoidFunction<String>() {
@Override
public void call(String s) throws Exception {
println(s);
}
});
}
/**
* name:張無忌
* name:趙敏
* name:周芷若
*/
def map(): Unit ={
rdd.map("name:"+_).foreach(println(_))
}
flatMap()
public static void flatMap() {
final List<String> list = Arrays.asList("張無忌 趙敏", "宋青書 周芷若");
final JavaRDD<String> rdd = sc.parallelize(list);
rdd.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String names) throws Exception {
return Arrays.asList(names.split(" ")).iterator();
}
}).map(new Function<String, String>() {
@Override
public String call(String name) throws Exception {
return "Hello " + name;
}
}).foreach(new VoidFunction<String>() {
@Override
public void call(String line) throws Exception {
println(line);
}
});
}
/**
* Hello:張無忌
* Hello:趙敏
* Hello:宋青書
* Hello:周芷若
*/
def flatMap()={
val rdd1: RDD[String] = sc.parallelize(List("張無忌 趙敏", "宋青書 周芷若"))
rdd1.flatMap(_.split(" ")).foreach(word=>println("Hello:"+word))
}
filter()
/**
* Filter the even numbers out of the RDD
*/
public static void filter() {
final List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
final JavaRDD<Integer> rdd = sc.parallelize(list);
final JavaRDD<Integer> filterRDD = rdd.filter(new Function<Integer, Boolean>() {
//returning true keeps the element
@Override
public Boolean call(Integer number) throws Exception {
return number % 2 == 0;
}
});
filterRDD.foreach(new VoidFunction<Integer>() {
@Override
public void call(Integer integer) throws Exception {
println(integer + "");
}
});
}
/**
* 2,4,6
*/
def filter()={
val list:List[Int] = List(1,2,3,4,5,6,7)
val rdd1: RDD[Int] = sc.parallelize(list)
rdd1.filter(_%2==0).foreach(println(_))
}
groupByKey()
Does no map-side (local) aggregation.
/**
* Group the values that share a key.
* Expected output:
* 峨眉
* 周芷若滅絕師太
* 武當
* 宋青書張三丰
*/
public static void groupBykey() {
final List<Tuple2<String, String>> list = Arrays.asList(
new Tuple2<String, String>("峨眉", "周芷若"),
new Tuple2<String, String>("武當", "宋青書"),
new Tuple2<String, String>("峨眉", "滅絕師太"),
new Tuple2<String, String>("武當", "張三丰")
);
final JavaPairRDD<String, String> rdd = sc.parallelizePairs(list);
final JavaPairRDD<String, Iterable<String>> groupBykeyRDD = rdd.groupByKey();
groupBykeyRDD.foreach(new VoidFunction<Tuple2<String, Iterable<String>>>() {
@Override
public void call(Tuple2<String, Iterable<String>> tuple) throws Exception {
final String menpai = tuple._1;
final Iterator<String> iterator = tuple._2.iterator();
println(menpai + " ");
while (iterator.hasNext()) {
final String name = iterator.next();
System.out.print(name);
}
println("");
}
});
}
/**
* A partitioner can be passed as an extra argument; if omitted, hash partitioning is the default.
* (峨眉,CompactBuffer(周芷若, 滅絕師太))
* (武當,CompactBuffer(宋青書, 張三丰))
*/
def groupBykey()= {
val list: List[(String, String)] = List(("峨眉", "周芷若"), ("武當", "宋青書"), ("峨眉", "滅絕師太"), ("武當", "張三丰"))
val rdd: RDD[(String, String)] = sc.parallelize(list)
rdd.groupByKey().foreach(println(_))
}
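As the comment above notes, groupByKey() also accepts an explicit partitioner. A minimal sketch, reusing the shared sc from the setup section; the method name and the HashPartitioner(2) choice are illustrative, not from the original post:
def groupBykeyWithPartitioner() = {
  val list: List[(String, String)] = List(("峨眉", "周芷若"), ("武當", "宋青書"), ("峨眉", "滅絕師太"), ("武當", "張三丰"))
  val rdd: RDD[(String, String)] = sc.parallelize(list)
  // same grouping as above, but with an explicit HashPartitioner(2) instead of the default
  rdd.groupByKey(new HashPartitioner(2)).foreach(println(_))
}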
reduceByKey()
Performs map-side (local) aggregation, which reduces the shuffle volume, so prefer this operator and avoid groupByKey() where possible.
/**
* 峨眉 100
* 武當 129
*/
public static void reduceBykey() {
final List<Tuple2<String, Integer>> list = Arrays.asList(
new Tuple2<String, Integer>("峨眉", 40),
new Tuple2<String, Integer>("武當", 30),
new Tuple2<String, Integer>("峨眉", 60),
new Tuple2<String, Integer>("武當", 99)
);
final JavaPairRDD<String, Integer> rdd = sc.parallelizePairs(list);
rdd.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
}).foreach(new VoidFunction<Tuple2<String, Integer>>() {
@Override
public void call(Tuple2<String, Integer> tuple) throws Exception {
println(tuple._1 + " " + tuple._2);
}
});
}
/**
* (峨眉,100)
* (武當,129)
*/
def reduceBykey()={
val list: List[(String, Int)] = List(("峨眉", 40), ("武當", 30), ("峨眉", 60), ("武當", 99))
val rdd: RDD[(String, Int)] = sc.parallelize(list)
rdd.reduceByKey(_+_).foreach(println(_))
}
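To make the contrast with groupByKey() concrete, here is a small side-by-side sketch (the method name is mine); both variants print (峨眉,100) and (武當,129), but they shuffle very different amounts of data:
def reduceVsGroup() = {
  val rdd: RDD[(String, Int)] = sc.parallelize(List(("峨眉", 40), ("武當", 30), ("峨眉", 60), ("武當", 99)))
  // groupByKey ships every (key, value) pair through the shuffle, then sums:
  rdd.groupByKey().mapValues(_.sum).foreach(println(_))
  // reduceByKey pre-sums inside each partition and shuffles one partial sum per key:
  rdd.reduceByKey(_ + _).foreach(println(_))
}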
sortByKey()
/**
* 98 -> 東方不敗
* 85 -> 令狐沖
* 83 -> 任我行
* 80 -> 嶽不群
*/
public static void sortBykey() {
final List<Tuple2<Integer, String>> list = Arrays.asList(
new Tuple2<Integer, String>(98, "東方不敗"),
new Tuple2<Integer, String>(80, "嶽不群"),
new Tuple2<Integer, String>(85, "令狐沖"),
new Tuple2<Integer, String>(83, "任我行")
);
final JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(list);
rdd.sortByKey(false)
.foreach(new VoidFunction<Tuple2<Integer, String>>() {
@Override
public void call(Tuple2<Integer, String> tuple) throws Exception {
println(tuple._1 + " -> " + tuple._2);
}
});
}
/**
* 98->東方不敗
* 85->令狐沖
* 83->任我行
* 80->嶽不群
*/
def sortBykey()= {
val list: List[(Int, String)] = List((98, "東方不敗"), (80, "嶽不群"), (85, "令狐沖"), (83, "任我行"))
val rdd: RDD[(Int, String)] = sc.parallelize(list)
rdd.sortByKey(false).foreach(x=>println(x._1+"->"+x._2))
}
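sortByKey() always sorts by the key. When the sort criterion is some other field, the more general sortBy takes an arbitrary key extractor; a brief sketch (the method name and data are illustrative):
def sortByValue() = {
  val rdd: RDD[(Int, String)] = sc.parallelize(List((98, "東方不敗"), (80, "嶽不群"), (85, "令狐沖")))
  // sort by the name (the second field) instead of the numeric key
  rdd.sortBy(_._2, ascending = true).foreach(println(_))
}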
join()
/**
* 學號:1 名字:東方不敗 分數:99
* 學號:3 名字:林平之 分數:97
* 學號:2 名字:令狐沖 分數:98
*/
public static void join() {
final List<Tuple2<Integer, String>> names = Arrays.asList(
new Tuple2<Integer, String>(1, "東方不敗"),
new Tuple2<Integer, String>(2, "令狐沖"),
new Tuple2<Integer, String>(3, "林平之")
);
final List<Tuple2<Integer, Integer>> scores = Arrays.asList(
new Tuple2<Integer, Integer>(1, 99),
new Tuple2<Integer, Integer>(2, 98),
new Tuple2<Integer, Integer>(3, 97)
);
final JavaPairRDD<Integer, String> namesRdd = sc.parallelizePairs(names);
final JavaPairRDD<Integer, Integer> scoresRdd = sc.parallelizePairs(scores);
/**
 * <Integer,          student ID
 * Tuple2<String,     name
 * Integer>>          score
 */
final JavaPairRDD<Integer, Tuple2<String, Integer>> joinRDD = namesRdd.join(scoresRdd);
// final JavaPairRDD<Integer, Tuple2<Integer, String>> join = scoresRdd.join(namesRdd);
joinRDD.foreach(new VoidFunction<Tuple2<Integer, Tuple2<String, Integer>>>() {
@Override
public void call(Tuple2<Integer, Tuple2<String, Integer>> tuple) throws Exception {
println("學號:" + tuple._1 + " 名字:" + tuple._2._1 + " 分數:" + tuple._2._2);
}
});
}
/**
* (1,(東方不敗,99))
* (3,(林平之,97))
* (2,(令狐沖,98))
* ----->
* 學號:1 名字:東方不敗 分數:99
* 學號:3 名字:林平之 分數:97
* 學號:2 名字:令狐沖 分數:98
*/
//connects the entries of RDD1 and RDD2 that share the same key, much like a SQL join
def join()= {
val list1: List[(Int, String)] = List((1, "東方不敗"), (2, "令狐沖"), (3, "林平之"))
val list2: List[(Int, Int)]=List((1, 99), (2, 98), (3, 97))
val rdd1: RDD[(Int, String)] = sc.parallelize(list1)
val rdd2: RDD[(Int, Int)] = sc.parallelize(list2)
rdd1.join(rdd2).foreach(x=>println("學號:"+x._1+" 名字:"+x._2._1+" 分數:"+x._2._2))
}
union()
/**
* 1
* 2
* 3
* 4
* 3
* 4
* 5
* 6
*/
public static void union() {
final List<Integer> list1 = Arrays.asList(1, 2, 3, 4);
final List<Integer> list2 = Arrays.asList(3, 4, 5, 6);
final JavaRDD<Integer> rdd1 = sc.parallelize(list1);
final JavaRDD<Integer> rdd2 = sc.parallelize(list2);
rdd1.union(rdd2)
.foreach(new VoidFunction<Integer>() {
@Override
public void call(Integer number) throws Exception {
println(number + "");
}
});
}
/**
* union merges multiple RDDs and returns all of their elements together.
* Union: merges two RDDs without deduplicating;
* 1
* 2
* 3
* 4
* 3
* 4
* 5
* 6
*/
def union()={
val list1: List[Int] = List(1,2,3,4)
val list2: List[Int] = List(3,4,5,6)
val rdd1: RDD[Int] = sc.parallelize(list1)
val rdd2: RDD[Int] = sc.parallelize(list2)
rdd1.union(rdd2).foreach(println(_))
}
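Because union keeps duplicates, chaining distinct() is the usual way to get set-union semantics, at the price of the extra shuffle that distinct implies. A quick sketch:
def unionDistinct() = {
  val rdd1: RDD[Int] = sc.parallelize(List(1, 2, 3, 4))
  val rdd2: RDD[Int] = sc.parallelize(List(3, 4, 5, 6))
  // union keeps duplicates; distinct() removes them afterwards
  rdd1.union(rdd2).distinct().foreach(println(_))
}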
leftOuterJoin()
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* (22,(qwe,None))
* (3,(zxc,Some(true)))
* (2,(asd,Some(true)))
*/
object Test {
def main(args: Array[String]): Unit = {
val a = List((1L, true), (2L, true), (3L, true))
val b = List((22L, "qwe"), (2L, "asd"), (3L, "zxc"))
val conf: SparkConf = new SparkConf().setMaster("local").setAppName("Test")
val sc: SparkContext = new SparkContext(conf)
val rdd1: RDD[(Long, Boolean)] = sc.parallelize(a)
val rdd2: RDD[(Long, String)] = sc.parallelize(b)
//both RDDs must be in key-value form; a common use is blacklist filtering
val rdd3: RDD[(Long, (String, Option[Boolean]))] = rdd2.leftOuterJoin(rdd1)
rdd3.foreach(println(_))
}
}
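A minimal sketch of the blacklist-filtering pattern mentioned in the comment: keep only the records whose key found no match (None) on the blacklist side. The method name and data are illustrative, and sc is the shared context from the setup section:
def blacklistFilter() = {
  val events = sc.parallelize(List((22L, "qwe"), (2L, "asd"), (3L, "zxc")))
  val blacklist = sc.parallelize(List((2L, true), (3L, true)))
  events.leftOuterJoin(blacklist)
    .filter(_._2._2.isEmpty) // None means the key was not on the blacklist
    .map { case (id, (value, _)) => (id, value) }
    .foreach(println(_)) // (22,qwe)
}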
intersection()
/**
* Intersection
* 4
* 3
*/
public static void intersection() {
final List<Integer> list1 = Arrays.asList(1, 2, 3, 4);
final List<Integer> list2 = Arrays.asList(3, 4, 5, 6);
final JavaRDD<Integer> rdd1 = sc.parallelize(list1);
final JavaRDD<Integer> rdd2 = sc.parallelize(list2);
rdd1.intersection(rdd2)
.foreach(new VoidFunction<Integer>() {
@Override
public void call(Integer number) throws Exception {
println(number + "");
}
});
}
/**
* Intersection:
* RDD1.intersection(RDD2) returns the intersection of the two RDDs, deduplicated.
* intersection needs to shuffle the data, which makes it relatively expensive.
* 4
* 3
*/
def intersection(){
val list1: List[Int] = List(1,2,3,4)
val list2: List[Int] = List(3,4,5,6)
val rdd1: RDD[Int] = sc.parallelize(list1)
val rdd2: RDD[Int] = sc.parallelize(list2)
rdd1.intersection(rdd2).foreach(println(_))
}
distinct()
/**
* Deduplicate
* 4
* 1
* 3
* 2
*/
public static void distinct() {
final List<Integer> list1 = Arrays.asList(1, 2, 3, 3, 4, 4);
final JavaRDD<Integer> rdd1 = sc.parallelize(list1);
rdd1.distinct()
.foreach(new VoidFunction<Integer>() {
@Override
public void call(Integer number) throws Exception {
println(number + " ");
}
});
}
/**
* distinct deduplicates: an RDD may contain repeated elements, and distinct removes them. Note that it involves a shuffle, so the operation is expensive.
* 4
* 1
* 3
* 2
*/
def distinct()={
val list: List[Int] = List(1,2,3,3,4,4)
val rdd: RDD[Int] = sc.parallelize(list)
rdd.distinct().foreach(println(_))
}
cartesian()
/**
* Cartesian product
* a->0
* a->1
* a->2
* b->0
* b->1
* b->2
*/
public static void cartesian() {
final List<String> A = Arrays.asList("a", "b");
final List<Integer> B = Arrays.asList(0, 1, 2);
final JavaRDD<String> rddA = sc.parallelize(A);
final JavaRDD<Integer> rddB = sc.parallelize(B);
rddA.cartesian(rddB)
.foreach(new VoidFunction<Tuple2<String, Integer>>() {
@Override
public void call(Tuple2<String, Integer> tuple) throws Exception {
println(tuple._1 + "->" + tuple._2);
}
});
}
/**
* Cartesian product
* (a,0) -> a->0
* (a,1) -> a->1
* (a,2) -> a->2
* (b,0) -> b->0
* (b,1) -> b->1
* (b,2) -> b->2
*/
def cartesian()={
val list1: List[String] = List("a", "b")
val list2: List[Int] = List(0, 1, 2)
val rdd1: RDD[String] = sc.parallelize(list1)
val rdd2: RDD[Int] = sc.parallelize(list2)
rdd1.cartesian(rdd2).foreach(x=>println(x._1+"->"+x._2))
}
mapPartitions()
/**
* map:
*   processes one element at a time (from a file system, a database, etc.)
* mapPartitions:
*   fetches a whole partition of data at a time (e.g. from HDFS)
* Normally mapPartitions is a high-performance operator,
* because handling a partition at a time reduces the number of data fetches.
* But if the partitioning is set up badly, a single partition may end up holding far too much data.
* hello-1
* hello-2
* hello-3
* hello-4
* hello-5
* hello-6
*/
public static void mapPartitions() {
final List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6);
//the second argument gives this RDD two partitions
final JavaRDD<Integer> rdd = sc.parallelize(list, 2);
rdd.mapPartitions(new FlatMapFunction<Iterator<Integer>, String>() {
//each call processes the data of one whole partition
@Override
public Iterator<String> call(Iterator<Integer> iterator) throws Exception {
List<String> list = new ArrayList<String>();
while (iterator.hasNext()) {
list.add("hello-" + iterator.next());
}
return list.iterator();
}
}).foreach(new VoidFunction<String>() {
@Override
public void call(String s) throws Exception {
println(s);
}
});
}
/**
* Fetches a whole partition of data at a time (e.g. from HDFS).
* mapPartitions can be read backwards: partition first, then run the map function over each partition.
* When it fits:
* if the mapping needs to create extra objects frequently, mapPartitions is much more efficient than map.
* hello-1
* hello-2
* hello-3
* hello-4
* hello-5
* hello-6
*/
def mapPartitions()={
val list: List[Int] = List(1,2,3,4,5,6)
val rdd: RDD[Int] = sc.parallelize(list,2)
rdd.mapPartitions(_.map("hello-" + _)).foreach(println(_))
}
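A sketch of the "expensive object, once per partition" pattern described above. The DecimalFormat stands in for a genuinely expensive resource such as a database connection; the point is that it is constructed once per partition rather than once per element, as a plain map() would force:
def mapPartitionsReuse() = {
  val rdd: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 6), 2)
  rdd.mapPartitions { iter =>
    // stand-in for an expensive resource (connection, parser, ...), built once per partition
    val formatter = new java.text.DecimalFormat("000")
    iter.map(n => formatter.format(n.toLong))
  }.foreach(println(_))
}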
repartition()
/**
* Repartition the RDD.
* HDFS -> hello.txt occupies 2 blocks (not counting replicas)
* 2 blocks -> 2 partitions -> when the Spark job runs, one task is launched per partition.
* Problem solved: too few partitions to begin with -> increase the partition count.
* 1,3,5,2,4,6
*/
public static void repartition() {
final List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6);
final JavaRDD<Integer> rdd = sc.parallelize(list, 1);
// repartition(n) is equivalent to coalesce(n, shuffle = true)
rdd.repartition(2)
.foreach(new VoidFunction<Integer>() {
@Override
public void call(Integer number) throws Exception {
println(number + "");
}
});
}
/**
* Repartition the RDD.
* Problem solved: too few partitions to begin with -> increase the partition count.
* 1
* 3
* 5
* 2
* 4
* 6
*/
def repartition()={
val list: List[Int] = List(1,2,3,4,5,6)
val rdd: RDD[Int] = sc.parallelize(list,1)
rdd.repartition(2).foreach(println(_))
}
aggregateByKey()
/**
* Word count
* you ->1
* jump ->2
* i ->1
*/
public static void aggregateByKey() {
final List<String> list = Arrays.asList("you,jump", "i,jump");
final JavaRDD<String> rdd = sc.parallelize(list);
rdd.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String line) throws Exception {
return Arrays.asList(line.split(",")).iterator();
}
})
//type parameters: the input type, the output key type, and the output value type
.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String word) throws Exception {
return new Tuple2<String, Integer>(word, 1);
}
})
//The first argument is the initial (zero) value: with 10, every word would start its count at 10; with 0, counting starts from 0. The second function does the local (per-partition) aggregation, the third the global aggregation. This gives fine-grained control but is more complex to use; it can also be used to concatenate strings, for example.
.aggregateByKey(0, new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2; //local: within a partition
}
}, new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2; //global: across partitions
}
}
).foreach(new VoidFunction<Tuple2<String, Integer>>() {
@Override
public void call(Tuple2<String, Integer> tuple) throws Exception {
println(tuple._1 + " ->" + tuple._2);
}
});
}
/**
* Word count
* The first aggregateByKey overload lets us supply a custom Partitioner; other than that, its signature is very similar to aggregate, and the other overloads ultimately delegate to it.
* The second overload takes a partition count (numPartitions) and ends up using a HashPartitioner.
* The last overload first checks whether the current RDD already has a partitioner defined; if so it reuses it, otherwise it falls back to a HashPartitioner.
* aggregateByKey(zeroValue)((local aggregation), (global aggregation))
* (you,1)
* (jump,2)
* (i,1)
*/
def aggregateByKey() ={
val list: List[String] = List("you,jump", "i,jump")
val rdd: RDD[String] = sc.parallelize(list)
rdd.flatMap(_.split(",")).map((_, 1)).aggregateByKey(0)((x, y) => x + y, (x, y) => x + y).foreach(println(_))
}
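The zero value and the two functions do not have to keep the value's type. A sketch (the method name and data are mine) that accumulates (sum, count) pairs per key and derives the average, which reduceByKey alone cannot express as directly:
def aggregateByKeyAvg() = {
  val rdd: RDD[(String, Int)] = sc.parallelize(List(("峨眉", 40), ("武當", 30), ("峨眉", 60), ("武當", 99)))
  // zero value (0, 0) = (running sum, running count); the first function folds one value
  // into a per-partition accumulator, the second merges accumulators across partitions
  rdd.aggregateByKey((0, 0))(
      (acc, v) => (acc._1 + v, acc._2 + 1),
      (a, b) => (a._1 + b._1, a._2 + b._2))
    .mapValues { case (sum, count) => sum.toDouble / count }
    .foreach(println(_))
}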
coalesce()
/**
* Reduce the partition count: many -> few
*/
public static void coalesce() {
final List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6);
final JavaRDD<Integer> rdd = sc.parallelize(list, 3);
rdd.coalesce(1)
.foreach(new VoidFunction<Integer>() {
@Override
public void call(Integer integer) throws Exception {
println(integer + "");
}
});
}
/**
* Reduce the partition count: many -> few
* 1,2,3,4,5,6
*/
def coalesce()={
val list:List[Int] = List(1,2,3,4,5,6)
val rdd: RDD[Int] = sc.parallelize(list,3)
rdd.coalesce(1).foreach(println(_))
}
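One quick way to see the difference from repartition is to check the resulting partition counts: coalescing down avoids a shuffle, while repartition always shuffles. A small sketch (the method name is mine):
def partitionCounts() = {
  val rdd: RDD[Int] = sc.parallelize(1 to 6, 3)
  println(rdd.coalesce(1).getNumPartitions)    // 1: narrow dependency, no shuffle
  println(rdd.repartition(6).getNumPartitions) // 6: always a full shuffle
}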
mapPartitionsWithIndex()
/**
* map: fetches and processes one element at a time
* mapPartitions: fetches and processes one partition at a time
* mapPartitionsWithIndex: fetches and processes one partition at a time, and also knows the index of the partition being processed.
*
* 0_1
* 0_2
* 0_3
* 0_4
* 1_5
* 1_6
* 1_7
* 1_8
*/
public static void mapPartitionsWithIndex() {
final List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
final JavaRDD<Integer> rdd = sc.parallelize(list, 2); // HashPartitioner, RangePartitioner, or a custom partitioner
rdd.mapPartitionsWithIndex(new Function2<Integer, Iterator<Integer>, Iterator<String>>() {
@Override
public Iterator<String> call(Integer index, Iterator<Integer> iterator) throws Exception {
final ArrayList<String> list = new ArrayList<>();
while (iterator.hasNext()) {
list.add(index + "_" + iterator.next());
}
return list.iterator();
}
}, true)
.foreach(new VoidFunction<String>() {
@Override
public void call(String s) throws Exception {
println(s);
}
});
}
/**
* map: fetches and processes one element at a time
* mapPartitions: fetches and processes one partition at a time
* mapPartitionsWithIndex: fetches and processes one partition at a time, and also knows the index of the partition being processed.
*
* 0_1
* 0_2
* 1_3
* 1_4
* 1_5
* 2_6
* 2_7
* 2_8
*/
def mapPartitionsWithIndex() = {
  val list: List[Int] = List(1, 2, 3, 4, 5, 6, 7, 8)
  val rdd: RDD[Int] = sc.parallelize(list, 3)
  rdd.mapPartitionsWithIndex(
    // prefix each element with the index of the partition it lives in
    (index, iter) => iter.map(index + "_" + _),
    preservesPartitioning = true
  ).foreach(println(_))
}
cogroup()
/**
* When called on datasets of type (K, V) and (K, W),
* returns a dataset of (K, (Iterable<V>, Iterable<W>)) tuples.
*
* ID:1 Name: [東方不敗, 東方不敗] Scores: [90, 98]
* ID:3 Name: [嶽不群, 嶽不群] Scores: [89, 67]
* ID:2 Name: [林平之, 林平之] Scores: [91, 78]
*
*/
public static void cogroup() {
final List<Tuple2<Integer, String>> list1 = Arrays.asList(
new Tuple2<Integer, String>(1, "東方不敗"),
new Tuple2<Integer, String>(2, "林平之"),
new Tuple2<Integer, String>(3, "嶽不群"),
new Tuple2<Integer, String>(1, "東方不敗"),
new Tuple2<Integer, String>(2, "林平之"),
new Tuple2<Integer, String>(3, "嶽不群")
);
final List<Tuple2<Integer, Integer>> list2 = Arrays.asList(
new Tuple2<Integer, Integer>(1, 90),
new Tuple2<Integer, Integer>(2, 91),
new Tuple2<Integer, Integer>(3, 89),
new Tuple2<Integer, Integer>(1, 98),
new Tuple2<Integer, Integer>(2, 78),
new Tuple2<Integer, Integer>(3, 67)
);
final JavaPairRDD<Integer, String> rdd1 = sc.parallelizePairs(list1);
final JavaPairRDD<Integer, Integer> rdd2 = sc.parallelizePairs(list2);
final JavaPairRDD<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> rdd3 = rdd1.cogroup(rdd2);
rdd3.foreach(new VoidFunction<Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>>>() {
@Override
public void call(Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> tuple) throws Exception {
final Integer id = tuple._1;
final Iterable<String> names = tuple._2._1;
final Iterable<Integer> scores = tuple._2._2;
println("ID:" + id + " Name: " + names + " Scores: " + scores);
}
});
}
/**
* groupByKey groups the data of a single RDD; cogroup() goes further and groups several RDDs that share the same key.
* For example:
* RDD1.cogroup(RDD2) groups RDD1 and RDD2 by their common keys, yielding (key, (Iterable[value1], Iterable[value2])).
* cogroup can also group more than two RDDs:
* e.g. RDD1.cogroup(RDD2, RDD3, ..., RDDN) yields (key, (Iterable[value1], Iterable[value2], Iterable[value3], ..., Iterable[valueN]))
*
* (1,(CompactBuffer(東方不敗, 東方不敗),CompactBuffer(90, 98)))
* (3,(CompactBuffer(嶽不群, 嶽不群),CompactBuffer(89, 67)))
* (2,(CompactBuffer(林平之, 林平之),CompactBuffer(91, 78)))
*
* ID:1 Name:CompactBuffer(東方不敗, 東方不敗) Scores:CompactBuffer(90, 98)
* ID:3 Name:CompactBuffer(嶽不群, 嶽不群) Scores:CompactBuffer(89, 67)
* ID:2 Name:CompactBuffer(林平之, 林平之) Scores:CompactBuffer(91, 78)
*/
def cogroup()= {
val list1: List[(Int, String)] = List((1, "東方不敗"), (2, "林平之"), (3, "嶽不群"), (1, "東方不敗"), (2, "林平之"), (3, "嶽不群"))
val list2: List[(Int, Int)] = List((1, 90), (2, 91), (3, 89), (1, 98), (2, 78), (3, 67))
val rdd1: RDD[(Int, String)] = sc.parallelize(list1)
val rdd2: RDD[(Int, Int)] = sc.parallelize(list2)
rdd1.cogroup(rdd2).foreach(x=>println("ID:"+x._1+" Name:"+x._2._1+" Scores:"+x._2._2))
}
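A sketch of the multi-RDD form mentioned in the comment above, cogrouping three RDDs at once (the data is illustrative):
def cogroupThree() = {
  val names  = sc.parallelize(List((1, "東方不敗"), (2, "林平之")))
  val scores = sc.parallelize(List((1, 90), (2, 91)))
  val ranks  = sc.parallelize(List((1, "A"), (2, "B")))
  // yields (key, (Iterable[name], Iterable[score], Iterable[rank]))
  names.cogroup(scores, ranks).foreach(println(_))
}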
repartitionAndSortWithinPartitions()
/**
* few partitions -> many partitions
*/
public static void repartitionAndSortWithinPartitions() { // a tuning-oriented operator
final List<Integer> list = Arrays.asList(1, 2, 11, 3, 12, 4, 5);
final JavaRDD<Integer> rdd = sc.parallelize(list, 1);
final JavaPairRDD<Integer, Integer> pairRDD = rdd.mapToPair(new PairFunction<Integer, Integer, Integer>() {
@Override
public Tuple2<Integer, Integer> call(Integer number) throws Exception {
return new Tuple2<>(number, number);
}
});
//e.g. new HashPartitioner(2) or a RangePartitioner could be used instead
pairRDD.repartitionAndSortWithinPartitions(new Partitioner() {
@Override
public int numPartitions() {
return 2;
}
@Override
public int getPartition(Object key) {
final Integer number = Integer.valueOf(key.toString());
if (number % 2 == 0) {
return 0;
} else {
return 1;
}
}
}).mapPartitionsWithIndex(new Function2<Integer, Iterator<Tuple2<Integer, Integer>>,
Iterator<String>>() {
@Override
public Iterator<String> call(Integer index, Iterator<Tuple2<Integer, Integer>> iterator) throws Exception {
final ArrayList<String> list = new ArrayList<>();
while (iterator.hasNext()) {
list.add(index + "_" + iterator.next());
}
return list.iterator();
}
}, false)
.foreach(new VoidFunction<String>() {
@Override
public void call(String s) throws Exception {
println(s);
}
});
}
/**
* few partitions -> many partitions
* If you still need to sort after a repartition, use repartitionAndSortWithinPartitions directly: it sorts while the repartition shuffle is running. Doing the shuffle and the sort in one pass tends to be faster than shuffling first and sorting afterwards.
* This operator requires (K, V) data.
* (1,3)
* (1,2)
* (1,4)
* (2,3)
* (2,4)
* (5,4)
*/
def repartitionAndSortWithinPartitions()= {
val list = List((1,3),(1,2),(5,4),(1, 4),(2,3),(2,4))
val rdd: RDD[(Int, Int)] = sc.parallelize(list, 1)
rdd.repartitionAndSortWithinPartitions(new HashPartitioner(3)).foreach(println(_))
}
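For contrast, a sketch of the two-step shuffle-then-sort version that the advice above tells you to avoid; repartitionAndSortWithinPartitions folds the per-partition sort into the shuffle itself instead of running it as a separate pass:
def shuffleThenSort() = {
  val rdd: RDD[(Int, Int)] = sc.parallelize(List((1, 3), (1, 2), (5, 4), (1, 4), (2, 3), (2, 4)), 1)
  rdd.partitionBy(new HashPartitioner(3))                                        // pass 1: shuffle
    .mapPartitions(_.toList.sortBy(_._1).iterator, preservesPartitioning = true) // pass 2: sort
    .foreach(println(_))
}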
sample()
/**
* Sampling with replacement
* Sampling without replacement
*/
public static void sample() {
final List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 9, 10);
final JavaRDD<Integer> rdd = sc.parallelize(list);
/**
* withReplacement: Boolean
*   true: sample with replacement
*   false: sample without replacement
* fraction: Double
*   the probability of each element in the RDD being selected
* seed: Long
*   the random seed
*/
final JavaRDD<Integer> rdd2 = rdd.sample(false, 0.5);
rdd2.foreach(new VoidFunction<Integer>() {
@Override
public void call(Integer integer) throws Exception {
println(integer + "");
}
});
}
/**
* The result differs from run to run.
* If the third argument (a random seed) is supplied, the result is deterministic.
*/
def sample()={
val list: List[Int] = List (1,2,3,4,5,6,7,9,10)
val rdd: RDD[Int] = sc.parallelize(list)
rdd.sample(false,0.5).foreach(println(_))
}
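A one-line variant showing the seeded, with-replacement form: the same seed reproduces the same sample on every run (the seed value 42 is arbitrary):
def sampleSeeded() = {
  val rdd: RDD[Int] = sc.parallelize(1 to 10)
  rdd.sample(withReplacement = true, fraction = 0.5, seed = 42L).foreach(println(_))
}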
pipe()
public static void pipe() {
final List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 9, 10);
final JavaRDD<Integer> rdd = sc.parallelize(list);
// final JavaRDD<String> pipe = rdd.pipe("sh wordcount.sh");
}
/**
* Invoke a shell command.
* Linux offers many shell commands for processing data; the pipe transformation lets us apply such commands inside Spark to produce a new RDD.
*/
def pipe()={
val list: List[Int] = List(1,2,3,4,5,6,7,9,10)
val rdd: RDD[Int] = sc.parallelize(list)
rdd.pipe("sh wordcount.sh")
}
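Since pipe() is a transformation, nothing actually runs until an action is invoked. A runnable sketch, using cat in place of the wordcount.sh script (which the post does not show) purely because cat exists on any Unix worker:
def pipeDemo() = {
  val rdd: RDD[String] = sc.parallelize(List("a", "b", "c"))
  // pipe() writes each element to the command's stdin, one per line, and returns
  // the command's stdout lines as a new RDD; collect() is the action that triggers it
  rdd.pipe("cat").collect().foreach(println(_))
}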