
Spark RDD Operations: The Map-Family Operators


    This post introduces the basic usage of the map-family operators on Spark RDDs.

    1. map

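    map passes the elements of an RDD to the call method one at a time and returns each computed result, producing a new RDD. The mapping is one-to-one, so the new RDD contains exactly as many records as the original. The example below adds 10 to each number and prints the results: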

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;

public class Map {
	public static void main(String[] args) {
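		// Run locally, with one worker thread per available CPU core
		SparkConf conf = new SparkConf().setAppName("spark map").setMaster("local[*]");
		JavaSparkContext javaSparkContext = new JavaSparkContext(conf);
		// Build an RDD from a local list
		JavaRDD<Integer> listRDD = javaSparkContext.parallelize(Arrays.asList(1, 2, 3, 4));

		// map: call() runs once per element and returns exactly one new element
		JavaRDD<Integer> numRDD = listRDD.map(new Function<Integer, Integer>() {
			@Override
			public Integer call(Integer num) throws Exception {
				return num + 10;
			}
		});
		// foreach is an action: it triggers the computation and prints every element
		// (in local mode the output appears in this console)
		numRDD.foreach(new VoidFunction<Integer>() {
			@Override
			public void call(Integer num) throws Exception {
				System.out.println(num);
			}
		});
		// Release the resources held by the Spark context
		javaSparkContext.close();
	}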

}

    Output (the order can vary, because local[*] processes partitions in parallel):

11
12
13
14
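The one-to-one behavior of map can be checked without a Spark cluster. The minimal sketch below uses Java's Stream.map on a local list as an analogy. This is an assumption of the sketch, not Spark code: a sequential Java Stream runs in a single JVM and preserves order, whereas an RDD is partitioned and distributed. The names MapSketch and addTen are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical example class: Java Streams as a local stand-in for an RDD.
public class MapSketch {
    // Same per-element function as the Spark example (num + 10);
    // exactly one output element is produced for each input element.
    static List<Integer> addTen(List<Integer> input) {
        return input.stream()
                .map(num -> num + 10)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 2, 3, 4);
        List<Integer> output = addTen(input);
        System.out.println(output);                        // [11, 12, 13, 14]
        System.out.println(input.size() == output.size()); // true
    }
}
```

Because the stream is sequential, the order is deterministic here, unlike the Spark foreach above.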

    2. flatMap

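    flatMap processes elements the same way as map: each element of the source RDD is passed to call() in turn. The difference is that flatMap's call() returns an Iterator, and every element of that Iterator becomes a separate element of the new RDD. One input can therefore produce many outputs, or none. (In Spark 2.x and later, FlatMapFunction.call returns an Iterator. In Spark 1.x it returned an Iterable.)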
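The following minimal, Spark-free sketch makes the Iterator contract concrete. flatMapLocal is a hypothetical helper and is not part of Spark's API. It calls the function on each element and drains every returned Iterator into one flat list, so a single input can contribute several elements or none at all.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Hypothetical example class illustrating flatMap semantics on a local list.
public class FlatMapSketch {
    // Hypothetical helper (not a Spark API): applies fn to each element and
    // appends everything the returned Iterator yields to a single output list.
    static <T, R> List<R> flatMapLocal(List<T> input, Function<T, Iterator<R>> fn) {
        List<R> out = new ArrayList<>();
        for (T element : input) {
            Iterator<R> it = fn.apply(element);
            while (it.hasNext()) {
                out.add(it.next());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("hello world", "", "hello spark");
        // An empty line yields an empty Iterator (zero outputs);
        // every other line yields one output per space-separated word.
        List<String> words = flatMapLocal(lines, line -> line.isEmpty()
                ? Collections.<String>emptyIterator()
                : Arrays.asList(line.split(" ")).iterator());
        System.out.println(words); // [hello, world, hello, spark]
    }
}
```

Three inputs produce four outputs here. The count is not tied to the input size as it is with map.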

import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.VoidFunction;

public class FlatMap {
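	public static void main(String[] args) {
		SparkConf conf = new SparkConf().setAppName("spark flatMap").setMaster("local[*]");
		JavaSparkContext javaSparkContext = new JavaSparkContext(conf);
		JavaRDD<String> listRDD = javaSparkContext
				.parallelize(Arrays.asList("hello world", "hello java", "hello spark"));
		// flatMap: call() returns an Iterator, and every element it yields
		// becomes a separate element of the new RDD
		JavaRDD<String> rdd = listRDD.flatMap(new FlatMapFunction<String, String>() {
			private static final long serialVersionUID = 1L;

			@Override
			public Iterator<String> call(String input) throws Exception {
				// Split each line on spaces: one line in, several words out
				return Arrays.asList(input.split(" ")).iterator();
			}
		});
		// Print each word (an action, so this triggers the computation)
		rdd.foreach(new VoidFunction<String>() {
			private static final long serialVersionUID = 1L;

			@Override
			public void call(String word) throws Exception {
				System.out.println(word);
			}
		});
		javaSparkContext.close();
	}

}

    Output (the order can vary, because local[*] processes partitions in parallel):

hello
world
hello
java
hello
spark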
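A side-by-side comparison shows why flatMap, not map, is the right operator for splitting lines into words. This sketch again uses Java Streams on a local list as a stand-in for an RDD. That is an assumption, not Spark code, and the class and method names are hypothetical. The input is three strings similar to the example's. Splitting with map yields one String[] per line, while flatMap flattens the pieces into individual words.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical example class comparing map and flatMap on local data.
public class MapVsFlatMap {
    static final List<String> LINES = Arrays.asList("hello world", "hello java", "hello spark");

    // map: each line becomes exactly one String[], so the result has 3 elements.
    static List<String[]> mapSplit(List<String> lines) {
        return lines.stream()
                .map(line -> line.split(" "))
                .collect(Collectors.toList());
    }

    // flatMap: each line's words are flattened into one stream of words.
    static List<String> flatMapSplit(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split(" ")))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(mapSplit(LINES).size()); // 3
        System.out.println(flatMapSplit(LINES));    // [hello, world, hello, java, hello, spark]
    }
}
```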
