1. 程式人生 > >spark map flatMap flatMapToPair mapPartitions 的區別和用途

spark map flatMap flatMapToPair mapPartitions 的區別和用途

import akka.japi.Function2;
import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.storage.StorageLevel;
import scala.Tuple2;

import java.io.File;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/**
         * map flatMap flatMapToPair mapPartitions 的區別和用途
         *
         * 例如資料是:name:gaoyue age:28
         *
         * 方法一:map,我們可以看到資料的每一行在map之後產生了一個數組,那麼rdd儲存的是一個數組的集合
         * rdd儲存的狀態是Array[Array[String]] = Array(Array(name, gaoyue), Array(age, 28))
         *Array[String] = Array(name, gaoyue, age, 28)
         */

        JavaRDD<String[]> mapresult=lines.map(new Function<String, String[]>() {

            @Override
            public String[] call(String s) throws Exception {
                return s.split(":");
            }
        });




        /**
         * 方法二:flatMap
         * 操作1:同map函式一樣:對每一條輸入進行指定的操作,然後為每一條輸入返回一個物件
         * 操作2:最後將所有物件合併為一個物件
         */
        JavaRDD<String> objectJavaRDD = lines.flatMap(new FlatMapFunction<String, String>() {

            @Override
            public Iterable<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" "));
            }
        });

        /**
         * 方法三:
         * mappartition
         *rdd的mapPartitions是map的一個變種,它們都可進行分割槽的並行處理。兩者的主要區別是呼叫的粒度不一樣:
         * map的輸入變換函式是應用於RDD中每個元素,而mapPartitions的輸入函式是應用於每個分割槽。也就是把每個分割槽中的內容作為整體來處理的。
         *
         */
        lines2.mapPartitions(new FlatMapFunction<Iterator<String>, String>() {
            ArrayList<String> results = new ArrayList<String>();

            @Override
            public Iterable<String> call(Iterator<String> s) throws Exception {
                while (s.hasNext()) {
                    results.addAll(Arrays.asList(s.next().split(":")));
                }
                return results;
            }
        }).saveAsTextFile("/Users/luoluowushengmimi/Documents/result");

        /**
         * flatMapToPair
         * 操作1:同map函式一樣:對每一條輸入進行指定的操作,然後為每一條輸入返回一個key-value物件
         * 操作2:最後將所有key-value物件合併為一個物件 Iterable<Tuple2<String, String>>
         *
         */

        JavaPairRDD<String,String> pair=lines.flatMapToPair(new PairFlatMapFunction<String, String, String>() {

            @Override
            public Iterable<Tuple2<String, String>> call(String s) throws Exception {
                String[] temp=s.split(":");
                ArrayList<Tuple2<String,String>> list=new ArrayList<Tuple2<String,String>>();
                list.add(new Tuple2<String,String>(temp[0],temp[1]));
                return list;
            }
        });