
Scala WordCount

1. Without Spark

package cn.scala_base.collection

import scala.io.Source
import scala.collection.mutable.Map

/**
 * WordCount implemented with plain Scala collections
 */
object WordCount {
  val wordMap = Map[String, Int]()

  // Count every word in an array of words
  def putMap(tuple: Tuple2[Array[String], Int]) {
    val arr = tuple._1
    for (x <- arr) {
      if (wordMap.contains(x)) {
        wordMap(x) = wordMap(x) + 1
      } else {
        wordMap += (x -> 1)
      }
    }
  }

  // Count a single word
  def putMap2(tuple: Tuple2[String, Int]) {
    val str = tuple._1
    if (wordMap.contains(str)) {
      wordMap(str) = wordMap(str) + 1
    } else {
      wordMap += (str -> 1)
    }
  }

  def main(args: Array[String]): Unit = {

    // Read the text files
    val text1 = Source.fromFile("D:/inputword/hello.txt", "gbk").getLines()
    val text2 = Source.fromFile("D:/inputword/one.txt", "gbk").getLines()
    val text3 = Source.fromFile("D:/inputword/two.txt", "gbk").getLines()

    /**
     * Expected result (35 words in total):
     * atguigu 12
     * hadoop  7
     * hello   5
     * spark   6
     * world   5
     */

    /* // Count the total number of words in each file
    val res1 = List(text1, text2, text3).map(_.flatMap(_.split(" ")).map((_, 1)).map(_._2).reduceLeft(_ + _))
    println(res1) // List(18, 10, 7) */

    // Count the total occurrences of each word across the three files.
    // Because the sources are iterators, the final step should be a foreach
    // that feeds each element into wordMap.
    List(text1, text2, text3).foreach(_.map(_.split(" ")).map((_, 1)).foreach(putMap(_)))

    // Alternatively, flatMap flattens the split arrays into individual words:
    // List(text1, text2, text3).foreach(_.flatMap(_.split(" ")).map((_, 1)).foreach(putMap2(_)))

    // Print the result
    for (key <- wordMap.keySet) {
      println(key + ":" + wordMap(key))
    }
  }
}
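For comparison, the same counts can be computed without a shared mutable Map. The following is a minimal sketch (not from the original post; it assumes the same three input files and GBK encoding) that folds every word into an immutable map:

import scala.io.Source

object WordCountFold {
  def main(args: Array[String]): Unit = {
    val files = List("D:/inputword/hello.txt", "D:/inputword/one.txt", "D:/inputword/two.txt")
    // Read every line of every file and split it into words
    val words = files.flatMap(f => Source.fromFile(f, "gbk").getLines().flatMap(_.split(" ")))
    // foldLeft threads an immutable Map[String, Int] through the word list
    val counts = words.foldLeft(Map.empty[String, Int]) { (m, w) =>
      m + (w -> (m.getOrElse(w, 0) + 1))
    }
    counts.foreach { case (word, n) => println(word + ":" + n) }
  }
}

Folding into an immutable map avoids mutating shared state across several traversals, and the iterator subtlety above disappears because flatMap consumes each source exactly once.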

2. Running WordCount on a Spark cluster

Create a new Maven project. Note that the _2.11 suffix on the Spark artifact IDs is the Scala version, and it must match the Scala version the project is compiled with.

pom.xml

<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.9</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.0.2</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.0.2</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_2.11</artifactId>
        <version>2.0.2</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>io.hops</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.7.3</version>
        <scope>provided</scope>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.scala-tools</groupId>
            <artifactId>maven-scala-plugin</artifactId>
            <version>2.15.2</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
import org.apache.spark.{SparkConf, SparkContext}

object WordCountCluster {
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("WordCountCluster")
    val sc = new SparkContext(conf)

    // Read the input file from HDFS as an RDD of lines
    val lines = sc.textFile("hdfs://hadoop002:9000/word.txt", 1)

    // Split each line into words
    val fields = lines.flatMap(_.split(" "))

    // Map each word to a (word, 1) tuple
    val wordTuple = fields.map((_, 1))

    // Sum the counts per word
    val result = wordTuple.reduceByKey(_ + _)
    result.foreach(r => println(r._1 + ":" + r._2))
  }
}
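One caveat: on a cluster, foreach runs on the executors, so the println output lands in the executor logs rather than on the driver console. A common alternative, sketched below under the assumption that the output directory does not yet exist (the path name is illustrative, not from the original post), is to write the result back to HDFS, or to collect a small result set to the driver first:

// Write the counts back to HDFS; the target directory must not already exist
result.saveAsTextFile("hdfs://hadoop002:9000/wordcount-output")

// Or, for a small result set, bring it to the driver before printing
result.collect().foreach { case (word, n) => println(word + ":" + n) }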

Export the jar and upload it to the server; also upload word.txt to HDFS.

Write scalawordcount.sh:

/opt/module/spark-2.0.2-bin-hadoop2.7/bin/spark-submit \
--class spark_base.wordcount.WordCountCluster \
--num-executors 3 \
--driver-memory 800m \
--executor-memory 1000m \
--executor-cores 3 \
/opt/module/spark-test/scala/scala-wc.jar

chmod 777 scalawordcount.sh

./scalawordcount.sh