03、操作RDD(transformation和action案例實戰)
阿新 • • 發佈:2018-12-27
// 這裡通過textFile()方法,針對外部檔案建立了一個RDD,lines,但是實際上,程式執行到這裡為止,spark.txt檔案的資料是不會載入到記憶體中的。lines,只是代表了一個指向spark.txt檔案的引用。val lines = sc.textFile("spark.txt")// 這裡對lines RDD進行了map運算元,獲取了一個轉換後的lineLengths RDD。但是這裡連資料都沒有,當然也不會做任何操作。lineLengths RDD也只是一個概念上的東西而已。val lineLengths = lines.map(line => line.length)// 之列,執行了一個action操作,reduce。此時就會觸發之前所有transformation操作的執行,Spark會將操作拆分成多個task到多個機器上並行執行,每個task會在本地執行map操作,並且進行本地的reduce聚合。最後會進行一個全域性的reduce聚合,然後將結果返回給Driver程式。val totalLength = lineLengths.reduce(_ + _) import org.apache.spark.api.java.function.VoidFunction;import scala.Tuple2;/** * 統計每行出現的次數,即同一行在檔案裡出現的次數 */publicclass LineCount { public static void main(String[] args) { // 建立SparkConf SparkConf conf = new SparkConf().setAppName("LineCount").setMaster("local"); // 建立JavaSparkContext JavaSparkContext sc = new JavaSparkContext(conf); // 建立初始RDD,lines,每個元素是一行文字 JavaRDD<String> lines = sc.textFile("test.txt"); // 對lines RDD執行mapToPair運算元,將每一行對映為(line, 1)的這種key-value對的格式 // 然後後面才能統計每一行出現的次數 JavaPairRDD<String, Integer> pairs = lines.mapToPair( new PairFunction<String, String, Integer>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, Integer> call(String t) throws Exception { return new Tuple2<String, Integer>(t, 1); } }); // 對pairs RDD執行reduceByKey運算元,統計出每一行出現的總次數 JavaPairRDD<String, Integer> lineCounts = pairs.reduceByKey( new Function2<Integer, Integer, Integer>() { private static final long serialVersionUID = 1L; @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }); // 執行一個action操作,foreach,打印出每一行出現的次數 lineCounts.foreach(new VoidFunction<Tuple2<String, Integer>>() { private static final long serialVersionUID = 1L; @Override public void call(Tuple2<String, Integer> t) throws Exception { System.out.println(t._1 + " : " + t._2); } }); // 關閉JavaSparkContext sc.close(); }}