1. 程式人生 > >Spark之Scala語言常見應用舉例

Spark之Scala語言常見應用舉例

       作為一個初學者,初次學習Spark,分享一下自己的心得。

        在學習Spark程式設計時,首先得準備編譯環境,確定程式語言,本人用的是Scala語言,IntelliJ IDEA編譯環境,同時得準備四個包,分別是:spark-assembly-1.3.1-hd-2.6.0.jar、scala-compiler.jar、scala-library.jar、scala-reflect.jar。將這四個包匯入,才能開始自己的Scala程式設計之旅。

       由於Hadoop環境沒有搭建好,所以在練習Scala程式設計的時候,就不能再Hadoop之上讀取HDFS的資料,不過不礙事,為了練習程式設計,我們可以讀取本地的txt檔案,然後將結果儲存到txt中,這樣不僅能感受到Spark RDD的強大,也能達到我們練習程式設計的目的。下來主要是用例項說明一下Spark RDD常用的一下操作。

       首先我們得配置SparkConf(),一般是讀取HDFS上的檔案,但是這裡讀取本地txt檔案,配置SparkConf()如下:

<span style="font-size:18px;"><span style="font-size:18px;">conf=new SparkConf().setAppName("Test").setMaster("local[4]")</span></span>

        解釋一下:Local[N]:本地模式,使用 N 個執行緒。

        下面這個程式是使用count()統計行數

<span style="font-size:18px;">object yb {
  /*
   統計行內容出現的次數,即相同行一共出現多少次
   */
  def main(args: Array[String]): Unit={
    val conf=new SparkConf().setAppName("Test").setMaster("local[4]")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("E:/Spark/天池大資料/data_format1/yb.txt")
    val countx=lines.count()//統計行數
    println(countx)//輸出: 10485750 
  }
}</span>

       統計詞頻並按照詞頻排序:

<span style="font-size:18px;">object yb {
  def main(args: Array[String]): Unit={

    val conf=new SparkConf().setAppName("Test").setMaster("local[4]")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("E:/Spark/天池大資料/data_format1/100W.txt")
    /*
    sortByKey引數有二個。1、true(升序),否則反之。2、標識分片數(分割槽數)
    flatMap相當於獲取一個朋友物件的列表。
     */
    val worldcounts=lines.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey((a,b) => a+b).map{case (key,value)=>(value,key)}.sortByKey(true,1)
    worldcounts.foreach(println)
  }
}</span>
        Map()和flatMap()區別:
<span style="font-size:18px;"><span style="font-size:18px;">object yb{
   def main (args: Array[String]) {
     val m=List(List(1,2),List(3,4))
     println(m.map(x=>x))
     println(m)
     val x=m.flatten
     println(x)
     println(m.flatMap(x =>x))
  }</span></span>
      由上面程式可得flatMap是由Mapflatten綜合而來,同時也可以發現flatMap最終都會輸出一串的序列,而Map輸出是多個集合。

       union()用法:

<span style="font-size:18px;"><span style="font-size:18px;">object yb{
  def main(args: Array[String]) {
    val m1=List(List(1,2),List(3,4))
    val m2=List(List(1,2),List(3,4))
    val unionx=m1.union(m2)//把兩個資料集聯合起來
    println(unionx)
    val mx1=List(1,2)
    val mx2=List(3,4)
    val unionxx=mx1.union(mx2)//把兩個資料集聯合起來
    println(unionxx)
  }
}</span></span>
      笛卡爾積cartesian()用法:
<span style="font-size:18px;"><span style="font-size:18px;">object yb{
  def main(args: Array[String]) {
    val conf=new SparkConf().setAppName("Test").setMaster("local[4]")
    val sc = new SparkContext(conf)
    val data1=sc.parallelize(List(1,2,3))//並行化,因為笛卡爾積是操作在RDD上的,所以必須是RDD的資料。
    val data2=sc.parallelize(List(4,5,6))
    data1.cartesian(data2).foreach(println)
  }
}</span></span>
      groupByKey()和 reduceByKey()區別:
<span style="font-size:18px;"><span style="font-size:18px;">object yb {
  def main(args: Array[String]){
    val conf=new SparkConf().setAppName("Test").setMaster("local[4]")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("E:/Spark/天池大資料/data_format1/100W.txt")
    /*
    sortByKey引數有二個。1、true(升序),否則反之。2、標識分片數(分割槽數)
     */
    val worldcounts=lines.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey((a,b) => a+b).map{case (key,value)=>(value,key)}.sortByKey(false,1)//按照從大到小的順序排序
    val topK=worldcounts.top(10)
    topK.foreach(println)//輸出排名前十的詞頻
   
  }
}</span></span>
    groupByKey不在本地merge統一在主節點merge
    reduceByKey在本地merge然後在到主節點merge

      reduce ()用法:

<span style="font-size:18px;"><span style="font-size:18px;">object yb{
  def main(args: Array[String]) {
    val data=List(1,2,3,4)
   val sum=data.reduce((x,y)=>x+y)
    println(sum)//輸出:10
  }
}//reduce將RDD中元素兩兩傳遞給輸入函式,同時產生一個新的值,新產生的值與RDD中下一個元素再被傳遞給輸入函式直到最後只有一個值為止
    其本質相當於一個滿二叉樹的左右孩子相加賦給根節點的過程。</span></span>