1. 程式人生 > >03、操作RDD(transformation和action案例實戰)

03、操作RDD(transformation和action案例實戰)

// 這裡通過textFile()方法,針對外部檔案建立了一個RDD,lines,但是實際上,程式執行到這裡為止,spark.txt檔案的資料是不會載入到記憶體中的。lines,只是代表了一個指向spark.txt檔案的引用。val lines = sc.textFile("spark.txt")// 這裡對lines RDD進行了map運算元,獲取了一個轉換後的lineLengths RDD。但是這裡連資料都沒有,當然也不會做任何操作。lineLengths RDD也只是一個概念上的東西而已。val lineLengths = lines.map(line => line.length)// 之列,執行了一個action操作,reduce。此時就會觸發之前所有transformation操作的執行,Spark會將操作拆分成多個task到多個機器上並行執行,每個task會在本地執行map操作,並且進行本地的reduce聚合。最後會進行一個全域性的reduce聚合,然後將結果返回給Driver程式。val totalLength = lineLengths.reduce(_ + _)

 3、案例:統計檔案每行出現的次數

    3.1、java

package sparkcore.java;import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaPairRDD;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.api.java.function.Function2;import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;import scala.Tuple2;/** * 統計每行出現的次數,即同一行在檔案裡出現的次數 */publicclass LineCount {    public static void main(String[] args) {        // 建立SparkConf        SparkConf conf = new SparkConf().setAppName("LineCount").setMaster("local");        // 建立JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);        // 建立初始RDD,lines,每個元素是一行文字        JavaRDD<String> lines = sc.textFile("test.txt");        // 對lines RDD執行mapToPair運算元,將每一行對映為(line, 1)的這種key-value對的格式        // 然後後面才能統計每一行出現的次數        JavaPairRDD<String, Integer> pairs = lines.mapToPair(
                new PairFunction<String, String, Integer>() {                    private static final long serialVersionUID = 1L;                    @Override                    public Tuple2<String, Integer> call(String tthrows Exception {                        return new Tuple2<String, Integer>(t, 1);                    }                });        // 對pairs RDD執行reduceByKey運算元,統計出每一行出現的總次數        JavaPairRDD<String, Integer> lineCounts = pairs.reduceByKey(                new Function2<Integer, Integer, Integer>() {                    private static final long serialVersionUID = 1L;                    @Override                    public Integer call(Integer v1, Integer v2throws Exception {                        return v1 + v2;                    }                });        // 執行一個action操作,foreach,打印出每一行出現的次數        lineCounts.foreach(new VoidFunction<Tuple2<String, Integer>>() {            private static final long serialVersionUID = 1L;            @Override            public void call(Tuple2<String, Integer> tthrows Exception {                System.out.println(t._1 + " : " + t._2);            }        });        // 關閉JavaSparkContext        sc.close();    }}

    3.2、scala

package sparkcore.scalaimport org.apache.spark.SparkConfimport org.apache.spark.SparkContextobject LineCount {  def main(args: Array[String]) {    val conf = new SparkConf()      .setAppName("LineCount")      .setMaster("local")    val sc = new SparkContext(conf);    val lines = sc.textFile("test.txt"1)    val pairs = lines.map { (_, 1) }    val lineCounts = pairs.reduceByKey { _ + _ }    lineCounts.foreach(lineCount => println(lineCount._1 + " : " + lineCount._2 ))  }}