Spark學習筆記:四、WordCount字頻統計入門程式(基於IntelliJ IDEA使用Scala+SBT)
一、環境準備:
Ubuntu16.04
IDEA Ultimate(破解版、教育版)
Java JDK 1.8
Hadoop2.7 (偽單機模式)
Spark 2.1.0
Hadoop與Spark的安裝過程本文省略
二、IDEA + SBT
IDEA安裝Scala外掛(自帶sbt),新建scala sbt專案
配置專案中的build.sbt
name := "SBTTest"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"
編輯後,在IDEA介面右下角彈出框中選擇Enable auto-import
,等待……(大概十幾分鍾吧)
.
.
.
然後建立WordCount.scala檔案:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.log4j.{Level,Logger}
object WordCount {
def main(args: Array[String]): Unit = {
// 遮蔽日誌
Logger.getLogger ("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
val inputFile = "file:///home/hadoop/word.txt"
val conf = new SparkConf().setAppName("WordCount").setMaster("local[1]")
val sc = new SparkContext(conf)
val textFile = sc.textFile (inputFile)
val wordCount = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
wordCount.foreach(println)
}
}
程式碼解釋:
textFile包含了多行文字內容,textFile.flatMap(line => line.split(” “))會遍歷textFile中的每行文字內容,當遍歷到其中一行文字內容時,會把文字內容賦值給變數line,並執行Lamda表示式line => line.split(” “)。line => line.split(” “)是一個Lamda表示式,左邊表示輸入引數,右邊表示函式裡面執行的處理邏輯,這裡執行line.split(” “),也就是針對line中的一行文字內容,採用空格作為分隔符進行單詞切分,從一行文字切分得到很多個單詞構成的單詞集合。這樣,對於textFile中的每行文字,都會使用Lamda表示式得到一個單詞集合,最終,多行文字,就得到多個單詞集合。textFile.flatMap()操作就把這多個單詞集合“拍扁”得到一個大的單詞集合。
然後,針對這個大的單詞集合,執行map()操作,也就是map(word => (word, 1)),這個map操作會遍歷這個集合中的每個單詞,當遍歷到其中一個單詞時,就把當前這個單詞賦值給變數word,並執行Lamda表示式word => (word, 1),這個Lamda表示式的含義是,word作為函式的輸入引數,然後,執行函式處理邏輯,這裡會執行(word, 1),也就是針對輸入的word,構建得到一個tuple,形式為(word,1),key是word,value是1(表示該單詞出現1次)。
程式執行到這裡,已經得到一個RDD,這個RDD的每個元素是(key,value)形式的tuple。最後,針對這個RDD,執行reduceByKey((a, b) => a + b)操作,這個操作會把所有RDD元素按照key進行分組,然後使用給定的函式(這裡就是Lamda表示式:(a, b) => a + b),對具有相同的key的多個value進行reduce操作,返回reduce後的(key,value),比如(“hadoop”,1)和(“hadoop”,1),具有相同的key,進行reduce以後就得到(“hadoop”,2),這樣就計算得到了這個單詞的詞頻。