1. 程式人生 > >使用java和scala編寫spark-WordCount示例

使用java和scala編寫spark-WordCount示例

前言:

    最近博主在學習spark相關知識,感覺是個挺不錯的框架,它的分散式處理大資料集的思想還是值得我們好好學習的。

    個人感覺以後java開發肯定不僅僅是SSM這一套東西了,當資料量越來越大時,我們需要學習使用這些大資料工具。

    本次部落格學習使用java和scala兩種方式來開發spark的wordCount示例

    由於採用spark的local模式,所以我們可以完全不用啟動spark,使用eclipse,新增spark相關jar包在本地跑就可以了

 

準備工作:

    1.準備資料

        在本地建立spark.txt檔案,並新增一些語句

    2.eclipse工具,用於java開發

    3.scala ide for eclipse用於scala開發

        下載介面:http://scala-ide.org/download/sdk.html  

    4.本地安裝JDK8(由於筆者使用的spark版本為2.2.0,故需要jdk8及以上版本)

    5.本地安裝scala(一定要注意,scala的版本需要與spark的版本匹配,當spark版本為2.2.0時,scala版本為2.11,不能太高也不能低,一定要注意,否則建立scala project會報錯

 

 

1.java開發wordCount程式(使用工具eclipse)

    1)建立maven專案 spark

    2)新增Maven依賴

<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<scala.version>2.11</scala.version>
		<spark.version>2.2.0</spark.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-core_${scala.version}</artifactId>
			<version>${spark.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-streaming_${scala.version}</artifactId>
			<version>${spark.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-sql_${scala.version}</artifactId>
			<version>${spark.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-hive_${scala.version}</artifactId>
			<version>${spark.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-mllib_${scala.version}</artifactId>
			<version>${spark.version}</version>
		</dependency>

		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>3.8.1</version>
			<scope>test</scope>
		</dependency>
	</dependencies>

    3)在spark專案中新增WordCountJava類,程式碼如下

import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

public class WordCountJava {

	public static void main(String[] args) {
		// 1.建立SparkConf
		SparkConf sparkConf = new SparkConf()
				.setAppName("wordCountLocal")
				.setMaster("local");
		
		// 2.建立JavaSparkContext
		// SparkContext代表著程式入口
		JavaSparkContext sc = new JavaSparkContext(sparkConf);
		
		// 3.讀取本地檔案
		JavaRDD<String> lines = sc.textFile("C:/Users/lucky/Desktop/spark.txt");
		
		// 4.每行以空格切割
		JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
			public Iterator<String> call(String t) throws Exception {
				return Arrays.asList(t.split(" ")).iterator();
			}
		});
		
		// 5.轉換為 <word,1>格式
		JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
			public Tuple2<String, Integer> call(String t) throws Exception {
				return new Tuple2<String, Integer>(t, 1);
			}
		});
		
		// 6.統計相同Word的出現頻率
		JavaPairRDD<String, Integer> wordCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
			public Integer call(Integer v1, Integer v2) throws Exception {
				return v1 + v2;
			}
		});
		
		// 7.執行action,將結果打印出來
		wordCount.foreach(new VoidFunction<Tuple2<String,Integer>>() {
			public void call(Tuple2<String, Integer> t) throws Exception {
				System.out.println(t._1()+" "+t._2());
			}
		});
		
		// 8.主動關閉SparkContext 
		sc.stop();
	}
}

    4)執行(右鍵 run as java application即可)

 

 

2.scala開發wordCount程式(使用工具scala ide for eclipse)

    1)建立scala project ,命名為spark-study

    2)轉變為maven專案

        右鍵點選spark-study,選中configure,選擇其中的convert to maven project選項,等待成功即可

 

    3)新增maven依賴(同上)

    注意:筆者使用到spark版本為2.2.0,由於spark版本與scala版本不匹配,導致報以下錯誤

breeze_2.11-0.13.1.jar of spark-study build path is cross-compiled with an incompatible version of Scala (2.11.0). In case this report is mistaken, this check can be disabled in the compiler preference page.

    如果讀者看到這個報錯,就先檢查一下自己的scala版本與spark版本是否匹配,如何確定呢,可以在https://mvnrepository.com/artifact/org.apache.spark/spark-core maven倉庫中確定,

    倉庫中有spark對應的scala版本,嚴格按照這個來安裝本地的scala即可,否則報錯

 

    4)在spark-study專案中建立scala object,命名為WordCountScala,程式碼如下

object WordCountScala {
  
  def main(args: Array[String]): Unit = {
    
    // 注意選擇local模式
    val sparkConf = new SparkConf().setMaster("local").setAppName("wordCount");
    val sc = new SparkContext(sparkConf)
    
    // 讀取本地檔案
    val lines = sc.textFile("C:/Users/lucky/Desktop/spark.txt");
    val words = lines.flatMap(line => line.split(" "))
    val pairs = words.map(word => (word,1))
    val wordCounts = pairs.reduceByKey((a,b) => (a+b))
    
    // 最後執行action 操作
    wordCounts.foreach(wordcount => println(wordcount._1 +" " + wordcount._2))    
  }
}

    以上這種方式,我們同樣可以簡寫為:

val sparkConf = new SparkConf().setMaster("local").setAppName("wordCount");
val sc = new SparkContext(sparkConf)
    
sc.textFile("C:/Users/lucky/Desktop/spark.txt")
      .flatMap(line => line.split(" "))
      .map(word => (word,1))
      .reduceByKey((a,b) => (a+b))
      .foreach(wordcount => println(wordcount._1 +" " + wordcount._2))

    5)執行wordCount(右鍵run as Scala Application即可)

    可以看到在本地控制檯列印了結果