1. 程式人生 > >Spark七 Pair RDD轉化操作

Spark七 Pair RDD轉化操作

Pair RDD: 提供並行操作各個鍵和跨節點重新進行資料分組的操作介面

建立Pair RDD

1) 把普通的RDD轉化為Pair RDD

使用map方法將lines劃分為以首個單詞為鍵,行內容為值的Pair RDD

val pairs = lines.map(x => (x.split(" ")(0), x)

2) 驅動器程式中建立Pair RDD

呼叫SparkContext.parallelize()

val pairs = sc.parallelize(List((1,1), (2, 2), (3, 3)))

轉化操作

1 Pair RDD也是RDD,所以適用於普通RDD的函式也適用於Pair RDD,如filter

對Pair RDD過濾出長度小於20的鍵值對

pairs.filter{case(key, value) => value.length < 20}

2 聚合操作

> reduceByKey():接收一個函式,並使用該函式對值進行合併。為資料集的每個鍵進行並行的規約操作,每個規約操作會將鍵相同的值合併起來。返回鍵和規約結果組成的新的RDD

> foldByKey():使用一個與RDD和合並函式中的資料類性相同的零值作為初始值。對零值和另一個元素合併,結果仍是該元素

> 使用reduceByKey和mapValues()計算每個鍵的對應值的均值

rdd.mapValues(x => (x, 1)).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))

> reduceByKey單詞計數

val input = sc.textFile("hdfs://...")

val words = input.flatMap(x => x.split(" "))

val result = words.map(x => (x, 1)).reduceByKey((x, y) => x + y)

> combineByKey()最為常用的基於鍵的聚合函式,讓使用者返回與輸入資料型別不同的返回值

    > combineByKey()使用一個叫createCombiner()函式建立鍵對應的累加器初始值,這個操作是對分割槽的,不是對整個RDD的

    > 如果處理當前分割槽已經初始化的鍵,則使用mergeValue()方法將累加值合併

    > 每個分割槽是獨立處理,一個鍵可能有多個累加器,如果要合併不同分割槽的累加器,使用mergeCombiners()

> 自定義並行度

   val data = Seq(("a", 3), ("b", 4), ("a", 1))

   sc.parallelize(data).reduceByKey((x, y) => x + y) //預設並行度

   sc.parallelize(data).reduceByKey((x, y) => x + y, 10) //自定義並行度

也可使用repartition()和coalesce()方法對資料混洗

資料分組

單個RDD分組:groupByKey()將資料根據鍵分組,RDD(K, V)轉為RDD(K, Iterable(V))

多個共享一個鍵的RDD進行分組,RDD(K, V)和RDD(K, M)轉為RDD(K, Iterable(V),Iterable(M))

連線

Join() rightOuterJoin() leftOuterJoin()

資料排序

sortByKey()

行動操作


相關推薦

Spark Pair RDD轉化操作

