1. 程式人生 > >【Spark】SparkRDD開發手冊(JavaAPI函數語言程式設計)

【Spark】SparkRDD開發手冊(JavaAPI函數語言程式設計)

文章目錄

前言

不用怕,問題不大~

github已更新

scala版過段時間會再更新

  • 自誇一下…

老實說,相比那些抄襲來抄襲去的blog,這篇RDD的JavaAPI…我是每個常用API都結合例項解釋並且用了Java函數語言程式設計寫的…我這懶人夠良心了…

如果能幫到你們的話,我很開心。

不需要抄部落格,部落格不是為了點選量,是為了總結自己,轉載標明一下作者和連結,謝謝~

  • wiki解釋

Spark RDD

(英語:Resilient Distributed Dataset,彈性分散式資料集)是一種資料儲存集合。只能由它支援的資料來源或是由其他RDD經過一定的轉換(Transformation)來產生。在RDD上可以執行的操作有兩種轉換(Transformation)和行動(Action),每個 RDD 都記錄了自己是如何由持久化儲存中的源資料計算得出的,即其血統(Lineage)

  • 重點
  1. RDD是一個分散式集合
  2. SparkRDD特性惰性行為,只有啟用Action操作才會真正開始計算
  3. 兩種轉換操作Transformation(惰性,只代表邏輯)和行動Action(起床,開始執行)
  4. 血統(Lineage)怎麼理解呢?血統具有繼承的特性,能追根溯源。Lineage記錄了你的Transformation和Action,通過 Lineage 獲取足夠的資訊來重新運算和恢復丟失的資料分割槽,這也是它的容錯原理,某節點掛掉,再通過Lineage記錄重新執行一次操作即可
  • 建議
  1. 函數語言程式設計瞭解下
  2. 你是寫java的,可以先Java入手,但是不能丟棄scala版本,spark的爸爸畢竟是scala… (scala也更容易)
  3. 不要demo拿去就執行,先看看這些簡單操作的目的,程式碼註釋我都寫了
  • 常用RDD(所有方法我都寫了DEMO)

    Transformation行為:

    方法 官方解釋 Hui解釋
    map(func) 返回一個新的分散式資料集,該資料集由每一個輸入元素經過func函式轉換後組成 元素轉換 . 引數->你希望要的引數
    fitler(func) 返回一個新的資料集,該資料集由經過func函式計算後返回值為true的輸入元素組成 元素轉換. 引數->陣列引數
    flatMap(func) 類似於map,但是每一個輸入元素可以被對映為0或多個輸出元素(因此func返回一個序列,而不是單一元素) 元素轉換. 引數->pair引數 (key value)
    mapPartitions(func) 類似於map,但獨立地在RDD上每一個分片上執行,因此在型別為T的RDD上執行時,func函式型別必須是Iterator[T]=>Iterator[U] 元素轉換,在每一個分割槽內部進行元素轉換.
    mapPartitionsWithIndex(func) 類似於mapPartitons,但func帶有一個整數引數表示分片的索引值。因此在型別為T的RDD上執行時,func函式型別必須是(Int,Iterator[T])=>Iterator[U] 元素轉換,在每一個分割槽內部進行元素轉換.
    sample(withReplacement,fraction,seed) 根據fraction指定的比例對資料進行取樣,可以選擇是否用隨機數進行替換,seed用於隨機數生成器種子 取樣
    union(otherDataSet) 返回一個新資料集,新資料集是由原資料集和引數資料集聯合而成 並集
    distinct([numTasks]) 返回一個包含原資料集中所有不重複元素的新資料集 去重
    groupByKey([numTasks]) 在一個(K,V)資料集上呼叫,返回一個(K,Seq[V])對的資料集。注意預設情況下,只有8個並行任務來操作,但是可以傳入一個可選的numTasks引數來改變它 分組
    reduceByKey(func,[numTasks]) 在一個(K,V)對的資料集上呼叫,返回一個(K,V)對的資料集,使用指定的reduce函式,將相同的key的值聚合到一起。與groupByKey類似,reduceByKey任務的個數是可以通過第二個可選引數來設定的 聚合
    sortByKey([[ascending],numTasks]) 在一個(K,V)對的資料集上呼叫,K必須實現Ordered介面,返回一個按照Key進行排序的(K,V)對資料集。升序或降序由ascending布林引數決定 排序
    join(otherDataset0,[numTasks]) 在型別為(K,V)和(K,W)資料集上呼叫,返回一個相同的key對應的所有元素在一起的(K,(V,W))資料集 集合關聯. 合併相同key的value
    cogroup(otherDataset,[numTasks]) 在型別為(K,V)和(K,W)資料集上呼叫,返回一個(K,Seq[V],Seq[W])元祖的資料集。這個操作也可以稱為groupwith 分組
    cartesain(ohterDataset) 笛卡爾積,在型別為T和U型別的資料集上呼叫,返回一個(T,U)對資料集(兩兩的元素對) 笛卡爾積

    Action行為:

    方法 Hui解釋
    reduce(func) 通過函式func(接收兩個引數,返回一個引數)聚集資料集中的所有元素。這個功能必須可交換且可關聯的,從而可以正確的並行執行 聚合
    collect() 在驅動程式中,以陣列形式返回資料集中的所有元素。通常在使用filter或者其他操作返回一個足夠小的資料子集後再使用會比較有用 查詢全部
    count() 返回資料集元素個數 數量
    first() 返回資料集第一個元素(類似於take(1)) 第一個元素
    take(n) 返回一個由資料集前n個元素組成的陣列 前面N個元素
    sample(withReplacement,num,seed) 注意 這個操作目前並非並行執行,而是由驅動程式計算所有的元素返回一個數組,該陣列由從資料集中隨機取樣的num個元素組成,可以選擇是否由隨機數替換不足的部分,seed使用者指定隨機數生成器種子 取樣
    saveAsTextFile(path) 將資料集的元素以textfile的形式儲存到本地檔案系統–HDFS或者任何其他Hadoop支援的檔案系統。對於每個元素,Spark將會呼叫toString方法,將它轉換為檔案中的文字行 儲存檔案(TXT格式)
    saveAsSequenceFile(path) 將資料集中的元素以Hadoop sequencefile的格式儲存到指定的目錄下,可以是本地系統、HDFS或者任何其他的Hadoop支援的檔案系統。這個只限於由key-value對組成,並實現了Hadoop的Writable介面,或者可以隱式的轉換為Writable的RDD(Spark包括了基本型別轉換,例如Int、Double、String等) 儲存檔案(Sequence格式)
    countByKey() 對(K,V)型別的RDD有效,返回一個(K,Int)對的map,表示每一個key對應的元素個數 數key
    foreach(func) 在資料集的每一個元素上,執行函式func進行更新。通常用於邊緣效果,例如更新一個疊加器,或者和外部儲存系統進行互動,如HBase 迴圈

