1. 程式人生 > >Spark RDD操作:combineByKey函式詳解

Spark RDD操作:combineByKey函式詳解

當資料集一鍵值對形式組織的時候,聚合具有相同鍵的元素進行一些統計是很常見的操作。對於Pair RDD常見的聚合操作如:reduceByKey,foldByKey,groupByKey,combineByKey。這裡重點要說的是combineByKey。因為combineByKey是Spark中一個比較核心的高階函式,groupByKey,reduceByKey都是基於combineByKey實現的。

combineByKey的定義

def combineByKey[C](  
      createCombiner: V => C,  
      mergeValue: (C, V) => C,  
      mergeCombiners: (C, C) => C,  
      partitioner: Partitioner,  
      mapSideCombine: Boolean = true,  
      serializer: Serializer = null)  

其中的引數:

createCombiner: V => C ,這個函式把當前的值作為引數,此時我們可以對其做些附加操作(型別轉換)並把它返回 (這一步類似於初始化操作)

mergeValue: (C, V) => C,該函式把元素V合併到之前的元素C(createCombiner)上 (這個操作在每個分割槽內進行)

mergeCombiners: (C, C) => C,該函式把2個元素C合併 (這個操作在不同分割槽間進行)

numPartitions:結果RDD分割槽數,預設保持原有的分割槽數
partitioner:分割槽函式,預設為HashPartitioner
mapSideCombine:是否需要在Map端進行combine操作,類似於MapReduce中的combine,預設為true

combineByKey來求解平均數的例子

val initialScores = Array(("Fred", 88.0), ("Fred", 95.0), ("Fred", 91.0), ("Wilma", 93.0), ("Wilma", 95.0), ("Wilma", 98.0))  
val d1 = sc.parallelize(initialScores)  
type MVType = (Int, Double) //定義一個元組型別(科目計數器,分數)  
d1.combineByKey(  
  score => (1, score),  
  (c1: MVType, newScore) => (c1._1 + 1, c1._2 + newScore),  
  (c1: MVType, c2: MVType) => (c1._1 + c2._1, c1._2 + c2._2)  
).map { case (name, (num, socre)) => (name, socre / num) }.collect  
引數含義的解釋
a 、score => (1, score),我們把分數作為引數,並返回了附加的元組型別。 以"Fred"為列,當前其分數為88.0 =>(1,88.0) 1表示當前科目的計數器,此時只有一個科目
b、(c1: MVType, newScore) => (c1._1 + 1, c1._2 + newScore),注意這裡的c1就是createCombiner初始化得到的(1,88.0)。在一個分割槽內,我們又碰到了"Fred"的一個新的分數91.0。當然我們要把之前的科目分數和當前的分數加起來即c1._2 + newScore,然後把科目計算器加1即c1._1 + 1
c、 (c1: MVType, c2: MVType) => (c1._1 + c2._1, c1._2 + c2._2),注意"Fred"可能是個學霸,他選修的科目可能過多而分散在不同的分割槽中。所有的分割槽都進行mergeValue後,接下來就是對分割槽間進行合併了,分割槽間科目數和科目數相加分數和分數相加就得到了總分和總科目數

相關推薦

Spark RDD操作combineByKey函式

當資料集一鍵值對形式組織的時候,聚合具有相同鍵的元素進行一些統計是很常見的操作。對於Pair RDD常見的聚合操作如:reduceByKey,foldByKey,groupByKey,combineByKey。這裡重點要說的是combineByKey。因為combineBy

Spark核心RDDfoldByKey函式

foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]foldByKey(zeroValue: V

python 學習彙總27itertools函式( tcy)

itertools函式 2018/11/14 2.1.建立新iter: count(start=0, step=1)#無限迴圈數;按Ctrl + C退出 # 返回均勻間隔值無限流;通常用作map()生成連續資料點的引數。此外,用於zip()新增序列號 g = itertools.count

專題8javascript函式

arguments是什麼? arguments 是一個對應於傳遞給函式的引數的類陣列物件。在(非箭頭)函式呼叫時,建立的一個 它類似於Array,但除了長度之外沒有任何Array屬性 的物件 ,它儲存的是實際傳遞給函式的引數(侷限於函式宣告的引數列表)。此物件包含傳遞給函式

Linux 多工程式設計——多程序vfork() 函式

所需標頭檔案: #include <sys/types.h> #include <unistd.h> pid_t vfork(void); 功能: vfork() 函式和 fork() 函式(fork()如何使用,請點此連結)一樣都是在已有的

018include函式

include函式詳解(瞭解——雖然用的很少): include函式的用法,目前有三種使用方式: 1、include(module,namespace=None): module:子url的模組字串(即:app); namespace:例項名稱空間;這個地方要注意一點,如果指定例項名稱空間,那

linuxselect()函式

一.Select 函式詳細介紹 Select在Socket程式設計中還是比較重要的,可是對於初學Socket的人來說都不太愛用Select寫程式,他們只是習慣寫諸如connect、 accept、recv或recvfrom這樣的阻塞程式(所謂阻塞方式block

演算法學習筆記函式

## 引言 **母函式**(Generating function,**生成函式**)是**組合數學**中一種重要的方法,這裡只對最簡單的普通母函式作簡單介紹。其主要思想是,把離散序列和**冪級數**對應起來。 先來看一個最經典的例子:給你1克、2克、3克、4克的砝碼各一枚,問稱出1~10克的方案分別有多

Spark函式系列之RDD基本轉換

摘要:  RDD:彈性分散式資料集,是一種特殊集合 ‚ 支援多種來源 ‚ 有容錯機制 ‚ 可以被快取 ‚ 支援並行操作,一個RDD代表一個分割槽裡的資料集  RDD有兩種操作運算元:      Transformation(轉換):Transformation屬於延遲計

Spark函式系列之RDD基本轉換+例項

 RDD:彈性分散式資料集,是一種特殊集合 ‚ 支援多種來源 ‚ 有容錯機制 ‚ 可以被快取 ‚ 支援並行操作,一個RDD代表一個分割槽裡的資料集   RDD有兩種操作運算元:       &nbs

Spark核心RDDSort排序

val conf = new SparkConf() val sc = new SparkContext(conf) val array = Array((1, 6, 3), (2, 3, 3), (1, 1, 2), (1, 3, 5), (2, 1, 2)) val rdd1 = sc.paralleli

spark三種清理資料的方式UDF,自定義函式spark.sql;Python中的zip()與*zip()函式//及python中的*args和**kwargs

(1)UDF的方式清理資料 import sys reload(sys) sys.setdefaultencoding('utf8') import re import json from pyspark.sql import SparkSession

Spark——RDD操作

一、基本RDD 1、針對各個元素的轉化操作 最常用的轉化操作是map()和filter()。轉化操作map()J接收一個函式,把這個函式用於RDD中的每一個元素,將函式的返回結果作為結果RDD中對應元素。而轉化操作filter()則接收一個函式,將RDD滿足

Opencv基礎 Mat類裡setTo函式

https://blog.csdn.net/oMoDao1/article/details/80324360 函式原型:   /** @brief Sets all or some of the array elements to the specified value. &n

keras4)LSTM函式

LSTM層 keras.layers.recurrent.LSTM(units, activation='tanh', recurrent_activation='hard_sigmoid', use_bias=True, kernel_initializer='glorot_uni

Liunx環境基礎開發工具使用總結(基本操作命令及使用

目錄 1vim的基本概念 2vim的基本操作 3vim正常模式命令集 1:插入模式 2:移動游標 3:刪除文字 4:複製 5:替換 6:撤銷 7:更改 8:跳到指定行 4vim末行模式命令集 分屏操作 編譯器的使用 1:背景知識 2:g

ThinkPHP函式--D函式例項化模型

D方法:應該是用的比較多的方法了,用於例項化自定義模型類,是ThinkPHP框架對Model類例項化的一種封裝,並實現了單例模式,支援跨專案和分組呼叫,呼叫格式如下: D('[專案://][分組/]模型','模型層名稱') 方法的返回值是例項化的模型物件。 D方法可以自動檢測模型類,如果存在自

Linux 多工程式設計——多程序建立fork() 和vfork() 函式

一、fork() 函式詳解 需要的標頭檔案: #include <sys/types.h> #include <unistd.h> pid_t fork(void); 功能: 用於從一個已存在的程序中建立一個新程序,新程序稱為子程序,原程序稱為父程序。

C語言學習筆記printf()函式

C語言中有關printf()函式的詳細使用方法: 修飾符: - digit(s) :欄位寬度的最小值。如果該欄位不能容納要列印的數或者字串,系統就會使更寬的欄位。 如%4d。 - .digit(s):精度,將結果保留到小數點後的多少位。 - h: 和整數轉

Android NDK——必知必會之JNI的C++操作函式和小結(三)

引言 上一篇講解了一些關於JNI和NDK的必知必會的理論知識和機制,由於篇幅問題把關於JNI的重要的函式放到這篇,具體使用留到下一篇,此係列文章基連結: 一、JNI中的函式概述 在JNI層我們基本上都是通過env指標來呼叫jni.h標頭檔案裡定義的函式,JNI