02、建立RDD(集合、本地檔案、HDFS檔案)
阿新 • • 發佈:2018-12-27
Spark Core提供了三種建立RDD的方式,包括:使用程式中的集合建立RDD;使用本地檔案建立RDD;使用HDFS檔案建立RDD。 package sparkcore;import java.util.Arrays;import java.util.List;import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.api.java.function.Function2;/** * 並行化集合建立RDD 案例:累加1到10 */publicclass ParallelizeCollection { public static void main(String[] args) { // 建立SparkConf SparkConf conf = new SparkConf().setAppName("ParallelizeCollection").setMaster("local"); // 建立JavaSparkContext JavaSparkContext sc = new JavaSparkContext(conf); // 要通過並行化集合的方式建立RDD,那麼就呼叫SparkContext以及其子類,的parallelize()方法 List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); JavaRDD<Integer> numberRDD = sc.parallelize(numbers); // 執行reduce運算元操作 // 相當於,先進行1 + 2 = 3;然後再用3 + 3 = 6;然後再用6 + 4 = 10。。。以此類推 int sum = numberRDD.reduce(new Function2<Integer, Integer, Integer>() { private static final long serialVersionUID = 1L; public Integer call(Integer num1, Integer num2) throws Exception { return num1 + num2; } }); // 輸出累加的和 System.out.println("1到10的累加和:" + sum); // 關閉JavaSparkContext sc.close(); }}
package sparkcore.scalaimport org.apache.spark.SparkConfimport org.apache.spark.SparkContextobject LocalFile { def main(args: Array[String]) { val conf = new SparkConf() .setAppName("LocalFile").setMaster("local"); val sc = new SparkContext(conf) val lines = sc.textFile("D:/eclipse-jee-neon-3/workspace/sparkcore-scala/test.txt", 1); val count = lines.map { line => line.length() }.reduce(_ + _) println("file's count is " + count) }}package sparkcore.scalaimport org.apache.spark.SparkConfimport org.apache.spark.SparkContextobject HDFSFile { def main(args: Array[String]) { val conf = new SparkConf() .setAppName("HDFSFile").setMaster("local") val sc = new SparkContext(conf) val lines = sc.textFile("hdfs://node1:8020/test.txt", 1); val count = lines.map { _.length() }.reduce(_ + _) println("file's count is " + count) }}
1、並行化集合
如果要通過並行化集合來建立RDD,需要針對程式中的集合,呼叫SparkContext的parallelize()方法。Spark會將集合中的資料拷貝到叢集上去,形成一個分散式的資料集合,也就是一個RDD。相當於是,集合中的部分資料會到一個節點上,而另一部分資料會到其他節點上。然後就可以用並行的方式來操作這個分散式資料集合,即RDD。// 案例:1到10累加求和val arr = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)val rdd = sc.parallelize(arr)val sum = rdd.reduce(_ + _)呼叫parallelize()時,有一個重要的引數可以指定,就是要將集合切分成多少個partition。Spark會為每一個partition執行一個task來進行處理。Spark官方的建議是,為叢集中的每個CPU建立2~4個partition。Spark預設會根據叢集的情況來設定partition的數量。但是也可以在呼叫parallelize()方法時,傳入第二個引數,來設定RDD的partition數量。比如parallelize(arr, 10)1.1、Java
1.2、Scala
package sparkcore.scalaimport org.apache.spark.SparkConfimport org.apache.spark.SparkContextobject ParallelizeCollection { def main(args: Array[String]) { val conf = new SparkConf() .setAppName("ParallelizeCollection") .setMaster("local") val sc = new SparkContext(conf) val numbers = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) val numberRDD = sc.parallelize(numbers, 5) val sum = numberRDD.reduce(_ + _) println("1到10的累加和:" + sum) }}2、使用本地檔案和HDFS建立RDD
Spark是支援使用任何Hadoop支援的儲存系統上的檔案建立RDD的,比如說HDFS、Cassandra、HBase以及本地檔案。通過呼叫SparkContext的textFile()方法,可以針對本地檔案或HDFS檔案建立RDD。有幾個事項是需要注意的:1、如果是針對本地檔案的話,如果是在windows上本地測試,windows上有一份檔案即可;如果是在spark叢集上針對linux本地檔案,那麼需要將檔案拷貝到所有worker節點上。2、Spark的textFile()方法支援針對目錄、壓縮檔案以及萬用字元進行RDD建立。3、Spark預設會為hdfs檔案的每一個block建立一個partition,但是也可以通過textFile()的第二個引數手動設定分割槽數量,只能比block數量多,不能比block數量少。// 案例:檔案字數統計val rdd = sc.textFile("data.txt")val wordCount = rdd.map(line => line.length).reduce(_ + _)Spark的textFile()除了可以針對上述幾種普通的檔案建立RDD之外,還有一些特列的方法來建立RDD:1、SparkContext.wholeTextFiles()方法,可以針對一個目錄中的大量小檔案,返回<filename, fileContent>組成的pair,作為一個PairRDD,而不是普通的RDD。普通的textFile()返回的RDD中,每個元素就是檔案中的一行文字。2、SparkContext.sequenceFile[K, V]()方法,可以針對SequenceFile建立RDD,K和V泛型型別就是SequenceFile的key和value的型別。K和V要求必須是Hadoop的序列化型別,比如IntWritable、Text等。3、SparkContext.hadoopRDD()方法,對於Hadoop的自定義輸入型別,可以建立RDD。該方法接收JobConf、InputFormatClass、Key和Value的Class。4、SparkContext.objectFile()方法,可以針對之前呼叫RDD.saveAsObjectFile()建立的物件序列化的檔案,反序列化檔案中的資料,並建立一個RDD。2.1、Java
package sparkcore.java;import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.api.java.function.Function;import org.apache.spark.api.java.function.Function2;/** * 使用本地檔案建立RDD 案例:統計文字檔案字數 */publicclass LocalFile { public static void main(String[] args) { // 建立SparkConf SparkConf conf = new SparkConf().setAppName("LocalFile").setMaster("local"); // 建立JavaSparkContext JavaSparkContext sc = new JavaSparkContext(conf); // 使用SparkContext以及其子類的textFile()方法,針對本地檔案建立RDD JavaRDD<String> lines = sc.textFile("D:/eclipse-jee-neon-3/workspace/sparkcore-java/test.txt"); // 統計文字檔案內的字數 JavaRDD<Integer> lineLength = lines.map(new Function<String, Integer>() { private static final long serialVersionUID = 1L; public Integer call(String v1) throws Exception { return v1.length(); } }); int count = lineLength.reduce(new Function2<Integer, Integer, Integer>() { private static final long serialVersionUID = 1L; public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }); System.out.println("檔案總字數是:" + count); // 關閉JavaSparkContext sc.close(); }}package sparkcore.java;import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.api.java.function.Function;import org.apache.spark.api.java.function.Function2;/** * 使用HDFS檔案建立RDD * 案例:統計文字檔案字數 */publicclass HDFSFile { public static void main(String[] args) { // 建立SparkConf // 修改:去除setMaster()設定,修改setAppName() SparkConf conf = new SparkConf() .setAppName("HDFSFile"); // 建立JavaSparkContext JavaSparkContext sc = new JavaSparkContext(conf); // 使用SparkContext以及其子類的textFile()方法,針對HDFS檔案建立RDD // 只要把textFile()內的路徑修改為hdfs檔案路徑即可 JavaRDD<String> lines = sc.textFile("hdfs://node1:8020/test.txt"); // 統計文字檔案內的字數 JavaRDD<Integer> lineLength = lines.map(new Function<String, Integer>() { private static final long serialVersionUID = 1L; @Override public Integer call(String v1) throws Exception { return v1.length(); } }); int count = lineLength.reduce(new Function2<Integer, Integer, Integer>() { private static final long serialVersionUID = 1L; @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }); System.out.println("檔案總字數是:" + count); // 關閉JavaSparkContext sc.close(); }} 2.2、Scalapackage sparkcore.scalaimport org.apache.spark.SparkConfimport org.apache.spark.SparkContextobject LocalFile { def main(args: Array[String]) { val conf = new SparkConf() .setAppName("LocalFile").setMaster("local"); val sc = new SparkContext(conf) val lines = sc.textFile("D:/eclipse-jee-neon-3/workspace/sparkcore-scala/test.txt", 1); val count = lines.map { line => line.length() }.reduce(_ + _) println("file's count is " + count) }}package sparkcore.scalaimport org.apache.spark.SparkConfimport org.apache.spark.SparkContextobject HDFSFile { def main(args: Array[String]) { val conf = new SparkConf() .setAppName("HDFSFile").setMaster("local") val sc = new SparkContext(conf) val lines = sc.textFile("hdfs://node1:8020/test.txt", 1); val count = lines.map { _.length() }.reduce(_ + _) println("file's count is " + count) }}