1. 程式人生 > >Spark RDD基本操作

Spark RDD基本操作

Spark RDD Scala語言程式設計

RDD(Resilient Distributed Dataset)是一個不可變的分散式物件集合, 每個rdd被分為多個分割槽, 這些分割槽執行在叢集的不同節點上。rdd支援兩種型別的操作:轉化(trainsformation)和行動(action), Spark只會惰性計算rdd, 也就是說, 轉化操作的rdd不會立即計算, 而是在其第一次遇到行動操作時才去計算, 如果想在多個行動操作中複用一個rdd, 可以使用RDD.persist()讓Spark把這個rdd快取下來。

0. 初始化SparkContext

import org.apache.spark.{SparkConf, SparkContext}

val
sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("spark-rdd-demo"))

1. 建立RDD

Spark提供了2種建立RDD的方式:

1.1 讀取外部資料集
val lines = sc.parallelize(List("Java", "Scala", "C++"))
1.2 在驅動器程式中對一個集合進行並行化
val lines = sc.textFile("hdfs://dash-dev:9000/input/test.txt")

2. RDD操作

2.1 轉化操作

RDD的轉化操作是返回新RDD的操作, 常用轉化操作總結如下:

表1: 對一個數據為{1,2,3,3}的RDD進行基本的轉化操作

函式名 目的 示例 結果
map() 將函式應用於RDD中每個元素, 將返回值構成新的RDD rdd.map(x=>x+1) {2,3,4,5}
flatMap() 將函式應用於RDD中的每個元素, 將返回的迭代器的所有內容構成新的RDD, 常用來切分單詞 rdd.flatMap(x=>x.to(2)) {1,2,2}
filter() 返回一個通過傳入給filter()的函式的元素組成的RDD rdd.filter(x=> x>2) {3,3}
distinct() 去重 rdd.distinct() {1,2,3}
sample(withReplacement, fraction, [seed]) 對RDD取樣, 以及是否替換 rdd.sample(false, 0.5) 非確定的

表2: 對資料分別為{1,2,3}和{2,3,4}RDD進行鍼對2個RDD的轉化操作

函式名 目的 示例 結果
union() 求2個RDD的並集 rdd.union(other) {1,2,3,4}
intersection() 求2個RDD的交集 rdd.intersection(other) {2,3}
subtract() 求2個RDD的差集 rdd.subtract(other) {1}
cartesian() 求2個RDD的笛卡爾積 rdd.cartesian(other) {1,2}, {1,3}, {1,4}…{3,4}
sample(withReplacement, fraction, [seed]) 對RDD取樣, 以及是否替換 rdd.sample(false, 0.5) 非確定的
2.2 行動操作

RDD的行動操作會把最終求得的結果返回驅動器程式, 或者寫入外部儲存系統中。

表3: 對一個數據為{1,2,3,3}的RDD進行基本RDD的行動操作

函式名 目的 示例 結果
redcue() 並行整合RDD中的所有元素 rdd.reduce((x, y) => x+y) 9
collect() 返回RDD中的所有元素 rdd.collect() {1,2,3,4}
count() 求RDD中的元素個數 rdd.count() 4
countByValue() 各元素在RDD中出現的次數 rdd.countByValue() {1,1}, {2, 1}, {3,2}
take(n) 從RDD中返回n個元素 rdd.take(2) {1,2}
top(n) 從RDD中返回前n個元素 rdd.top(3) {3,3,2}
foreach(func) 對RDD中的每個元素使用給定的函式 rdd.foreach(print) 1,2,3,3
2.3 向Spark傳遞函式

Spark的大部分轉化和行動操作都要依賴於使用者傳遞的函式來計算, 當傳遞的物件是某個物件的成員, 或者包含了對某個物件中一個欄位的引用時(如self.field), Spark就會把整個物件傳送到工作節點上--這比你本意想傳遞的東西大太多了!替代的方案是,把你需要的欄位從物件中拿出來放到一個區域性變數中, 然後傳遞這個區域性變數:

class SearchFunctions(val query: String) {
    def isMatch(s: String): Boolean = {
        s.contains(query)
    }

    def getMatchesFunctionReference(rdd: RDD[String]): RDD[String] = {
        // 問題:"isMatch"表示"this.isMatch", 因此會傳遞整個this
        rdd.map(isMatch)
    }

    def getMatchesFieldReference(rdd: RDD[String]): RDD[String] = {
        // 問題: "query"表示"this.query", 因此會傳遞整個this
        rdd.map(x => x.split(query))
    }

    def getMatchesNoReference(rdd: RDD[String]): RDD[String] = {
        // 安全:只把我們需要的欄位拿出來放入區域性變數中
        val localQuery = this.query
        rdd.map(x => x.split(localQuery))
    }
}

另外, 要注意的是, Spark要求我們傳入的函式及其應用的資料是可序列化的(實現了Java的Serializable介面), 否則會出現NotSerializableException。

