【Spark】SparkRDD開發手冊(JavaAPI函數語言程式設計)
阿新 • • 發佈:2018-11-11
文章目錄
前言
不用怕,問題不大~
github已更新
scala版過段時間會再更新
- 自誇一下…
老實說,相比那些抄襲來抄襲去的blog,這篇RDD的JavaAPI…我是每個常用API都結合例項解釋並且用了Java函數語言程式設計寫的…我這懶人夠良心了…
如果能幫到你們的話,我很開心。
不需要抄部落格,部落格不是為了點選量,是為了總結自己,轉載標明一下作者和連結,謝謝~
- wiki解釋
Spark RDD
- 重點
- RDD是一個分散式集合
- SparkRDD特性惰性行為,只有啟用Action操作才會真正開始計算
- 兩種轉換操作Transformation(惰性,只代表邏輯)和行動Action(起床,開始執行)
- 血統(Lineage)怎麼理解呢?血統具有繼承的特性,能追根溯源。Lineage記錄了你的Transformation和Action,通過 Lineage 獲取足夠的資訊來重新運算和恢復丟失的資料分割槽,這也是它的容錯原理,某節點掛掉,再通過Lineage記錄重新執行一次操作即可
- 建議
- 函數語言程式設計瞭解下
- 你是寫java的,可以先Java入手,但是不能丟棄scala版本,spark的爸爸畢竟是scala… (scala也更容易)
- 不要demo拿去就執行,先看看這些簡單操作的目的,程式碼註釋我都寫了
-
常用RDD(所有方法我都寫了DEMO)
Transformation行為:
方法 官方解釋 Hui解釋 map(func) 返回一個新的分散式資料集,該資料集由每一個輸入元素經過func函式轉換後組成 元素轉換 . 引數->你希望要的引數 fitler(func) 返回一個新的資料集,該資料集由經過func函式計算後返回值為true的輸入元素組成 元素轉換. 引數->陣列引數 flatMap(func) 類似於map,但是每一個輸入元素可以被對映為0或多個輸出元素(因此func返回一個序列,而不是單一元素) 元素轉換. 引數->pair引數 (key value) mapPartitions(func) 類似於map,但獨立地在RDD上每一個分片上執行,因此在型別為T的RDD上執行時,func函式型別必須是Iterator[T]=>Iterator[U] 元素轉換,在每一個分割槽內部進行元素轉換. mapPartitionsWithIndex(func) 類似於mapPartitons,但func帶有一個整數引數表示分片的索引值。因此在型別為T的RDD上執行時,func函式型別必須是(Int,Iterator[T])=>Iterator[U] 元素轉換,在每一個分割槽內部進行元素轉換. sample(withReplacement,fraction,seed) 根據fraction指定的比例對資料進行取樣,可以選擇是否用隨機數進行替換,seed用於隨機數生成器種子 取樣 union(otherDataSet) 返回一個新資料集,新資料集是由原資料集和引數資料集聯合而成 並集 distinct([numTasks]) 返回一個包含原資料集中所有不重複元素的新資料集 去重 groupByKey([numTasks]) 在一個(K,V)資料集上呼叫,返回一個(K,Seq[V])對的資料集。注意預設情況下,只有8個並行任務來操作,但是可以傳入一個可選的numTasks引數來改變它 分組 reduceByKey(func,[numTasks]) 在一個(K,V)對的資料集上呼叫,返回一個(K,V)對的資料集,使用指定的reduce函式,將相同的key的值聚合到一起。與groupByKey類似,reduceByKey任務的個數是可以通過第二個可選引數來設定的 聚合 sortByKey([[ascending],numTasks]) 在一個(K,V)對的資料集上呼叫,K必須實現Ordered介面,返回一個按照Key進行排序的(K,V)對資料集。升序或降序由ascending布林引數決定 排序 join(otherDataset0,[numTasks]) 在型別為(K,V)和(K,W)資料集上呼叫,返回一個相同的key對應的所有元素在一起的(K,(V,W))資料集 集合關聯. 合併相同key的value cogroup(otherDataset,[numTasks]) 在型別為(K,V)和(K,W)資料集上呼叫,返回一個(K,Seq[V],Seq[W])元祖的資料集。這個操作也可以稱為groupwith 分組 cartesain(ohterDataset) 笛卡爾積,在型別為T和U型別的資料集上呼叫,返回一個(T,U)對資料集(兩兩的元素對) 笛卡爾積 Action行為:
方法 Hui解釋 reduce(func) 通過函式func(接收兩個引數,返回一個引數)聚集資料集中的所有元素。這個功能必須可交換且可關聯的,從而可以正確的並行執行 聚合 collect() 在驅動程式中,以陣列形式返回資料集中的所有元素。通常在使用filter或者其他操作返回一個足夠小的資料子集後再使用會比較有用 查詢全部 count() 返回資料集元素個數 數量 first() 返回資料集第一個元素(類似於take(1)) 第一個元素 take(n) 返回一個由資料集前n個元素組成的陣列 前面N個元素 sample(withReplacement,num,seed) 注意 這個操作目前並非並行執行,而是由驅動程式計算所有的元素返回一個數組,該陣列由從資料集中隨機取樣的num個元素組成,可以選擇是否由隨機數替換不足的部分,seed使用者指定隨機數生成器種子 取樣 saveAsTextFile(path) 將資料集的元素以textfile的形式儲存到本地檔案系統–HDFS或者任何其他Hadoop支援的檔案系統。對於每個元素,Spark將會呼叫toString方法,將它轉換為檔案中的文字行 儲存檔案(TXT格式) saveAsSequenceFile(path) 將資料集中的元素以Hadoop sequencefile的格式儲存到指定的目錄下,可以是本地系統、HDFS或者任何其他的Hadoop支援的檔案系統。這個只限於由key-value對組成,並實現了Hadoop的Writable介面,或者可以隱式的轉換為Writable的RDD(Spark包括了基本型別轉換,例如Int、Double、String等) 儲存檔案(Sequence格式) countByKey() 對(K,V)型別的RDD有效,返回一個(K,Int)對的map,表示每一個key對應的元素個數 數key foreach(func) 在資料集的每一個元素上,執行函式func進行更新。通常用於邊緣效果,例如更新一個疊加器,或者和外部儲存系統進行互動,如HBase 迴圈
遇到的大坑!
org.apache.spark.SparkException: Task not serializable
Caused by: java.io.NotSerializableException:
解決方案我另外寫了一篇BLOG
https://blog.csdn.net/HuHui_/article/details/83905302
裝備
- maven依賴引入
- 切實的去試一下怎麼玩
- 函數語言程式設計瞭解一下
Core
挑一些稍微難理解的RDD方法,沒有展示的可以看我的github
分析前提注意:
因為太懶了,用的是我寫迪傑斯特拉最短距離演算法生成的結果資料檔案,對這個有興趣的看:https://blog.csdn.net/HuHui_/article/details/83020917
資料格式:進站站點名字,出站站點名字,最短距離經歷站點數,最短距離
舉例: 廣州南站,廣州南站,0,0
- init
private transient SparkConf sparkConf;
private transient JavaSparkContext sparkContext;
private static final String FILE_PATH = TransformationRDDTest.class.getClassLoader().getResource("demo.txt")
.toString();
@Before
public void before() throws Exception {
sparkConf = new SparkConf().setMaster("local[4]").setAppName("test");
// sparkConf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer");
sparkContext = new JavaSparkContext(sparkConf);
}
/**
* 列印測試.
*
* @param collect the collect
* @since hui_project 1.0.0
*/
private void checkResult(List<?> collect) {
for (Object o : collect) {
System.out.println(o.toString());
}
}
- Transformation RDD:
- flatMap
/**
* 元素轉換. 引數->陣列引數
* demo計算目的:獲取地鐵站資訊切分後 獲取陣列資訊1.出發站 2.終點站 3.經歷站點數 4.距離
* @since hui_project 1.0.0
*/
@Test
public void testFlatMap() {
JavaRDD<String> textRDD = sparkContext.textFile(FILE_PATH);
JavaRDD<String> splitRDD = textRDD
.flatMap(x -> Arrays.asList(x.split(",")).iterator());
checkResult(splitRDD.collect());
}
- mapPartitions
/**
* 元素轉換,在每一個分割槽內部進行元素轉換.
* demo計算目的:算平方。(單元測試比較難看出來分割槽作用)
* @since hui_project 1.0.0
*/
@Test
public void testMapPartitions() {
JavaRDD<Integer> parallelize = sparkContext.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 3);
JavaRDD<Tuple2<Integer, Integer>> rdd = parallelize
.mapPartitions(x -> getSquare(x));
checkResult(rdd.collect());
}
/**
* Gets square.
*
* @param it the it
* @return the square
* @since hui_project 1.0.0
*/
@Transient
private Iterator<Tuple2<Integer, Integer>> getSquare(Iterator<Integer> it) {
ArrayList<Tuple2<Integer, Integer>> results = new ArrayList<>();
while (it.hasNext()) {
Integer next = it.next();
results.add(new Tuple2<>(next, next * next));
}
return results.iterator();
}
- mapPartitionsWithIndex
/**
* 元素轉換,在每一個分割槽內部進行元素轉換.
* demo計算目的:算平方。(引數1是分割槽的索引)
*
* @since hui_project 1.0.0
*/
@Test
public void testMapPartitionsWithIndex(){
JavaRDD<Integer> parallelize = sparkContext.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 3);
JavaRDD<Tuple2<Integer, Integer>> rdd = parallelize.mapPartitionsWithIndex((x, y) -> getSquareWithIndex(x, y), false);
checkResult(rdd.collect());
}
/**
* Get square with index iterator.
*
* @param partIndex the part index
* @param it the it
* @return the iterator
* @since hui_project 1.0.0
*/
@Transient
public Iterator<Tuple2<Integer, Integer>> getSquareWithIndex(Integer partIndex, Iterator<Integer> it){
ArrayList<Tuple2<Integer, Integer>> results = new ArrayList<>();
while (it.hasNext()) {
Integer next = it.next();
results.add(new Tuple2<>(partIndex, next * next));
}
return results.iterator();
}
- join
/**
* 集合關聯. 合併相同key的value
* demo計算目的:今年和去年都獲獎的同學,獲獎項的科目都有哪些
* @since hui_project 1.0.0
*/
@Test
public void testJoin() {
//今年同學獲獎的科目
JavaPairRDD<Object, Object> rdd1 = sparkContext.parallelize(Arrays.asList(
new Tuple2("xiaoming", "語文")
, new Tuple2("xiaoming", "數學")
, new Tuple2("lihua", "數學")
, new Tuple2("xiaofeng", "藝術")
, new Tuple2("test", "藝術")))
.mapToPair(x -> new Tuple2<>(x._1, x._2));
//去年同學獲獎的科目
JavaPairRDD<Object, Object> rdd2 = sparkContext.parallelize(Arrays.asList(
new Tuple2("xiaoming", "藝術")
, new Tuple2("lihua", "藝術")
, new Tuple2("xiaofeng", "語文")))
.mapToPair(x -> new Tuple2<>(x._1, x._2));
JavaPairRDD<Object, Tuple2<Object, Object>> join = rdd1.join(rdd2);
checkResult(join.collect());
}
- coGroup
/**
* Test co group.
* demo計算目的: 以成績分組 同學([成績優秀學科],[成績中等學科],[成績差勁學科])
* @since hui_project 1.0.0
*/
@Test
public void testCoGroup() {
//成績優秀的學生+科目
JavaRDD<Tuple2<String, String>> scoreDetails1 = sparkContext.parallelize(Arrays.asList(
new Tuple2("xiaoming", "語文")
, new Tuple2("xiaoming", "數學")
, new Tuple2("lihua", "數學")
, new Tuple2("xiaofeng", "藝術")));
//成績中等的學生+科目
JavaRDD<Tuple2<String, String>> scoreDetails2 = sparkContext.parallelize(Arrays.asList(
new Tuple2("xiaoming", "藝術")
, new Tuple2("lihua", "藝術")
, new Tuple2("xiaofeng", "語文")));
//成績差的學生+科目
JavaRDD<Tuple2<String, String>> scoreDetails3 = sparkContext.parallelize(Arrays.asList(
new Tuple2("xiaoming", "英語")
, new Tuple2("lihua", "英語")
, new Tuple2("lihua", "數學")
, new Tuple2("xiaofeng", "數學")
, new Tuple2("xiaofeng", "英語")));
JavaPairRDD<String, String> scoreMapRDD1 = JavaPairRDD.fromJavaRDD(scoreDetails1);
JavaPairRDD<String, String> scoreMapRDD2 = JavaPairRDD.fromJavaRDD(scoreDetails2);
JavaPairRDD<String, String> scoreMapRDD3 = JavaPairRDD.fromJavaRDD(scoreDetails3);
JavaPairRDD<String, Tuple3<Iterable<String>, Iterable<String>, Iterable<String>>> cogroupRDD =
scoreMapRDD1.cogroup(scoreMapRDD2, scoreMapRDD3);
checkResult(cogroupRDD.collect());
}
- Action RDD:
- reduce
/**
* 聚合(整合資料).
*
* @since hui_project 1.0.0
*/
@Test
public void testReduce() {
JavaRDD<Integer> parallelize = sparkContext.parallelize(Arrays.asList(1, 2, 3, 4), 3);
Tuple2<Integer, Integer> reduce = parallelize.mapToPair(x -> new Tuple2<>(x, 1))
.reduce((x, y) -> getReduce(x, y));
System.out.println("陣列sum:" + reduce._1 + " 計算次數:" + (reduce._2 - 1));
}
/**
* 計算邏輯.
* (x)總和->陣列的每一個數相加總和
* (y)總和 ->計算次數
* @param x the x
* @param y the y
* @return the reduce
* @since hui_project 1.0.0
*/
@Transient
public Tuple2 getReduce(Tuple2<Integer, Integer> x, Tuple2<Integer, Integer> y) {
Integer a = x._1();
Integer b = x._2();
a += y._1();
b += y._2();
return new Tuple2(a, b);
}
Github
全部例項:
https://github.com/ithuhui/hui-bigdata-spark
總結
- RDD實操幾次,個人感覺都不難
- API不難,但是要考慮分割槽,記憶體,效能才是重點
- 應用場景一般在資料分析,然後分析過程中儲存結果集