1. 程式人生 > >用Java理解Spark運算元之Transformation運算元

用Java理解Spark運算元之Transformation運算元

前言

RDD運算元分類,大致可以分為兩類,即:
Transformation:轉換運算元,這類轉換並不觸發提交作業,完成作業中間過程處理。
Action:行動運算元,這類運算元會觸發SparkContext提交Job作業。
因為Action運算元比較容易理解,所以這裡只解析Transformation運算元。其實Transformation運算元的解析在網上有很多資源,但是大多都是用scala語言寫的例子,這讓很多對scala及lambda表示式的程式設計師很難進行理解。所以我將自己基於JAVA理解其的心路歷程寫下來。

解析

1、map、flatmap及mapPartitions
map十分容易理解,他是將源JavaRDD的一個一個元素的傳入call方法,並經過演算法後一個一個的返回從而生成一個新的JavaRDD。
例子:

private static JavaSparkContext sc;

    public static void main(String[] args) {
        SparkConf conf=new SparkConf();
        conf.set("spark.testing.memory", "2147480000");     //因為jvm無法獲得足夠的資源
        sc = new JavaSparkContext("local", "Spark App",conf);   //本地模式使用local
        List<Integer> list=Arrays.asList(1
,2,3,4,5,6,7,8,9); JavaRDD<Integer> arrayRDD=sc.parallelize(list); //map運算元是對RDD中的每個元素都進行一次操作後生成一個新的RDD JavaRDD<Integer> bRDD =arrayRDD.map(new Function<Integer, Integer>() { @Override public Integer call(Integer v1) throws Exception { return
v1*2; } }); System.out.println(arrayRDD.collect()); System.out.println(bRDD.collect()); }

結果:

[1, 2, 3, 4, 5, 6, 7, 8, 9]
[2, 4, 6, 8, 10, 12, 14, 16, 18]

可以看出,對於map運算元,源JavaRDD的每個元素都會進行計算,由於是依次進行傳參,所以他是有序的,新RDD的元素順序與源RDD是相同的。而由有序又引出接下來的flatMap。

flatMap與map一樣,是將RDD中的元素依次的傳入call方法,他比map多的功能是能在任何一個傳入call方法的元素後面新增任意多元素,而能達到這一點,正是因為其進行傳參是依次進行的。
示例:

int[] array=new int[]{1,2,3,4,5,6,7,8,9};
        List<Integer> list=new ArrayList<Integer>();
        for (Integer i : array) {
            list.add(i);
        }
        JavaRDD<Integer> rdd=sc.parallelize(list,2);
        //flatMap和map一樣是一個一個的傳,但是他可以在每一個傳入的值新增多個引數
        JavaRDD<Integer> result=rdd.flatMap(new FlatMapFunction<Integer, Integer>() {

            @Override
            public Iterable<Integer> call(Integer t) throws Exception {
                List<Integer> list=new ArrayList<Integer>();
                for(int i=0;i<t;i++){
                    list.add(t+i);
                }
                return list;     //返回的這個list就是傳入的元素及新增的內容
            }
        });
        System.out.println(result.collect());

結果:

[1, 2, 3, 3, 4, 5, 4, 5, 6, 7, 5, 6, 7, 8, 9, 6, 7, 8, 9, 10, 11, 7, 8, 9, 10, 11, 12, 13, 8, 9, 10, 11, 12, 13, 14, 15, 9, 10, 11, 12, 13, 14, 15, 16, 17]

flatMap的特性決定了這個運算元在對需要隨時增加元素的時候十分好用,比如在對源RDD查漏補缺時。

map和flatMap都是依次進行引數傳遞的,但有時候需要RDD中的兩個元素進行相應操作時(例如:算存款所得時,下一個月所得的利息是要原本金加上上一個月所得的本金的),這兩個運算元便無法達到目的了,這是便需要mapPartitions運算元,他傳參的方式是將整個RDD傳入,然後將一個迭代器傳出生成一個新的RDD,由於整個RDD都傳入了,所以便能完成前面說的業務。

示例:計算存款所得利息

package com.yc.operator;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;


public class OperatorMapPartitions {
    private static JavaSparkContext sc;

