Spark學習筆記之WordCount
阿新 • • 發佈:2019-09-11
1.pom.xml
<dependencies> <!-- 匯入scala的依賴 --> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>${scala.version}</version> </dependency> <!-- 匯入spark的依賴 --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>${spark.version}</version> </dependency> <!--spark sql依賴--> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.11</artifactId> <version>${spark.version}</version> </dependency> <!-- 指定hadoop-client API的版本 --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> </dependency> <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>6.0.6</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>0.8.2.1</version> </dependency> <!--spark streaming 和 --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-8_2.11</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>2.9.0</version> </dependency> </dependencies>
2.wordcount
package com.htkj.spark.wordcount;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
public class WordCount{
public static void main(String[] args) {
// 編寫Spark應用程式
// 本地執行,是可以執行在main方法中,執行的
// 第一步:建立SparkConf物件,設定Spark應用的配置資訊
// 使用setMaster()可以設定Spark應用程式要連線的Spark叢集的master節點的url
// 如果設定為local則代表,在本地執行
SparkConf conf = new SparkConf()
.setAppName("WordCountLocal")
.setMaster("local");
// 第二步:建立JavaSparkContext物件
// 在Spark中,SparkContext是Spark所有功能的一個入口,你無論是用java、scala,python編寫
// 都必須要有一個SparkContext,它的主要作用,包括初始化Spark應用程式所需的一些核心元件,包括
// 排程器(DAGSchedule、TaskScheduler),還會去到Spark Master節點上進行註冊,等等
// SparkContext,是Spark應用中,可以說是最最重要的一個物件
// 但是呢,在Spark中,編寫不同型別的Spark應用程式,使用的SparkContext是不同的,如果使用scala,
// 使用的就是原生的SparkContext物件
// 但是如果使用Java,那麼就是JavaSparkContext物件
// 如果是開發Spark SQL程式,那麼就是SQLContext、HiveContext
// 如果是開發Spark Streaming程式,那麼就是它獨有的SparkContext
// 以此類推
JavaSparkContext sc = new JavaSparkContext(conf);
// 第三步:要針對輸入源(hdfs檔案、本地檔案,等等),建立一個初始的RDD
// 輸入源中的資料會打散,分配到RDD的每個partition中,從而形成一個初始的分散式的資料集
// 因為是本地測試,針對本地檔案
// SparkContext中,用於根據檔案型別的輸入源建立RDD的方法,叫做textFile()方法
// 在Java中,建立的普通RDD,都叫做JavaRDD
// 在這裡呢,RDD中,有元素這種概念,如果是hdfs或者本地檔案呢,建立的RDD,每一個元素就相當於
// 是檔案裡的一行
JavaRDD<String> lines = sc.textFile("C://Users//Administrator//Desktop//spark.txt");
// 第四步:對初始RDD進行transformation操作,也就是一些計算操作
// 通常操作會通過建立function,並配合RDD的map、flatMap等運算元來執行
// function,通常,如果比較簡單,則建立指定Function的匿名內部類
// 但是如果function比較複雜,則會單獨建立一個類,作為實現這個function介面的類
// 先將每一行拆分成單個的單詞
// FlatMapFunction,有兩個泛型引數,分別代表了輸入和輸出型別
// 我們這裡呢,輸入肯定是String,因為是一行一行的文字,輸出,其實也是String,因為是每一行的文字
// 這裡先簡要介紹flatMap運算元的作用,其實就是,將RDD的一個元素,給拆分成一個或多個元素
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
private static final long serialVersionUID = 1L ;
@Override
public Iterator<String> call(String line) throws Exception {
return Arrays.asList(line.split(" ")).iterator();
}
});
// 接著,需要將每一個單詞,對映為(單詞, 1)的這種格式
// 因為只有這樣,後面才能根據單詞作為key,來進行每個單詞的出現次數的累加
// mapToPair,其實就是將每個元素,對映為一個(v1,v2)這樣的Tuple2型別的元素
// 這裡的tuple2是scala型別,包含了兩個值
// mapToPair這個運算元,要求的是與PairFunction配合使用,第一個泛型引數代表了輸入型別
// 第二個和第三個泛型引數,代表的輸出的Tuple2的第一個值和第二個值的型別
// JavaPairRDD的兩個泛型引數,分別代表了tuple元素的第一個值和第二個值的型別
JavaPairRDD<String, Integer> pairs = words.mapToPair(
new PairFunction<String, String, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, Integer> call(String word) throws Exception {
return new Tuple2<String, Integer>(word, 1);
}
});
// 接著,需要以單詞作為key,統計每個單詞出現的次數
// 這裡要使用reduceByKey這個運算元,對每個key對應的value,都進行reduce操作
// 比如JavaPairRDD中有幾個元素,分別為(hello, 1) (hello, 1) (hello, 1) (world, 1)
// reduce操作,相當於是把第一個值和第二個值進行計算,然後再將結果與第三個值進行計算
// 比如這裡的hello,那麼就相當於是,首先是1 + 1 = 2,然後再將2 + 1 = 3
// 最後返回的JavaPairRDD中的元素,也是tuple,但是第一個值就是每個key,第二個值就是key的value
// reduce之後的結果,相當於就是每個單詞出現的次數
JavaPairRDD<String, Integer> wordCounts = pairs.reduceByKey(
new Function2<Integer, Integer, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
// 到這裡為止,通過幾個Spark運算元操作,已經統計出了單詞的次數
// 但是,之前使用的flatMap、mapToPair、reduceByKey這種操作,都叫做transformation操作
// 一個Spark應用中,光是有transformation操作,是不行的,是不會執行的,必須要有一種叫做action
// 最後,可以使用一種叫做action操作的,比如說,foreach,來觸發程式的執行
wordCounts.foreach(new VoidFunction<Tuple2<String,Integer>>() {
private static final long serialVersionUID = 1L;
@Override
public void call(Tuple2<String, Integer> wordCount) throws Exception {
System.out.println(wordCount._1 + " appeared " + wordCount._2 + " times.");
}
});
sc.close();
}
}