1. 程式人生 > >Spark學習筆記之WordCount

Spark學習筆記之WordCount

1.pom.xml

 <dependencies>
        <!-- 匯入scala的依賴 -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <!-- 匯入spark的依賴 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!--spark sql依賴-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- 指定hadoop-client API的版本 -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>6.0.6</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.8.2.1</version>
        </dependency>
        <!--spark streaming 和 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
        </dependency>
    </dependencies>

2.wordcount

package com.htkj.spark.wordcount;


import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;


public class WordCount{

    public static void main(String[] args) {
        // 編寫Spark應用程式
        // 本地執行,是可以執行在main方法中,執行的

        // 第一步:建立SparkConf物件,設定Spark應用的配置資訊
        // 使用setMaster()可以設定Spark應用程式要連線的Spark叢集的master節點的url
        // 如果設定為local則代表,在本地執行
        SparkConf conf = new SparkConf()
                .setAppName("WordCountLocal")
                .setMaster("local");

        // 第二步:建立JavaSparkContext物件
        // 在Spark中,SparkContext是Spark所有功能的一個入口,你無論是用java、scala,python編寫
        // 都必須要有一個SparkContext,它的主要作用,包括初始化Spark應用程式所需的一些核心元件,包括
        // 排程器(DAGSchedule、TaskScheduler),還會去到Spark Master節點上進行註冊,等等
        // SparkContext,是Spark應用中,可以說是最最重要的一個物件
        // 但是呢,在Spark中,編寫不同型別的Spark應用程式,使用的SparkContext是不同的,如果使用scala,
        // 使用的就是原生的SparkContext物件
        // 但是如果使用Java,那麼就是JavaSparkContext物件
        // 如果是開發Spark SQL程式,那麼就是SQLContext、HiveContext
        // 如果是開發Spark Streaming程式,那麼就是它獨有的SparkContext
        // 以此類推
        JavaSparkContext sc = new JavaSparkContext(conf);

        // 第三步:要針對輸入源(hdfs檔案、本地檔案,等等),建立一個初始的RDD
        // 輸入源中的資料會打散,分配到RDD的每個partition中,從而形成一個初始的分散式的資料集
        // 因為是本地測試,針對本地檔案
        // SparkContext中,用於根據檔案型別的輸入源建立RDD的方法,叫做textFile()方法
        // 在Java中,建立的普通RDD,都叫做JavaRDD
        // 在這裡呢,RDD中,有元素這種概念,如果是hdfs或者本地檔案呢,建立的RDD,每一個元素就相當於
        // 是檔案裡的一行
        JavaRDD<String> lines = sc.textFile("C://Users//Administrator//Desktop//spark.txt");

        // 第四步:對初始RDD進行transformation操作,也就是一些計算操作
        // 通常操作會通過建立function,並配合RDD的map、flatMap等運算元來執行
        // function,通常,如果比較簡單,則建立指定Function的匿名內部類
        // 但是如果function比較複雜,則會單獨建立一個類,作為實現這個function介面的類

        // 先將每一行拆分成單個的單詞
        // FlatMapFunction,有兩個泛型引數,分別代表了輸入和輸出型別
        // 我們這裡呢,輸入肯定是String,因為是一行一行的文字,輸出,其實也是String,因為是每一行的文字
        // 這裡先簡要介紹flatMap運算元的作用,其實就是,將RDD的一個元素,給拆分成一個或多個元素
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {

            private static final long serialVersionUID = 1L	;

            @Override
            public Iterator<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" ")).iterator();
            }

        });

        // 接著,需要將每一個單詞,對映為(單詞, 1)的這種格式
        // 因為只有這樣,後面才能根據單詞作為key,來進行每個單詞的出現次數的累加
        // mapToPair,其實就是將每個元素,對映為一個(v1,v2)這樣的Tuple2型別的元素
        // 這裡的tuple2是scala型別,包含了兩個值
        // mapToPair這個運算元,要求的是與PairFunction配合使用,第一個泛型引數代表了輸入型別
        // 第二個和第三個泛型引數,代表的輸出的Tuple2的第一個值和第二個值的型別
        // JavaPairRDD的兩個泛型引數,分別代表了tuple元素的第一個值和第二個值的型別
        JavaPairRDD<String, Integer> pairs = words.mapToPair(

                new PairFunction<String, String, Integer>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, Integer> call(String word) throws Exception {
                        return new Tuple2<String, Integer>(word, 1);
                    }

                });

        // 接著,需要以單詞作為key,統計每個單詞出現的次數
        // 這裡要使用reduceByKey這個運算元,對每個key對應的value,都進行reduce操作
        // 比如JavaPairRDD中有幾個元素,分別為(hello, 1) (hello, 1) (hello, 1) (world, 1)
        // reduce操作,相當於是把第一個值和第二個值進行計算,然後再將結果與第三個值進行計算
        // 比如這裡的hello,那麼就相當於是,首先是1 + 1 = 2,然後再將2 + 1 = 3
        // 最後返回的JavaPairRDD中的元素,也是tuple,但是第一個值就是每個key,第二個值就是key的value
        // reduce之後的結果,相當於就是每個單詞出現的次數
        JavaPairRDD<String, Integer> wordCounts = pairs.reduceByKey(

                new Function2<Integer, Integer, Integer>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Integer call(Integer v1, Integer v2) throws Exception {
                        return v1 + v2;
                    }

                });

        // 到這裡為止,通過幾個Spark運算元操作,已經統計出了單詞的次數
        // 但是,之前使用的flatMap、mapToPair、reduceByKey這種操作,都叫做transformation操作
        // 一個Spark應用中,光是有transformation操作,是不行的,是不會執行的,必須要有一種叫做action
        // 最後,可以使用一種叫做action操作的,比如說,foreach,來觸發程式的執行
        wordCounts.foreach(new VoidFunction<Tuple2<String,Integer>>() {

            private static final long serialVersionUID = 1L;

            @Override
            public void call(Tuple2<String, Integer> wordCount) throws Exception {
                System.out.println(wordCount._1 + " appeared " + wordCount._2 + " times.");
            }

        });

        sc.close();
    }

}