使用java和scala編寫spark-WordCount示例
前言:
最近博主在學習spark相關知識,感覺是個挺不錯的框架,它的分散式處理大資料集的思想還是值得我們好好學習的。
個人感覺以後java開發肯定不僅僅是SSM這一套東西了,當資料量越來越大時,我們需要學習使用這些大資料工具。
本次部落格學習使用java和scala兩種方式來開發spark的wordCount示例
由於採用spark的local模式,所以我們可以完全不用啟動spark,使用eclipse,新增spark相關jar包在本地跑就可以了
準備工作:
1.準備資料
在本地建立spark.txt檔案,並新增一些語句
2.eclipse工具,用於java開發
3.scala ide for eclipse用於scala開發
下載介面:http://scala-ide.org/download/sdk.html
4.本地安裝JDK8(由於筆者使用的spark版本為2.2.0,故需要jdk8及以上版本)
5.本地安裝scala(一定要注意,scala的版本需要與spark的版本匹配,當spark版本為2.2.0時,scala版本為2.11,不能太高也不能低,一定要注意,否則建立scala project會報錯)
1.java開發wordCount程式(使用工具eclipse)
1)建立maven專案 spark
2)新增Maven依賴
<properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <scala.version>2.11</scala.version> <spark.version>2.2.0</spark.version> </properties> <dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_${scala.version}</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_${scala.version}</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_${scala.version}</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive_${scala.version}</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-mllib_${scala.version}</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> </dependencies>
3)在spark專案中新增WordCountJava類,程式碼如下
import java.util.Arrays;
import java.util.Iterator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
public class WordCountJava {
public static void main(String[] args) {
// 1.建立SparkConf
SparkConf sparkConf = new SparkConf()
.setAppName("wordCountLocal")
.setMaster("local");
// 2.建立JavaSparkContext
// SparkContext代表著程式入口
JavaSparkContext sc = new JavaSparkContext(sparkConf);
// 3.讀取本地檔案
JavaRDD<String> lines = sc.textFile("C:/Users/lucky/Desktop/spark.txt");
// 4.每行以空格切割
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
public Iterator<String> call(String t) throws Exception {
return Arrays.asList(t.split(" ")).iterator();
}
});
// 5.轉換為 <word,1>格式
JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
public Tuple2<String, Integer> call(String t) throws Exception {
return new Tuple2<String, Integer>(t, 1);
}
});
// 6.統計相同Word的出現頻率
JavaPairRDD<String, Integer> wordCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
// 7.執行action,將結果打印出來
wordCount.foreach(new VoidFunction<Tuple2<String,Integer>>() {
public void call(Tuple2<String, Integer> t) throws Exception {
System.out.println(t._1()+" "+t._2());
}
});
// 8.主動關閉SparkContext
sc.stop();
}
}
4)執行(右鍵 run as java application即可)
2.scala開發wordCount程式(使用工具scala ide for eclipse)
1)建立scala project ,命名為spark-study
2)轉變為maven專案
右鍵點選spark-study,選中configure,選擇其中的convert to maven project選項,等待成功即可
3)新增maven依賴(同上)
注意:筆者使用到spark版本為2.2.0,由於spark版本與scala版本不匹配,導致報以下錯誤
breeze_2.11-0.13.1.jar of spark-study build path is cross-compiled with an incompatible version of Scala (2.11.0). In case this report is mistaken, this check can be disabled in the compiler preference page.
如果讀者看到這個報錯,就先檢查一下自己的scala版本與spark版本是否匹配,如何確定呢,可以在https://mvnrepository.com/artifact/org.apache.spark/spark-core maven倉庫中確定,
倉庫中有spark對應的scala版本,嚴格按照這個來安裝本地的scala即可,否則報錯
4)在spark-study專案中建立scala object,命名為WordCountScala,程式碼如下
object WordCountScala {
def main(args: Array[String]): Unit = {
// 注意選擇local模式
val sparkConf = new SparkConf().setMaster("local").setAppName("wordCount");
val sc = new SparkContext(sparkConf)
// 讀取本地檔案
val lines = sc.textFile("C:/Users/lucky/Desktop/spark.txt");
val words = lines.flatMap(line => line.split(" "))
val pairs = words.map(word => (word,1))
val wordCounts = pairs.reduceByKey((a,b) => (a+b))
// 最後執行action 操作
wordCounts.foreach(wordcount => println(wordcount._1 +" " + wordcount._2))
}
}
以上這種方式,我們同樣可以簡寫為:
val sparkConf = new SparkConf().setMaster("local").setAppName("wordCount");
val sc = new SparkContext(sparkConf)
sc.textFile("C:/Users/lucky/Desktop/spark.txt")
.flatMap(line => line.split(" "))
.map(word => (word,1))
.reduceByKey((a,b) => (a+b))
.foreach(wordcount => println(wordcount._1 +" " + wordcount._2))
5)執行wordCount(右鍵run as Scala Application即可)
可以看到在本地控制檯列印了結果