遇到的大坑!

org.apache.spark.SparkException: Task not serializable

Caused by: java.io.NotSerializableException:

解決方案我另外寫了一篇BLOG
https://blog.csdn.net/HuHui_/article/details/83905302

裝備

  1. maven依賴引入
  2. 切實的去試一下怎麼玩
  3. 函數語言程式設計瞭解一下

Core

挑一些稍微難理解的RDD方法,沒有展示的可以看我的github

分析前提注意

因為太懶了,用的是我寫迪傑斯特拉最短距離演算法生成的結果資料檔案,對這個有興趣的看:https://blog.csdn.net/HuHui_/article/details/83020917

資料格式:進站站點名字,出站站點名字,最短距離經歷站點數,最短距離

舉例: 廣州南站,廣州南站,0,0

  • init
private transient SparkConf sparkConf;

private transient JavaSparkContext sparkContext;

private static final String FILE_PATH = TransformationRDDTest.class.getClassLoader().getResource("demo.txt")
            .toString();
@Before
    public void before() throws Exception {
        sparkConf = new SparkConf().setMaster("local[4]").setAppName("test");
//        sparkConf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer");
        sparkContext = new JavaSparkContext(sparkConf);
    }


/**
     * 列印測試.
     *
     * @param collect the collect
     * @since hui_project 1.0.0
     */
    private void checkResult(List<?> collect) {
        for (Object o : collect) {
            System.out.println(o.toString());
        }
    }
  • Transformation RDD:
  1. flatMap
