spark記錄(3)spark算子之Transformation
一、map、flatMap、mapParations、mapPartitionsWithIndex
1.1 map
map十分容易理解,他是將源JavaRDD的一個一個元素的傳入call方法,並經過算法後一個一個的返回從而生成一個新的JavaRDD。
(1) 使用Java進行編寫
public static void map() { List<String> list = Arrays.asList("李光洙","劉在石","哈哈","宋智孝"); JavaRDD<String> rdd = jsc.parallelize(list); JavaRDD<String> map = rdd.map(new Function<String, String>() { private static final long serialVersionUID = 1L; @Override public String call(String name) throws Exception { return "hello,"+name; } }); map.foreach(new VoidFunction<String>() { /** * */ private static final long serialVersionUID = 1L; @Override public void call(String msg) throws Exception { System.out.println(msg); } }); }
(2) 使用scala進行編寫
def map(): Unit = { val list = List("李光洙","劉在石","哈哈","宋智孝"); val rdd = sc.parallelize(list) val map = rdd.map(s => "hello," + s).foreach(println) }
(3)運行結果
(4) 總結
可以看出,對於map算子,源JavaRDD的每個元素都會進行計算,由於是依次進行傳參,所以他是有序的,新RDD的元素順序與源RDD是相同的。而由有序又引出接下來的flatMap。
1.2 flatMap
flatMap與map一樣,是將RDD中的元素依次的傳入call方法,他比map多的功能是能在任何一個傳入call方法的元素後面添加任意多元素,而能達到這一點,正是因為其進行傳參是依次進行的。
(1) 使用Java進行編寫
public static void flatmap() { List<String> list = Arrays.asList("李光洙 劉在石","哈哈 宋智孝"); JavaRDD<String> rdd = jsc.parallelize(list); JavaRDD<String> map = rdd.flatMap(new FlatMapFunction<String, String>() { private static final long serialVersionUID = 1L; @Override public Iterable<String> call(String line) throws Exception { return Arrays.asList(line.split(" ")); } }).map(new Function<String, String>() { private static final long serialVersionUID = 1L; @Override public String call(String s) throws Exception { return "你好," + s; } }); map.foreach(new VoidFunction<String>() { @Override public void call(String s) throws Exception { System.out.println(s); } }); }
(2) 使用scala進行編寫
def flatmap(): Unit = { val list = List("李光洙 劉在石","哈哈 宋智孝"); val rdd = sc.parallelize(list) rdd.flatMap(_.split(" ")).map("你好,"+_).foreach(println) }
(3) 運行結果
(4) 總結
flatMap的特性決定了這個算子在對需要隨時增加元素的時候十分好用,比如在對源RDD查漏補缺時。
map和flatMap都是依次進行參數傳遞的,但有時候需要RDD中的兩個元素進行相應操作時(例如:算存款所得時,下一個月所得的利息是要原本金加上上一個月所得的本金的),這兩個算子便無法達到目的了,這是便需要mapPartitions算子,他傳參的方式是將整個RDD傳入,然後將一個叠代器傳出生成一個新的RDD,由於整個RDD都傳入了,所以便能完成前面說的業務。
map是對RDD中元素逐一進行函數操作映射為另外一個RDD,而flatMap操作是將函數應用於RDD之中的每一個元素,將返回的叠代器的所有內容構成新的RDD。而flatMap操作是將函數應用於RDD中每一個元素,將返回的叠代器的所有內容構成RDD。
flatMap與map區別在於map為“映射”,而flatMap“先映射,後扁平化”,map對每一次(func)都產生一個元素,返回一個對象,而flatMap多一步就是將所有對象合並為一個對象。
1.3 mapPartitions
與map方法類似,map是對rdd中的每一個元素進行操作,而mapPartitions(foreachPartition)則是對rdd中的每個分區的叠代器進行操作。如果在map過程中需要頻繁創建額外的對象(例如將rdd中的數據通過jdbc寫入數據庫,map需要為每個元素創建一個鏈接而mapPartition為每個partition創建一個鏈接),則mapPartitions效率比map高的多。
(1) 使用Java進行編寫
public static void mapPartitions() { JavaRDD<String> textFile = jsc.textFile("words",3); textFile.mapPartitions(new FlatMapFunction<Iterator<String>, String>() { private static final long serialVersionUID = 1L; @Override public Iterable<String> call(Iterator<String> is) throws Exception { System.out.println("創建數據庫連接。。。。"); List<String> list = new ArrayList<String>(); while(is.hasNext()) { list.add(is.next()); System.out.println("模擬向數據庫插入批量數據。。。"); } System.out.println("關閉數據庫連接。。。"); return list; } }).collect(); }
(2) 使用scala進行編寫
def mapPartitions: Unit = { val rdd1 = sc.textFile("words") val mapResult = rdd1.mapPartitions(iter =>{ println("打開數據庫。。。") val list = List() while(iter.hasNext){ list.addString(new StringBuilder(iter.next())) println("插入數據庫。。。") } println("關閉數據庫。。。") list.iterator }, false) mapResult.foreach(println) }
(3) 運行結果
(4)總結
mapPartitions比較適合需要分批處理數據的情況,比如將數據插入某個表,每批數據只需要開啟一次數據庫連接,大大減少了連接開支。
spark記錄(3)spark算子之Transformation