相關推薦

Spark RDD基本操作

Spark RDD Scala語言程式設計 RDD(Resilient Distributed Dataset)是一個不可變的分散式物件集合, 每個rdd被分為多個分割槽, 這些分割槽執行在叢集的不同節點上。rdd支援兩種型別的操作:轉化(trainsfo

spark RDD 基本操作

1. map:    對當前元素做一個對映    val array = Array(1,2,3)    val rdd = sc.parallelize(array).map(r => 2*r) 2. filter:    過濾出符合條件的元組  va

Spark程式設計指南之一:transformation和action等RDD基本操作

文章目錄 基本概念 開發環境 程式設計實戰 初始化SparkContext RDD的生成 RDD基本操作 Key-Value Pairs Transformations f

Spark入門(四):RDD基本操作

1.RDD轉換 RDD的所有轉換操作都不會進行真正的計算 1.1單個RDD轉換操作 # 建立測試RDD val rdd = sc.parallelize(Array("hello world","java","scala easy")) # 1.

Spark學習——RDD基本操作

Spark操作分為transformation和action,現將常用的記錄在此: //transformation //將元素一個一個轉換 JavaRDD<String> map = raw.map(new Fu

Spark RDD基本概念、寬窄依賴、轉換行為操作

目錄 RDD概述 RDD的內部程式碼 案例 小總結 轉換、行動運算元 寬、窄依賴 Reference 本文介紹

Spark RDD基礎操作

標題 舉例 解釋 Spark的基本資訊 Spark 1個driver(膝上型電腦或者叢集閘道器機器上)和若干個executor(在各個節點上)組成。通

RDD基本操作之Action

介紹 比較 text inf count program 圖片 訪問 器) Action介紹 在RDD上計算出來一個結果 把結果返回給driver program或保存在文件系統,count(),save 常用的Action reduce() 接收一個函數,作用在RDD

Spark SQL基本操作以及函式的使用

引語:   本篇部落格主要介紹了Spark SQL中的filter過濾資料、去重、集合等基本操作,以及一些常用日期函式,隨機函式,字串操作等函式的使用,並列編寫了示例程式碼,同時還給出了程式碼當中用到的一些資料,放在最文章最後。 SparkSQL簡介   Spark SQL是Sp

Spark -- RDD簡單操作【統計文字中單行最大單詞數】

一 、什麼是RDD ?          RDD在Spark【Scala語言】中,是一種資料結構【基於記憶體,可持久化】,就好比Java的ArrayList一樣,可以進行各種的Action操作,比如Java中的List集合,可以進行get【獲取元素】、add【增加元

Spark RDD建立操作

從集合建立RDD parallelize def parallelize[T](seq: Seq[T], numSlices: Int = defaultParallelism)(implicit arg0: ClassTag[T]): RDD[T] 從一個Seq集合建立RDD

Learning Spark——RDD常用操作

RDD支援兩種操作:轉換(Transformation)操作和行動(Action)操作。 為什麼會分為兩種操作,這兩種操作又有什麼區別呢? 我們先考慮一下平常我們使用的一些函式,舉個例子Long.toString(),這個轉換是把Long型別的轉換為Stri

spark mlib 機器學習系列之一:Spark rdd 常見操作

package mlib import org.apache.spark.SparkContext import org.apache.spark.sql.SparkSession object UsefulRddOpts { def main(ar

Spark RDD Actions操作之reduce()

textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b) The argu

3.4 Spark RDD Action操作6-saveAsHadoopFile、saveAsHadoopDataset

1 saveAsHadoopFile def saveAsHadoopFile(path: String, keyClass: Class[], valueClass: Class[], outputFormatClass: Class[_ <: Ou

3.4 Spark RDD Action操作7-saveAsNewAPIHadoopFile、saveAsNewAPIHadoopDataset

1 saveAsNewAPIHadoopFile def saveAsNewAPIHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]): Unit de

3.4 Spark RDD Action操作4-countByKey、foreach、foreachPartition、sortBy

1 countByKey def countByKey(): Map[K, Long] countByKey用於統計RDD[K,V]中每個K的數量。 例子: scala> var rdd1 = sc.makeRDD(Array((“A”,0),(“

rdd 基本操作

val col == runt get flat partition lte top package com.jason.example import org.apache.spark.rdd.RDD class RddTest extends SparkI

Spark 系列(十四)—— Spark Streaming 基本操作

一、案例引入 這裡先引入一個基本的案例來演示流的建立:獲取指定埠上的資料並進行詞頻統計。專案依賴和程式碼實現如下: <dependency> <groupId>org.apache.spark</groupId> <artifactId>spar

Spark算子:RDD基本轉換操作(1)–map、flatMap、distinct

ive 註意 pre spl cti result log bsp blog Spark算子:RDD基本轉換操作(1)–map、flatMap、distinct 關鍵字:Spark算子、Spark RDD基本轉換、map、flatMap、distinct map 將