Pair RDD: 提供並行操作各個鍵和跨節點重新進行資料分組的操作介面建立Pair RDD1) 把普通的RDD轉化為Pair RDD使用map方法將lines劃分為以首個單詞為鍵,行內容為值的Pair RDDval pairs = lines.map(x => (x.

Spark 的鍵值對(pair RDD操作,Scala實現

一:什麼是Pair RDD?          Spark為包含鍵值對對型別的RDD提供了一些專有操作,這些操作就被稱為Pair RDD,Pair RDD是很多程式的構成要素,因為它們提供了並行操作對各個鍵或跨節點重新進行資料分組的操作介面。 二:Pair RDD的操作例項

Spark核心程式設計-RDD建立操作

目前有兩種型別的基礎RDD:一種是並行集合(Parallelized Collections),接收一個已經存在的scala集合,然後進行各種平行計算;另外一種是從外部儲存建立的RDD,外部儲存可以是文字檔案或者HDFS,也可以是Hadoop的介面API。 一、並行化集合建

Spark鍵值對RDD轉化操作

png mage str info 列表 alt 一個 圖片 pair 1.1 鍵值對RDD的轉化操作\\ 1.1.1 轉化操作列表 針對一個Pair RDD的轉化操作 : 、 針對兩個Pair RDD的轉化操作 : 1.1.2 聚

spark——詳解rdd常用的轉化和行動操作

本文始發於個人公眾號:TechFlow,原創不易,求個關注 今天是spark第三篇文章,我們繼續來看RDD的一些操作。 我們前文說道在spark當中RDD的操作可以分為兩種,一種是轉化操作(transformation),另一種是行動操作(action)。在轉化操作當中,spark不會為我們計算結果,而是會

[Spark][Python]RDD flatMap 操作例子

line var 元素 bsp ini atd 執行函數 clas park RDD flatMap 操作例子: flatMap,對原RDD的每個元素(行)執行函數操作,然後把每行都“拍扁” [[email protected] ~]$

Spark RDD基礎操作

標題 舉例 解釋 Spark的基本資訊 Spark 1個driver(膝上型電腦或者叢集閘道器機器上)和若干個executor(在各個節點上)組成。通

Spark -- RDD簡單操作【統計文字中單行最大單詞數】

一 、什麼是RDD ?          RDD在Spark【Scala語言】中,是一種資料結構【基於記憶體,可持久化】,就好比Java的ArrayList一樣,可以進行各種的Action操作,比如Java中的List集合,可以進行get【獲取元素】、add【增加元

Spark程式設計指南之一:transformation和action等RDD基本操作

文章目錄 基本概念 開發環境 程式設計實戰 初始化SparkContext RDD的生成 RDD基本操作 Key-Value Pairs Transformations f

Java Spark之建立RDD的兩種方式和操作RDD

首先看看思維導圖,我的spark是1.6.1版本,jdk是1.7版本  spark是什麼?  Spark是基於記憶體計算的大資料平行計算框架。Spark基於記憶體計算,提高了在大資料環境下資料處理的實時性,同時保證了高容錯性和高可伸縮性,允許使用者將Spark 部署在大量廉

Spark RDD建立操作

從集合建立RDD parallelize def parallelize[T](seq: Seq[T], numSlices: Int = defaultParallelism)(implicit arg0: ClassTag[T]): RDD[T] 從一個Seq集合建立RDD

Spark運算元:RDD行動Action操作(4)–countByKey、foreach、foreachPartition、sortBy

關鍵字:Spark運算元、Spark函式、Spark RDD行動Action、countByKey、foreach、foreachPartition、sortBy countByKey def countByKey(): Map[K, Long] countByKey用於統

Learning Spark——RDD常用操作

RDD支援兩種操作:轉換(Transformation)操作和行動(Action)操作。 為什麼會分為兩種操作,這兩種操作又有什麼區別呢? 我們先考慮一下平常我們使用的一些函式,舉個例子Long.toString(),這個轉換是把Long型別的轉換為Stri

Spark入門(四):RDD基本操作

1.RDD轉換 RDD的所有轉換操作都不會進行真正的計算 1.1單個RDD轉換操作 # 建立測試RDD val rdd = sc.parallelize(Array("hello world","java","scala easy")) # 1.

Spark RDD基本操作

Spark RDD Scala語言程式設計 RDD(Resilient Distributed Dataset)是一個不可變的分散式物件集合, 每個rdd被分為多個分割槽, 這些分割槽執行在叢集的不同節點上。rdd支援兩種型別的操作:轉化(trainsfo

Java接入Spark之建立RDD的兩種方式和操作RDD

首先看看思維導圖,我的spark是1.6.1版本,jdk是1.7版本 spark是什麼? Spark是基於記憶體計算的大資料平行計算框架。Spark基於記憶體計算,提高了在大資料環境下資料處理的實時性,同時保證了高容錯性和高可伸縮性,允許使用者將Spar

Spark運算元:RDD鍵值轉換操作(5)–leftOuterJoin、rightOuterJoin、subtractByKey

關鍵字:Spark運算元、Spark RDD鍵值轉換、leftOuterJoin、rightOuterJoin、subtractByKey leftOuterJoin def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V,

sparkSpark運算元:RDD基本轉換操作–map、flagMap、distinct

map將一個RDD中的每個資料項,通過map中的函式對映變為一個新的元素。 輸入分割槽與輸出分割槽一對一,即:有多少個輸入分割槽,就有多少個輸出分割槽。 hadoop fs -cat /tmp/lxw1234/1.txthello worldhello sparkhello

spark mlib 機器學習系列之一:Spark rdd 常見操作

package mlib import org.apache.spark.SparkContext import org.apache.spark.sql.SparkSession object UsefulRddOpts { def main(ar

Spark RDD Actions操作之reduce()

textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b) The argu