1. 程式人生 > >spark RDD運算元(四)之建立鍵值對RDD mapToPair flatMapToPair

spark RDD運算元(四)之建立鍵值對RDD mapToPair flatMapToPair

mapToPair

舉例,在F:\sparktest\sample.txt 檔案的內容如下

aa bb cc aa aa aa dd dd ee ee ee ee 
ff aa bb zks
ee kks
ee  zz zks

將每一行的第一個單詞作為鍵,1 作為value建立pairRDD
scala版本
scala是沒有mapToPair函式的,scala版本只需要map就可以了

    scala> val lines = sc.textFile("F:\\sparktest\\sample.txt")

    scala> val pairs = lines.map(x => (x.split("\\s+"
)(0), 1)) scala> pairs.collect res0: Array[(String, Int)] = Array((aa,1), (ff,1), (ee,1), (ee,1))

java版本

    JavaRDD<String> lines = sc.textFile("F:\\sparktest\\sample.txt");
    //輸入的是一個string的字串,輸出的是一個(String, Integer) 的map
    JavaPairRDD<String, Integer> pairRDD = lines.mapToPair(new
PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String s) throws Exception { return new Tuple2<String, Integer>(s.split("\\s+")[0], 1); } });

flatMapToPair

類似於xxx連線 mapToPair是一對一,一個元素返回一個元素,而flatMapToPair可以一個元素返回多個,相當於先flatMap,在mapToPair
例子: 將每一個單詞都分成鍵為
scala版本

    val lines = sc.textFile("F:\\sparktest\\sample.txt")
    val flatRDD = lines.flatMap(x => (x.split("\\s+")))
    val pairs = flatRDD.map(x=>(x,1))

    scala> pairs.collect
    res1: Array[(String, Int)] = Array((aa,1), (bb,1), (cc,1), (aa,1), (aa,1), (aa,1), (dd,1), (dd,1), (ee,1), (ee,1), (ee,1), (ee,1), (ff,1), (aa,1), (bb,1), (zks,1), (ee,1), (kks,1), (ee,1), (zz,1), (zks,1))

java版本 spark2.0以下

JavaPairRDD<String, Integer> wordPairRDD = lines.flatMapToPair(new PairFlatMapFunction<String, String, Integer>() {
            @Override
            public Iterable<Tuple2<String, Integer>> call(String s) throws Exception {
                ArrayList<Tuple2<String, Integer>> tpLists = new ArrayList<Tuple2<String, Integer>>();
                String[] split = s.split("\\s+");
                for (int i = 0; i <split.length ; i++) {
                    Tuple2 tp = new Tuple2<String,Integer>(split[i], 1);
                    tpLists.add(tp);
                }
            return tpLists;
            }
        });

java版本 spark2.0以上
主要是iterator和iteratable的一些區別

        JavaPairRDD<String, Integer> wordPairRDD = lines.flatMapToPair(new PairFlatMapFunction<String, String, Integer>() {
            @Override
            public Iterator<Tuple2<String, Integer>> call(String s) throws Exception {
                ArrayList<Tuple2<String, Integer>> tpLists = new ArrayList<Tuple2<String, Integer>>();
                String[] split = s.split("\\s+");
                for (int i = 0; i <split.length ; i++) {
                    Tuple2 tp = new Tuple2<String,Integer>(split[i], 1);
                    tpLists.add(tp);
                }
                return tpLists.iterator();
            }
        });