1. 程式人生 > >spark RDD運算元(二) filter,map ,flatMap

spark RDD運算元(二) filter,map ,flatMap

作者: 翟開順
首發:CSDN

先來一張spark快速大資料中的圖片進行快速入門,後面有更詳細的例子

filter

舉例,在F:\sparktest\sample.txt 檔案的內容如下

aa bb cc aa aa aa dd dd ee ee ee ee 
ff aa bb zks
ee kks
ee  zz zks

我要將包含zks的行的內容給找出來
scala版本

    val lines = sc.textFile("F:\\sparktest\\sample.txt").filter(line=>line.contains("zks"
)) //列印內容 lines.collect().foreach(println(_)); -------------輸出------------------ ff aa bb zks ee zz zks

java版本

        JavaRDD<String> lines = sc.textFile("F:\\sparktest\\sample.txt");
        JavaRDD<String> zksRDD = lines.filter(new Function<String, Boolean>() {
            @Override
public Boolean call(String s) throws Exception { return s.contains("zks"); } }); //列印內容 List<String> zksCollect = zksRDD.collect(); for (String str:zksCollect) { System.out.println(str); } ----------------輸出------------------- ff aa bb zks ee zz zks

map

map() 接收一個函式,把這個函式用於 RDD 中的每個元素,將函式的返回結果作為結果RDD程式設計 | 31
RDD 中對應元素的值 map是一對一的關係
舉例,在F:\sparktest\sample.txt 檔案的內容如下

aa bb cc aa aa aa dd dd ee ee ee ee 
ff aa bb zks
ee kks
ee  zz zks

把每一行變成一個數組
scala版本

//讀取資料
scala> val lines = sc.textFile("F:\\sparktest\\sample.txt")
//用map,對於每一行資料,按照空格分割成一個一個數組,然後返回的是一對一的關係
scala> var mapRDD = lines.map(line => line.split("\\s+"))
---------------輸出-----------
res0: Array[Array[String]] = Array(Array(aa, bb, cc, aa, aa, aa, dd, dd, ee, ee, ee, ee), Array(ff, aa, bb, zks), Array(ee, kks), Array(ee, zz, zks))

//讀取第一個元素
scala> mapRDD.first
---輸出----
res1: Array[String] = Array(aa, bb, cc, aa, aa, aa, dd, dd, ee, ee, ee, ee)

java版本

        JavaRDD<Iterable<String>> mapRDD = lines.map(new Function<String, Iterable<String>>() {
            @Override
            public Iterable<String> call(String s) throws Exception {
                String[] split = s.split("\\s+");
                return Arrays.asList(split);
            }
        });
        //讀取第一個元素
        System.out.println(mapRDD.first());
    ---------------輸出-------------
    [aa, bb, cc, aa, aa, aa, dd, dd, ee, ee, ee, ee]

flatMap

有時候,我們希望對某個元素生成多個元素,實現該功能的操作叫作 flatMap()
faltMap的函式應用於每一個元素,對於每一個元素返回的是多個元素組成的迭代器(想要了解更多,請參考scala的flatMap和map用法)
例如我們將資料切分為單詞
scala版本

    scala>  val lines = sc.textFile("F:\\sparktest\\sample.txt")
    scala> val flatMapRDD = lines.flatMap(line=>line.split("\\s"))
    scala> flatMapRDD.first() 
---輸出----
res0: String = aa

java版本,spark2.0以下

    JavaRDD<String> lines = sc.textFile("F:\\sparktest\\sample.txt");
    JavaRDD<String> flatMapRDD = lines.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public Iterable<String> call(String s) throws Exception {
            String[] split = s.split("\\s+");
            return Arrays.asList(split);
        }
    });
    //輸出第一個
    System.out.println(flatMapRDD.first());
------------輸出----------
aa

java版本,spark2.0以上
spark2.0以上,對flatMap的方法有所修改,就是flatMap中的Iterator和Iteratable的小區別

        JavaRDD<String> flatMapRDD = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                String[] split = s.split("\\s+");
                return Arrays.asList(split).iterator();
            }
        });