    public static void main(String[] args) {
        SparkConf conf=new SparkConf();
        conf.set("spark.testing.memory", "21474800000000");     //因為jvm無法獲得足夠的資源
        sc = new JavaSparkContext("local", "Spark App",conf);   //本地模式使用local
        List<Integer> list=Arrays.asList(1,2,3,4,5,6,7,8,9);

        JavaRDD<Integer> arrayRDD=sc.parallelize(list);
        //mapPartitions就是將整個RDD傳入,並對整個RDD操作後傳出
        JavaRDD<Double> resultRDD=arrayRDD.mapPartitions(new FlatMapFunction<Iterator<Integer>, Double>() {  

            @Override
            public Iterable<Double> call(Iterator<Integer> t) throws Exception {
                final double interest=0.1;
                List<Double> list=new ArrayList<Double>();
                double money=10000;
                //因為有整個RDD的所有元素,所以就能迴圈計算,能節省很多程式碼及演算法複雜度
                while(t.hasNext()){
                    money+=money*interest;          
                    list.add(money);
                }
                return list;
            }
        });
        System.out.println(resultRDD.collect());



    }
}

2、 keyBy,mapValues及flatMapValues
前面所說的幾樣其實都是類集合RDD,而很多時候我們需要的RDD型別卻是K,V對,那麼首先,我們便來說說將JavaRDD轉為JavaPairRDD的運算元keyBy。其實顧名思義,keyBy即使將給從RDD傳入的引數一個Key,讓其成為K,V對,這個K就是call方法的返回值,而這個K對應的V則是傳入call方法的引數:

示例:

List<String> list=Arrays.asList("張三","李四","王五","趙六","田七");
        JavaRDD<String> rdd=sc.parallelize(list, 2);
        //運算元keyBy即是傳入的RDD作為值,返回值作為鍵
        JavaPairRDD<Integer, String> result=rdd.keyBy(new Function<String, Integer>() {

            @Override
            public Integer call(String v1) throws Exception {
                return v1.length();
            }

        });
        System.out.println("這是keyBy的------------------------------------------------");
        System.out.println(result.collect());

結果:

[(2,張三), (2,李四), (2,王五), (2,趙六), (2,田七)]

這沒什麼可說的,很容易理解。而我們的到了K,V,又要使用什麼演算法呢?首先就是mapValues。顧名思義,只修改value而不修改key,所以call方法中,只傳入value,返回值是修改後的心value。

示例:(接上例)

//mapValues運算元是根據傳入的RDD,僅修改其value而不修改key
        JavaPairRDD<Integer, String> result1=result.mapValues(new Function<String, String>() {
            @Override
            public String call(String v1) throws Exception {
                return "改了";
            }
        });
        System.out.println("這是keyValues的------------------------------------------------");
        System.out.println(result1.collect());

結果:

[(2,改了), (2,改了), (2,改了), (2,改了), (2,改了)]

value全部改了。

在理解了flatMap之後flatMapValues其實很好理解了。即是在修改value的同時可以新增鍵值對,鍵是和傳入的V相同的鍵。用法與flatMap相似:

示例:(接上例)

JavaPairRDD<Integer, String> result2=result1.flatMapValues(new Function<String, Iterable<String>>() {

            @Override
            public Iterable<String> call(String v1) throws Exception {
                List<String> list=Arrays.asList("wrm","wln","張三");
                return list;
            }
        });
        System.out.println(result2.collect());

結果:

[(2,wrm), (2,wln), (2,張三), (2,wrm), (2,wln), (2,張三), (2,wrm), (2,wln), (2,張三), (2,wrm), (2,wln), (2,張三), (2,wrm), (2,wln), (2,張三)]

  • reduce,reduceByKey
    reduce其實是講RDD中的所有元素進行合併,當執行call方法時,會傳入兩個引數,在call方法中將兩個引數合併後返回,而這個返回值回合一個新的RDD中的元素再次傳入call方法中,繼續合併,直到合併到只剩下一個元素時。

示例:

