Spark RDD Operations: Map-Family Operators
阿新 · Published: 2019-03-07
This post introduces the basic usage of the map-family operators on Spark RDDs.
1. map
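map passes the elements of an RDD to the call method one at a time and returns each computed result, producing a new RDD. Every input element yields exactly one output element, so the number of records does not change. The example below adds 10 to each number and prints the results: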
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;

public class Map {
    public static void main(String[] args) {
        // Run locally with one worker thread per CPU core
        SparkConf conf = new SparkConf().setAppName("spark map").setMaster("local[*]");
        JavaSparkContext javaSparkContext = new JavaSparkContext(conf);

        JavaRDD<Integer> listRDD = javaSparkContext.parallelize(Arrays.asList(1, 2, 3, 4));

        // map: each input element produces exactly one output element
        JavaRDD<Integer> numRDD = listRDD.map(new Function<Integer, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(Integer num) throws Exception {
                return num + 10;
            }
        });

        // foreach is an action; it runs on the executors (here, local threads)
        numRDD.foreach(new VoidFunction<Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void call(Integer num) throws Exception {
                System.out.println(num);
            }
        });

        // Release the resources held by the context
        javaSparkContext.close();
    }
}
Output:
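Spark's map follows the same element-wise, one-to-one contract as Stream.map in the Java standard library. The sketch below uses java.util.stream as a local stand-in (it is not the Spark API, and the class name MapSketch is hypothetical) to show that applying the same num + 10 function to [1, 2, 3, 4] yields one output per input, so the record count stays at 4.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper class: a single-JVM analogy for JavaRDD.map,
// built on java.util.stream rather than on Spark.
public class MapSketch {

    // Applies num -> num + 10 to every element; one output per input.
    public static List<Integer> addTen(List<Integer> input) {
        return input.stream()
                .map(num -> num + 10)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 2, 3, 4);
        List<Integer> output = addTen(input);
        // A sequential stream keeps encounter order: prints [11, 12, 13, 14]
        System.out.println(output);
        // map never changes the number of records: prints true
        System.out.println(input.size() == output.size());
    }
}
```

Unlike this sequential stream, numRDD.foreach in the Spark example runs once per partition, so with local[*] the printed order may differ between runs.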
2. flatMap
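flatMap works like map in that each element of the source RDD is passed to the function in turn. The difference is that flatMap's function returns an Iterator, and every element of that Iterator becomes a separate record in the new RDD. In other words, one input element can produce many output elements. The example below splits each line into words: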
import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.VoidFunction;

public class FlatMap {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("spark flatMap").setMaster("local[*]");
        JavaSparkContext javaSparkContext = new JavaSparkContext(conf);

        JavaRDD<String> listRDD = javaSparkContext
                .parallelize(Arrays.asList("hello world", "hello java", "hello spark"));

        // flatMap: each line is split into words, and every word becomes its own element
        JavaRDD<String> rdd = listRDD.flatMap(new FlatMapFunction<String, String>() {
            private static final long serialVersionUID = 1L;

            // Since Spark 2.0, call returns an Iterator (in Spark 1.x it returned an Iterable)
            @Override
            public Iterator<String> call(String input) throws Exception {
                return Arrays.asList(input.split(" ")).iterator();
            }
        });

        rdd.foreach(new VoidFunction<String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void call(String word) throws Exception {
                System.out.println(word);
            }
        });

        // Release the resources held by the context
        javaSparkContext.close();
    }
}
Output:
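The difference between map and flatMap shows up in the shape of the result. As another stdlib-only analogy (Stream.map and Stream.flatMap rather than the JavaRDD methods; the class FlatMapSketch and its method names are hypothetical), the sketch below splits the same three lines both ways: map yields one String[] per line, so 3 records, while flatMap flattens each line's words into the output, so 6 records.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper class: contrasts map and flatMap on a local
// java.util.stream pipeline as an analogy for the Spark operators.
public class FlatMapSketch {

    // map: each line becomes exactly one String[]; the count is unchanged.
    public static List<String[]> splitWithMap(List<String> lines) {
        return lines.stream()
                .map(line -> line.split(" "))
                .collect(Collectors.toList());
    }

    // flatMap: each line becomes a stream of words, and those streams are
    // concatenated into one flat list (one input, many outputs).
    public static List<String> splitWithFlatMap(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split(" ")))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("hello world", "hello java", "hello spark");
        // prints 3
        System.out.println(splitWithMap(lines).size());
        // prints [hello, world, hello, java, hello, spark]
        System.out.println(splitWithFlatMap(lines));
    }
}
```

In the Spark version the same flattening happens because FlatMapFunction.call returns an Iterator<String>, which is why the example wraps the split result with Arrays.asList(...).iterator().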