Spark+IDEA+WordCount
阿新 • • 發佈:2018-12-16
文章目錄
IDEA+Maven[pom.xml]
適合借鑑,各個版本不同,可做修改,以免出現錯誤!
pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.awin</groupId> <artifactId>spark</artifactId> <version>1.0-SNAPSHOT</version> <properties> <spark.version>2.1.0</spark.version> <scala.version>2.11</scala.version> </properties> <dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_${scala.version}</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_${scala.version}</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_${scala.version}</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive_${scala.version}</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-mllib_${scala.version}</artifactId> <version>${spark.version}</version> </dependency> </dependencies> </project>
使用Java開發WordCount
import java.util.Arrays; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.VoidFunction; import scala.Tuple2; /** * 使用java開發本地測試的wordcount程式 * @author Administrator * */ public class WordCount { public static void main(String[] args) { // 編寫Spark應用程式 // 本地執行,是可以執行在eclipse中的main方法中,執行的 // 第一步:建立SparkConf物件,設定Spark應用的配置資訊 // 使用setMaster()可以設定Spark應用程式要連線的Spark叢集的master節點的url // 但是如果設定為local則代表,在本地執行 SparkConf conf = new SparkConf() .setAppName("WordCountLocal") .setMaster("local"); // 第二步:建立JavaSparkContext物件 // 在Spark中,SparkContext是Spark所有功能的一個入口,你無論是用java、scala,甚至是python編寫 // 都必須要有一個SparkContext,它的主要作用,包括初始化Spark應用程式所需的一些核心元件,包括 // 排程器(DAGSchedule、TaskScheduler),還會去到Spark Master節點上進行註冊,等等 // 一句話,SparkContext,是Spark應用中,可以說是最最重要的一個物件 // 但是呢,在Spark中,編寫不同型別的Spark應用程式,使用的SparkContext是不同的,如果使用scala, // 使用的就是原生的SparkContext物件 // 但是如果使用Java,那麼就是JavaSparkContext物件 // 如果是開發Spark SQL程式,那麼就是SQLContext、HiveContext // 如果是開發Spark Streaming程式,那麼就是它獨有的SparkContext // 以此類推 JavaSparkContext sc = new JavaSparkContext(conf); // 第三步:要針對輸入源(hdfs檔案、本地檔案,等等),建立一個初始的RDD // 輸入源中的資料會打散,分配到RDD的每個partition中,從而形成一個初始的分散式的資料集 // 我們這裡呢,因為是本地測試,所以呢,就是針對本地檔案 // SparkContext中,用於根據檔案型別的輸入源建立RDD的方法,叫做textFile()方法 // 在Java中,建立的普通RDD,都叫做JavaRDD // 在這裡呢,RDD中,有元素這種概念,如果是hdfs或者本地檔案呢,建立的RDD,每一個元素就相當於 // 是檔案裡的一行 JavaRDD<String> lines = sc.textFile("C://Users//Administrator//Desktop//spark.txt"); // 第四步:對初始RDD進行transformation操作,也就是一些計算操作 // 通常操作會通過建立function,並配合RDD的map、flatMap等運算元來執行 // function,通常,如果比較簡單,則建立指定Function的匿名內部類 // 但是如果function比較複雜,則會單獨建立一個類,作為實現這個function介面的類 // 先將每一行拆分成單個的單詞 // FlatMapFunction,有兩個泛型引數,分別代表了輸入和輸出型別 // 我們這裡呢,輸入肯定是String,因為是一行一行的文字,輸出,其實也是String,因為是每一行的文字 // 這裡先簡要介紹flatMap運算元的作用,其實就是,將RDD的一個元素,給拆分成一個或多個元素 JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() { private static final long serialVersionUID = 1L; @Override public Iterator<String> call(String line) throws Exception { return Arrays.asList(line.split(" ")).iterator(); } }); // 接著,需要將每一個單詞,對映為(單詞, 1)的這種格式 // 因為只有這樣,後面才能根據單詞作為key,來進行每個單詞的出現次數的累加 // mapToPair,其實就是將每個元素,對映為一個(v1,v2)這樣的Tuple2型別的元素 // 如果大家還記得scala裡面講的tuple,那麼沒錯,這裡的tuple2就是scala型別,包含了兩個值 // mapToPair這個運算元,要求的是與PairFunction配合使用,第一個泛型引數代表了輸入型別 // 第二個和第三個泛型引數,代表的輸出的Tuple2的第一個值和第二個值的型別 // JavaPairRDD的兩個泛型引數,分別代表了tuple元素的第一個值和第二個值的型別 JavaPairRDD<String, Integer> pairs = words.mapToPair( new PairFunction<String, String, Integer>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, Integer> call(String word) throws Exception { return new Tuple2<String, Integer>(word, 1); } }); // 接著,需要以單詞作為key,統計每個單詞出現的次數 // 這裡要使用reduceByKey這個運算元,對每個key對應的value,都進行reduce操作 // 比如JavaPairRDD中有幾個元素,分別為(hello, 1) (hello, 1) (hello, 1) (world, 1) // reduce操作,相當於是把第一個值和第二個值進行計算,然後再將結果與第三個值進行計算 // 比如這裡的hello,那麼就相當於是,首先是1 + 1 = 2,然後再將2 + 1 = 3 // 最後返回的JavaPairRDD中的元素,也是tuple,但是第一個值就是每個key,第二個值就是key的value // reduce之後的結果,相當於就是每個單詞出現的次數 JavaPairRDD<String, Integer> wordCounts = pairs.reduceByKey( new Function2<Integer, Integer, Integer>() { private static final long serialVersionUID = 1L; @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }); // 到這裡為止,我們通過幾個Spark運算元操作,已經統計出了單詞的次數 // 但是,之前我們使用的flatMap、mapToPair、reduceByKey這種操作,都叫做transformation操作 // 一個Spark應用中,光是有transformation操作,是不行的,是不會執行的,必須要有一種叫做action // 接著,最後,可以使用一種叫做action操作的,比如說,foreach,來觸發程式的執行 wordCounts.foreach(new VoidFunction<Tuple2<String,Integer>>() { private static final long serialVersionUID = 1L; @Override public void call(Tuple2<String, Integer> wordCount) throws Exception { System.out.println(wordCount._1 + " appeared " + wordCount._2 + " times."); } }); sc.close(); } }
Scala版WordCount
import org.apache.spark.SparkConf import org.apache.spark.SparkContext /** * @author zhangyang */ object WordCount { def main(args: Array[String]) { val conf = new SparkConf() .setAppName("WordCount"); val sc = new SparkContext(conf) val lines = sc.textFile("hdfs://spark1:9000/spark.txt", 1); //分割槽 val words = lines.flatMap { line => line.split(" ") } val pairs = words.map { word => (word, 1) } val wordCounts = pairs.reduceByKey { _ + _ } wordCounts.foreach(wordCount => println(wordCount._1 + " appeared " + wordCount._2 + " times.")) } }