1. 程式人生 > >Spark+IDEA+WordCount

Spark+IDEA+WordCount

文章目錄

IDEA+Maven[pom.xml]

適合借鑑,各個版本不同,可做修改,以免出現錯誤!

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.awin</groupId>
    <artifactId>spark</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <spark.version>2.1.0</spark.version>
        <scala.version>2.11</scala.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>

    </dependencies>



</project>

使用Java開發WordCount

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

/**
 * 使用java開發本地測試的wordcount程式
 * @author Administrator
 *
 */
public class WordCount {
	
	public static void main(String[] args) {
		// 編寫Spark應用程式
		// 本地執行,是可以執行在eclipse中的main方法中,執行的
		
		// 第一步:建立SparkConf物件,設定Spark應用的配置資訊
		// 使用setMaster()可以設定Spark應用程式要連線的Spark叢集的master節點的url
		// 但是如果設定為local則代表,在本地執行
		SparkConf conf = new SparkConf()
				.setAppName("WordCountLocal")
				.setMaster("local");  
		
		// 第二步:建立JavaSparkContext物件
		// 在Spark中,SparkContext是Spark所有功能的一個入口,你無論是用java、scala,甚至是python編寫
			// 都必須要有一個SparkContext,它的主要作用,包括初始化Spark應用程式所需的一些核心元件,包括
			// 排程器(DAGSchedule、TaskScheduler),還會去到Spark Master節點上進行註冊,等等
		// 一句話,SparkContext,是Spark應用中,可以說是最最重要的一個物件
		// 但是呢,在Spark中,編寫不同型別的Spark應用程式,使用的SparkContext是不同的,如果使用scala,
			// 使用的就是原生的SparkContext物件
			// 但是如果使用Java,那麼就是JavaSparkContext物件
			// 如果是開發Spark SQL程式,那麼就是SQLContext、HiveContext
			// 如果是開發Spark Streaming程式,那麼就是它獨有的SparkContext
			// 以此類推
			
		JavaSparkContext sc = new JavaSparkContext(conf);
	
		// 第三步:要針對輸入源(hdfs檔案、本地檔案,等等),建立一個初始的RDD
		// 輸入源中的資料會打散,分配到RDD的每個partition中,從而形成一個初始的分散式的資料集
		// 我們這裡呢,因為是本地測試,所以呢,就是針對本地檔案
		// SparkContext中,用於根據檔案型別的輸入源建立RDD的方法,叫做textFile()方法
		// 在Java中,建立的普通RDD,都叫做JavaRDD
		// 在這裡呢,RDD中,有元素這種概念,如果是hdfs或者本地檔案呢,建立的RDD,每一個元素就相當於
		// 是檔案裡的一行
		
		JavaRDD<String> lines = sc.textFile("C://Users//Administrator//Desktop//spark.txt");
	
		// 第四步:對初始RDD進行transformation操作,也就是一些計算操作
		// 通常操作會通過建立function,並配合RDD的map、flatMap等運算元來執行
		// function,通常,如果比較簡單,則建立指定Function的匿名內部類
		// 但是如果function比較複雜,則會單獨建立一個類,作為實現這個function介面的類
		
		// 先將每一行拆分成單個的單詞
		// FlatMapFunction,有兩個泛型引數,分別代表了輸入和輸出型別
		// 我們這裡呢,輸入肯定是String,因為是一行一行的文字,輸出,其實也是String,因為是每一行的文字
		// 這裡先簡要介紹flatMap運算元的作用,其實就是,將RDD的一個元素,給拆分成一個或多個元素
		
		JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
			
			private static final long serialVersionUID = 1L;
			
			@Override
			public Iterator<String> call(String line) throws Exception {
				return Arrays.asList(line.split(" ")).iterator();  
			}
			
		});
		
		
		// 接著,需要將每一個單詞,對映為(單詞, 1)的這種格式
			// 因為只有這樣,後面才能根據單詞作為key,來進行每個單詞的出現次數的累加
		// mapToPair,其實就是將每個元素,對映為一個(v1,v2)這樣的Tuple2型別的元素
			// 如果大家還記得scala裡面講的tuple,那麼沒錯,這裡的tuple2就是scala型別,包含了兩個值
		// mapToPair這個運算元,要求的是與PairFunction配合使用,第一個泛型引數代表了輸入型別
			// 第二個和第三個泛型引數,代表的輸出的Tuple2的第一個值和第二個值的型別
		// JavaPairRDD的兩個泛型引數,分別代表了tuple元素的第一個值和第二個值的型別
		JavaPairRDD<String, Integer> pairs = words.mapToPair(
				
				new PairFunction<String, String, Integer>() {

					private static final long serialVersionUID = 1L;
		
					@Override
					public Tuple2<String, Integer> call(String word) throws Exception {
						return new Tuple2<String, Integer>(word, 1);
					}
					
				});
		
		// 接著,需要以單詞作為key,統計每個單詞出現的次數
		// 這裡要使用reduceByKey這個運算元,對每個key對應的value,都進行reduce操作
		// 比如JavaPairRDD中有幾個元素,分別為(hello, 1) (hello, 1) (hello, 1) (world, 1)
		// reduce操作,相當於是把第一個值和第二個值進行計算,然後再將結果與第三個值進行計算
		// 比如這裡的hello,那麼就相當於是,首先是1 + 1 = 2,然後再將2 + 1 = 3
		// 最後返回的JavaPairRDD中的元素,也是tuple,但是第一個值就是每個key,第二個值就是key的value
		// reduce之後的結果,相當於就是每個單詞出現的次數
		JavaPairRDD<String, Integer> wordCounts = pairs.reduceByKey(
				
				new Function2<Integer, Integer, Integer>() {
					
					private static final long serialVersionUID = 1L;
		
					@Override
					public Integer call(Integer v1, Integer v2) throws Exception {
						return v1 + v2;
					}
					
				});
		
		// 到這裡為止,我們通過幾個Spark運算元操作,已經統計出了單詞的次數
		// 但是,之前我們使用的flatMap、mapToPair、reduceByKey這種操作,都叫做transformation操作
		// 一個Spark應用中,光是有transformation操作,是不行的,是不會執行的,必須要有一種叫做action
		// 接著,最後,可以使用一種叫做action操作的,比如說,foreach,來觸發程式的執行
		wordCounts.foreach(new VoidFunction<Tuple2<String,Integer>>() {
			
			private static final long serialVersionUID = 1L;
			
			@Override
			public void call(Tuple2<String, Integer> wordCount) throws Exception {
				System.out.println(wordCount._1 + " appeared " + wordCount._2 + " times.");    
			}
			
		});
		
		sc.close();
	}
	
}

Scala版WordCount

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

/**
 * @author zhangyang
 */
object WordCount {
  
  def main(args: Array[String]) {
    val conf = new SparkConf()
        .setAppName("WordCount");
    val sc = new SparkContext(conf)
  
    val lines = sc.textFile("hdfs://spark1:9000/spark.txt", 1); //分割槽
    val words = lines.flatMap { line => line.split(" ") }   
    val pairs = words.map { word => (word, 1) }   
    val wordCounts = pairs.reduceByKey { _ + _ }
    
    wordCounts.foreach(wordCount => println(wordCount._1 + " appeared " + wordCount._2 + " times."))  
  }
  
}