List<Integer> list=Arrays.asList(1,2,3,3,3,6,7,8,9);
        JavaRDD<Integer> arrayRDD=sc.parallelize(list);
        //執行reduce時,會兩個兩個的從RDD中取出物件,然後進行一定操作合併
        //合併後的物件會和一個新的物件同時傳入作為輸出,直到得到最後一個結果
        Integer result=arrayRDD.reduce(new Function2<Integer, Integer, Integer>() {

            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1+v2;
            }
        });
        System.out.println(result);

結果:

42

得到了陣列的累加和。

而reduceByKey參考MapValues不難知道,他是僅將RDD中所有K,V對中K值相同的V進行合併。

示例:

JavaPairRDD<String, Integer> result2=arrayRDD.keyBy(new Function<Integer, String>() {

            @Override
            public String call(Integer v1) throws Exception {
                String result="張三"+v1;
                return result;
            }
        });
        System.out.println(result2.collect());
        //執行reduceByKey時,會將key值相同的組合在一起做call方法中的操作。
        JavaPairRDD<String, Integer> result3=result2.reduceByKey(new Function2<Integer, Integer, Integer>() {

            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1+v2;
            }
        });
        System.out.println(result3.collect());

結果

[(張三1,1), (張三2,2), (張三3,3), (張三3,3), (張三3,3), (張三6,6), (張三7,7), (張三8,8), (張三9,9)]
[(張三9,9), (張三6,6), (張三8,8), (張三1,1), (張三3,9), (張三7,7), (張三2,2)]

第一行是為合併前,可以看出只有K相同的進行了合併。

  • union,join和groupByKey
    當要將兩個RDD合併時,便要用到union和join,其中union只是簡單的將兩個RDD累加起來,可以看做List的addAll方法。就想List中一樣,當使用union及join時,必須保證兩個RDD的泛型是一致的。

Union示例:

List<Integer> list=Arrays.asList(1,2,2,4);
        JavaRDD<Integer> student=sc.parallelize(list);
        JavaRDD<Integer> student2=student.map(new Function<Integer, Integer>() {

            @Override
            public Integer call(Integer v1) throws Exception {
                return v1*2;
            }
        });
        JavaPairRDD<String, Integer> studentinfo=student.keyBy(new Function<Integer, String>() {

            @Override
            public String call(Integer v1) throws Exception {
                return "A"+v1;
            }

        });
        JavaPairRDD<String, Integer> studentinfo2=student2.keyBy(new Function<Integer, String>() {

            @Override
            public String call(Integer v1) throws Exception {
                return "A"+v1;
            }

        });
        //union可以將兩個RDD中所有的元素合併為一個RDD,但是得確保要合併的兩個RDD的泛型是相同的。
        //且union不去重如果要去重,可以使用distinct()方法
        //合併
        JavaPairRDD<String, Integer> result=studentinfo.union(studentinfo2);
        System.out.println("未去重:"+result.collect());
        //去重
        System.out.println("去重:"+result.distinct().collect());

結果:

未去重:[(A1,1), (A2,2), (A2,2), (A4,4), (A2,2), (A4,4), (A4,4), (A8,8)]
去重:[(A4,4), (A2,2), (A8,8), (A1,1)]

當需要對一個RDD進行去重時,可以使用distinct(),正如上例中使用的那樣。

union只是將兩個RDD簡單的累加在一起,而join則不一樣,join類似於hadoop中的combin操作,只是少了排序這一段,再說join之前說說groupByKey,因為join可以理解為union與groupByKey的結合:groupBy是將RDD中的元素進行分組,組名是call方法中的返回值,而顧名思義groupByKey是將PairRDD中擁有相同key值得元素歸為一組。即:

List<Integer> list=Arrays.asList(1,2,2,4);
        JavaRDD<Integer> student=sc.parallelize(list);
        JavaRDD<Integer> student2=student.map(new Function<Integer, Integer>() {

            @Override
            public Integer call(Integer v1) throws Exception {
                return v1*2;
            }
        });
        JavaPairRDD<String, Integer> studentinfo=student.keyBy(new Function<Integer, String>() {

            @Override
            public String call(Integer v1) throws Exception {
                return "A"+v1;
            }

        });
        JavaPairRDD<String, Integer> studentinfo2=student2.keyBy(new Function<Integer, String>() {

            @Override
            public String call(Integer v1) throws Exception {
                return "A"+v1;
            }

        });

        JavaPairRDD<String, Iterable<Integer>> result=studentinfo.groupByKey();
        System.out.println(result.collect());