/**
     * 元素轉換. 引數->陣列引數
     * demo計算目的:獲取地鐵站資訊切分後 獲取陣列資訊1.出發站 2.終點站 3.經歷站點數 4.距離
     * @since hui_project 1.0.0
     */
    @Test
    public void testFlatMap() {
        JavaRDD<String> textRDD = sparkContext.textFile(FILE_PATH);
        JavaRDD<String> splitRDD = textRDD
                .flatMap(x -> Arrays.asList(x.split(",")).iterator());
        checkResult(splitRDD.collect());
    }
  1. mapPartitions
 /**
     * 元素轉換,在每一個分割槽內部進行元素轉換.
     * demo計算目的:算平方。(單元測試比較難看出來分割槽作用)
     * @since hui_project 1.0.0
     */
    @Test
    public void testMapPartitions() {
        JavaRDD<Integer> parallelize = sparkContext.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 3);
        JavaRDD<Tuple2<Integer, Integer>> rdd = parallelize
                .mapPartitions(x -> getSquare(x));
        checkResult(rdd.collect());

    }

    /**
     * Gets square.
     *
     * @param it the it
     * @return the square
     * @since hui_project 1.0.0
     */
    @Transient
    private Iterator<Tuple2<Integer, Integer>> getSquare(Iterator<Integer> it) {
        ArrayList<Tuple2<Integer, Integer>> results = new ArrayList<>();
        while (it.hasNext()) {
            Integer next = it.next();
            results.add(new Tuple2<>(next, next * next));
        }
        return results.iterator();
    }
  1. mapPartitionsWithIndex
/**
     * 元素轉換,在每一個分割槽內部進行元素轉換.
     * demo計算目的:算平方。(引數1是分割槽的索引)
     *
     * @since hui_project 1.0.0
     */
    @Test
    public void testMapPartitionsWithIndex(){
        JavaRDD<Integer> parallelize = sparkContext.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 3);
        JavaRDD<Tuple2<Integer, Integer>> rdd = parallelize.mapPartitionsWithIndex((x, y) -> getSquareWithIndex(x, y), false);
        checkResult(rdd.collect());
    }

    /**
     * Get square with index iterator.
     *
     * @param partIndex the part index
     * @param it        the it
     * @return the iterator
     * @since hui_project 1.0.0
     */
    @Transient
    public Iterator<Tuple2<Integer, Integer>> getSquareWithIndex(Integer partIndex, Iterator<Integer> it){
        ArrayList<Tuple2<Integer, Integer>> results = new ArrayList<>();
        while (it.hasNext()) {
            Integer next = it.next();
            results.add(new Tuple2<>(partIndex, next * next));
        }
        return results.iterator();
    }
  1. join
