Developing your first Spark program in IDEA --- counting words in a text file
阿新 • Published: 2018-12-09
Before creating the project, make sure Scala and Java are installed locally, because Spark itself is written in Scala, and Scala, like Java, is compiled to bytecode that runs on the JVM. My local Scala version is 2.11.0 and my Hadoop version is 2.7.6.
Step 1: Open IDEA and create a Maven project.
Add the following properties, dependencies, and build plugins to the pom:
<properties>
    <spark.version>2.3.1</spark.version>
    <scala.version>2.11</scala.version>
    <geotools.version>20-SNAPSHOT</geotools.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-mllib_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.7.6</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka_${scala.version}</artifactId>
        <version>1.6.3</version>
    </dependency>
    <dependency>
        <groupId>com.esri.geometry</groupId>
        <artifactId>esri-geometry-api</artifactId>
        <version>1.2.1</version>
    </dependency>
    <dependency>
        <groupId>com.googlecode.json-simple</groupId>
        <artifactId>json-simple</artifactId>
        <version>1.1.1</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.scala-tools</groupId>
            <artifactId>maven-scala-plugin</artifactId>
            <version>2.15.2</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.6.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-surefire-plugin</artifactId>
            <version>2.19</version>
            <configuration>
                <skip>true</skip>
            </configuration>
        </plugin>
    </plugins>
</build>
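Note that the snippet above is only the body of the pom; it has to sit inside a standard Maven project element. A minimal enclosing skeleton might look like the following (the groupId, artifactId, and version here are placeholders, replace them with your own):

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <!-- placeholder coordinates, use your own -->
    <groupId>com.example</groupId>
    <artifactId>spark-wordcount</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!-- paste the <properties>, <dependencies>, and <build> sections shown above here -->
</project>

Also note that scala.version (2.11) is the Scala binary version used as the artifact suffix (for example spark-core_2.11), so it should match the major.minor of the locally installed Scala 2.11.0.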
Step 2: In the IDE, go to File --> Project Structure, select Scala SDK, browse to your local Scala installation directory, and import the Scala library.
Step 3: Write the Java version of the class:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

/**
 * Created by zhanglu on 2018/9/7.
 */
public class WordCount {

    public static void main(String[] args) {
        // Step 1: create a SparkConf object and set the Spark configuration.
        SparkConf sparkConf = new SparkConf();
        sparkConf.setAppName("WordCount");
        sparkConf.setMaster("local");

        // Step 2: create a JavaSparkContext, which initializes Spark's components.
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);

        // Step 3: create an RDD from the input source (HDFS or a local file). The input data is
        // spread across the RDD's partitions, forming the initial distributed dataset.
        // textFile() builds an RDD in which every line of the file becomes one element.
        JavaRDD<String> javaRDD = javaSparkContext.textFile("E://個人/word_count.txt");

        // Step 4: apply transformations to the initial RDD.
        // Split each line into individual words by passing a function to flatMap.
        JavaRDD<String> words = javaRDD.flatMap(new FlatMapFunction<String, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Iterator<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" ")).iterator();
            }
        });

        // Map each word to a (word, 1) tuple.
        JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        // Reduce the tuples by key, summing the counts for each word.
        JavaPairRDD<String, Integer> wordCounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        });

        // flatMap, mapToPair, and reduceByKey above are all transformations; the program still
        // needs an action before the job actually executes.
        wordCounts.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                System.out.println(stringIntegerTuple2._1 + " appeared " + stringIntegerTuple2._2 + " times ");
            }
        });

        javaSparkContext.close();
    }
}
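Since the pom targets source/target 1.8, the anonymous inner classes above can also be written as Java 8 lambdas. Below is a minimal sketch of the same job in lambda form (assuming the same local input path as above); the behaviour is identical:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

public class WordCountLambda {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("WordCountLambda").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Same input file as in the anonymous-class version above.
        JavaRDD<String> lines = sc.textFile("E://個人/word_count.txt");

        JavaPairRDD<String, Integer> wordCounts = lines
                // split each line into words
                .flatMap(line -> Arrays.asList(line.split(" ")).iterator())
                // map each word to a (word, 1) pair
                .mapToPair(word -> new Tuple2<>(word, 1))
                // sum the counts per word
                .reduceByKey((a, b) -> a + b);

        // foreach is the action that actually triggers the job.
        wordCounts.foreach(t -> System.out.println(t._1 + " appeared " + t._2 + " times"));

        sc.close();
    }
}

With an input file containing, for example, the single line "hello spark hello world", either version prints "hello appeared 2 times", "spark appeared 1 times", and "world appeared 1 times" (the output order is not guaranteed, since foreach runs per partition).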