結果:

[(A4,[4]), (A1,[1]), (A2,[2, 2])]

看著就和hadoop的進入reduce的K,

//groupBy與join類似於hadoop中的combin操作,只是少了排序這一段
        //groupBy是將RDD中的元素進行分組,組名是call方法中的返回值
        //而groupByKey是將PairRDD中擁有相同key值得元素歸為一組
        JavaPairRDD<String, Iterable<Integer>> result=studentinfo.groupByKey();
        System.out.println(result.collect());
        //join是將兩個PairRDD合併,並將有相同key的元素分為一組,可以理解為groupByKey和Union的結合
        JavaPairRDD<String, Tuple2<Integer, Integer>> result2=studentinfo.join(studentinfo2);
        System.out.println(result2.collect());

結果:

[(A4,(4,4)), (A4,(4,4)), (A2,(2,2)), (A2,(2,2))]

3、sample,cartesian
spark的運算元中還有一些用於特定計算的運算元,例如sample用作抽樣。他的用法即所需注意事項在我的示例中寫得很詳細,就不贅述了,直接上程式碼:

package com.yc.operator;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class OperatorSample {
    private static JavaSparkContext sc;
    public static void main(String[] args) {
        SparkConf conf=new SparkConf();
        conf.set("spark.testing.memory", "2147480000");     //因為jvm無法獲得足夠的資源
        sc = new JavaSparkContext("local", "Spark App",conf);   //本地模式使用local
        List<Integer> list=new ArrayList<Integer>();
        for(int i=1;i<=100;i++){
            list.add(i);
        }
        JavaRDD<Integer> any=sc.parallelize(list);
        //sample用來從RDD中抽取樣本。他有三個引數
        //withReplacement:表示樣本是否放回 true放回
        //fraction:抽取樣本的比例
        //seed:隨機數生成種子
        //由於樣本的抽取其實是以一個固定的演算法實現的,所以要達到隨機抽樣需用隨機數生成seed
        JavaRDD<Integer> sample=any.sample(true, 0.1, 0);
        System.out.println("seed=0:"+sample.collect());
        sample=any.sample(true, 0.1, 0);
        System.out.println("seed=0:"+sample.collect()); //由於seed相同,所以抽出樣本是相同的

        //這裡使用系統時間作為seed,發現抽出的樣本是隨機的
        JavaRDD<Integer> sample1=any.sample(true, 0.1,System.currentTimeMillis());
        System.out.println("seed隨機生成1"+sample1.collect());
        sample1=any.sample(true, 0.1,System.currentTimeMillis());
        System.out.println("seed隨機生成2"+sample1.collect());

    }
}

結果

seed=0:[10, 23, 25, 35, 50, 68, 69, 79, 79, 85, 91, 91]
seed=0:[10, 23, 25, 35, 50, 68, 69, 79, 79, 85, 91, 91]
seed隨機生成1[37, 53, 63, 63, 81, 98]
seed隨機生成2[35, 63, 73, 80, 84, 85]

而cartesian是用於求笛卡爾積的,同樣直接上程式碼:

package com.yc.operator;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.CartesianRDD;

public class OperatorCartesian {
    private static JavaSparkContext sc;
    public static void main(String[] args) {
        SparkConf conf=new SparkConf();
        conf.set("spark.testing.memory", "2147480000");     //因為jvm無法獲得足夠的資源
        sc = new JavaSparkContext("local", "Spark App",conf);   //本地模式使用local
        List<Integer> list=Arrays.asList(1,2,3,4);
        List<Integer> list2=Arrays.asList(5,6,7,1);
        JavaRDD<Integer> arrayRDD=sc.parallelize(list);
        JavaRDD<Integer> arrayRDD2=sc.parallelize(list2);
        //運算元cartesian就是用來求兩個RDD的笛卡爾積的。
        JavaPairRDD<Integer, Integer> result=arrayRDD.cartesian(arrayRDD2);
        System.out.println(result.collect());
    }
}

希望這篇日誌能幫到大家。
碼字不易,轉載請註明出處!