/**
     * 集合關聯. 合併相同key的value
     * demo計算目的:今年和去年都獲獎的同學,獲獎項的科目都有哪些
     * @since hui_project 1.0.0
     */
    @Test
    public void testJoin() {
        //今年同學獲獎的科目
        JavaPairRDD<Object, Object> rdd1 = sparkContext.parallelize(Arrays.asList(
                new Tuple2("xiaoming", "語文")
                , new Tuple2("xiaoming", "數學")
                , new Tuple2("lihua", "數學")
                , new Tuple2("xiaofeng", "藝術")
                , new Tuple2("test", "藝術")))
                .mapToPair(x -> new Tuple2<>(x._1, x._2));
        //去年同學獲獎的科目
        JavaPairRDD<Object, Object> rdd2 = sparkContext.parallelize(Arrays.asList(
                new Tuple2("xiaoming", "藝術")
                , new Tuple2("lihua", "藝術")
                , new Tuple2("xiaofeng", "語文")))
                .mapToPair(x -> new Tuple2<>(x._1, x._2));
        JavaPairRDD<Object, Tuple2<Object, Object>> join = rdd1.join(rdd2);
        checkResult(join.collect());
    }
  1. coGroup
 /**
     * Test co group.
     * demo計算目的: 以成績分組 同學([成績優秀學科],[成績中等學科],[成績差勁學科])
     * @since hui_project 1.0.0
     */
    @Test
    public void testCoGroup() {
        //成績優秀的學生+科目
        JavaRDD<Tuple2<String, String>> scoreDetails1 = sparkContext.parallelize(Arrays.asList(
                new Tuple2("xiaoming", "語文")
                , new Tuple2("xiaoming", "數學")
                , new Tuple2("lihua", "數學")
                , new Tuple2("xiaofeng", "藝術")));
        //成績中等的學生+科目
        JavaRDD<Tuple2<String, String>> scoreDetails2 = sparkContext.parallelize(Arrays.asList(
                new Tuple2("xiaoming", "藝術")
                , new Tuple2("lihua", "藝術")
                , new Tuple2("xiaofeng", "語文")));
        //成績差的學生+科目
        JavaRDD<Tuple2<String, String>> scoreDetails3 = sparkContext.parallelize(Arrays.asList(
                new Tuple2("xiaoming", "英語")
                , new Tuple2("lihua", "英語")
                , new Tuple2("lihua", "數學")
                , new Tuple2("xiaofeng", "數學")
                , new Tuple2("xiaofeng", "英語")));

        JavaPairRDD<String, String> scoreMapRDD1 = JavaPairRDD.fromJavaRDD(scoreDetails1);
        JavaPairRDD<String, String> scoreMapRDD2 = JavaPairRDD.fromJavaRDD(scoreDetails2);
        JavaPairRDD<String, String> scoreMapRDD3 = JavaPairRDD.fromJavaRDD(scoreDetails3);

        JavaPairRDD<String, Tuple3<Iterable<String>, Iterable<String>, Iterable<String>>> cogroupRDD =
                scoreMapRDD1.cogroup(scoreMapRDD2, scoreMapRDD3);
        checkResult(cogroupRDD.collect());
    }
  • Action RDD:
  1. reduce
/**
     * 聚合(整合資料).
     *
     * @since hui_project 1.0.0
     */
    @Test
    public void testReduce() {
        JavaRDD<Integer> parallelize = sparkContext.parallelize(Arrays.asList(1, 2, 3, 4), 3);
        Tuple2<Integer, Integer> reduce = parallelize.mapToPair(x -> new Tuple2<>(x, 1))
                .reduce((x, y) -> getReduce(x, y));
        System.out.println("陣列sum:" + reduce._1 + " 計算次數:" + (reduce._2 - 1));
    }

    /**
     * 計算邏輯.
     * (x)總和->陣列的每一個數相加總和
     * (y)總和 ->計算次數
     * @param x the x
     * @param y the y
     * @return the reduce
     * @since hui_project 1.0.0
     */
    @Transient
    public Tuple2 getReduce(Tuple2<Integer, Integer> x, Tuple2<Integer, Integer> y) {
        Integer a = x._1();
        Integer b = x._2();
        a += y._1();
        b += y._2();
        return new Tuple2(a, b);
    }

Github

全部例項:

https://github.com/ithuhui/hui-bigdata-spark

總結

  1. RDD實操幾次,個人感覺都不難
  2. API不難,但是要考慮分割槽,記憶體,效能才是重點
  3. 應用場景一般在資料分析,然後分析過程中儲存結果集