1. 程式人生 > 實用技巧 >spark學習進度05(RDD概念、程式碼、三種建立方式)

spark學習進度05(RDD概念、程式碼、三種建立方式)

一、RDD概念

1、RDD在哪裡:

2、RDD是什麼:

是一個容錯的, 並行的資料結構, 可以讓使用者顯式地將資料儲存到磁碟和記憶體中, 並能控制資料的分割槽.RDD 作為資料結構, 本質上是一個只讀的分割槽記錄集合. 一個 RDD 可以包含多個分割槽, 每個分割槽就是一個 DataSet 片段.RDD 之間可以相互依賴, 如果 RDD 的每個分割槽最多隻能被一個子 RDD 的一個分割槽使用,則稱之為窄依賴, 若被多個子 RDD 的分割槽依賴,則稱之為寬依賴. 不同的操作依據其特性, 可能會產生不同的依賴. 例如 map 操作會產生窄依賴, 而 join 操作則產生寬依賴.

3、RDD的特點:

RDD 是一個程式設計模型

RDD 允許使用者顯式的指定資料存放在記憶體或者磁碟RDD 是分散式的,

使用者可以控制 RDD 的分割槽RDD 提供了豐富的操作RDD 提供了 map, flatMap, filter 等操作符,

用以實現 Monad 模式RDD 提供了 reduceByKey, groupByKey 等操作符,

用以操作 Key-Value 型資料RDD 提供了 max, min, mean 等操作符, 用以運算元字型的資料

RDD 是混合型的程式設計模型, 可以支援迭代計算, 關係查詢, MapReduce, 流計算

RDD 是隻讀的

RDD 之間有依賴關係, 根據執行操作的操作符的不同, 依賴關係可以分為寬依賴和窄依賴

4、補充:

RDD的分割槽:

整個 WordCount 案例的程式從結構上可以用上圖表示, 分為兩個大部分

儲存

檔案如果存放在 HDFS 上, 是分塊的, 類似上圖所示, 這個wordcount.txt分了三塊

計算

Spark 不止可以讀取 HDFS, Spark 還可以讀取很多其它的資料集, Spark 可以從資料集中創建出 RDD

例如上圖中, 使用了一個 RDD 表示 HDFS 上的某一個檔案, 這個檔案在 HDFS 中是分三塊, 那麼 RDD 在讀取的時候就也有三個分割槽, 每個 RDD 的分割槽對應了一個 HDFS 的分塊

後續 RDD 在計算的時候, 可以更改分割槽, 也可以保持三個分割槽, 每個分割槽之間有依賴關係, 例如說 RDD2 的分割槽一依賴了 RDD1 的分割槽一

RDD 之所以要設計為有分割槽的, 是因為要進行分散式計算, 每個不同的分割槽可以在不同的執行緒, 或者程序, 甚至節點中, 從而做到平行計算

二、RDD程式碼

1、SparkCore的入口SparkContext

SparkContext是 spark-core 的入口元件, 是一個 Spark 程式的入口, 在 Spark 0.x 版本就已經存在SparkContext了, 是一個元老級的 API

package cn.itcast.spark.rdd

import junit.framework.Test
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

/**
  * @Author 帶上我快跑
  * @Data 2021/1/8 19:00
  * @菩-薩-說-我-寫-的-都-對@
  */
class WordCount3 {
  def main(args: Array[String]): Unit = {

    //1、建立spark comtext
    val conf = new SparkConf().setAppName("word_count")
    val sc = new SparkContext(conf)
    //2、載入檔案
    /*Rdd的特點
    /1、Rdd是資料集
    2、RDD是程式設計模型
    3、RDD相互之間有依賴管理
    4、RDD是可以分割槽的
     */
    //準備檔案
    //讀取檔案
    val rdd1: RDD[String] = sc.textFile("hdfs://hadooplinux01:9000/data/wordcount.txt")
    //3、處理
    //把整句話拆分為單個單詞
    val rdd2: RDD[String] = rdd1.flatMap(item => item.split(" "))
    //吧每個單詞指定一個詞頻
    val rdd3: RDD[(String, Int)] = rdd2.map(item => (item,1) )
    //聚合
    val rdd4: RDD[(String, Int)] = rdd3.reduceByKey((curr, agg) => curr + agg)
    //4、得到結果
    val result = rdd4.collect()
    //列印結果
    result.foreach(item => println(item))
  }
  @Test
  def sparkContext(): Unit={
    //1.建立sparkconf
    val conf = new SparkConf().setMaster("local[6]").setAppName("spark_context")
    //2、建立sparkcontext
    val sc = new SparkContext(conf)

  }
}

2、RDD的建立方式:(RDD彈性分散式資料集)

2.1通過本地集合建立RDD

2.2通過外部陣列建立RDD

2.3通過RDD衍生出RDD

  • source是通過讀取 HDFS 中的檔案所建立的

  • words是通過source呼叫運算元map生成的新 RDD