1. 程式人生 > >spark原始碼action系列-collect

spark原始碼action系列-collect

RDD.collect的操作

collect操作,在最後的ResultTask.runTask,執行的function的操作為下面程式碼.

由於對ResultTask的runTask這個函式的返回值就是這個runTask函式在執行完成RDD傳入的function後的返回值.這裡要說明下如果task的結果超過了spark.driver.maxResultSize配置的最大值時,預設是1G,直接對task的結果進行丟掉,不處理,

defcollect(): Array[T] = withScope {val results = sc.runJob(this(iter: Iterator[T]) => iter.toArray

)

這個操作把所有的task返回的array進行連線,合併到一個array中進行返回.  Array.concat(results: _*)}

用於在driver端處理各個Task的結果返回的resultHandler函式,這個函式就是把對應的task的結果直接放到driver端接收資料的一個數組中.

valresults = new Array[U](partitions.size)

(index, res) => results(index) = res

在這個操作中,是直接把每個partitionIterator的結果轉換成一個array.上面的紅色部分((iter: Iterator[T]) => iter.toArray

).

從上面的程式碼中可以看出來,針對一個rdd的collect的操作是把當前的rdd中所有的partition中的資料集的iterator直接轉換成一個array[T],這個array也是對應此partition的返回值,使用collect時要確保每個task的返回的資料的大小,同時要保證所有的task中返回的資料的大小不能超過1GB.

在driver端接收到每一個task返回的資料集時,每個task返回的是這個task中所有的資料集的陣列,通過在driver端定義的一個results陣列,這個陣列的長度就是partition的個數,每個task的返回結果儲存到這個對應的index位置上((index, res) => results(index) = res

),最後在把這個二維的資料進行concat操作(  Array.concat(results: _*)),把所有的資料集合併到一個數組中.這個陣列就是執行collect的最終的返回值.

相關推薦

spark原始碼action系列-collect

RDD.collect的操作 collect操作,在最後的ResultTask.runTask中,執行的function的操作為下面程式碼. 由於對ResultTask的runTask這個函式的返回值

spark原始碼action系列-foreach與foreachPartition

RDD.foreachPartition/foreach的操作 在這個action的操作中: 這兩個action主要用於對每個partition中的iterator時行迭代的處理.通過使用者傳入的fu

Spark原始碼分析系列(目錄)

記錄自己學習研究 Spark 的探索過程,為後續總結奠定基礎。 本文程式碼研究以 Spark 2.3.0 原始碼為基準,如果看本文,請閱讀時,下載對應的 Spark 版本。 圖1 伯克利的資料分析軟體棧BDAS(Berkeley Data Analytics Stack) 這裡要先說BDAS(伯克利

Spark 原始碼分析系列

如下,是 spark 原始碼分析系列的一些文章彙總,持續更新中...... Spark RPC spark 原始碼分析之五--Spark RPC剖析之建立NettyRpcEnv spark 原始碼分析之六--Spark RPC剖析之Dispatcher和Inbox、Outbox剖析 spark 原始碼

spark源代碼action系列-foreach與foreachPartition

ims class 問題 font 用戶 dsm scope 來看 color RDD.foreachPartition/foreach的操作 在這個action的操作中: 這兩個action主要用於對每一個partition中的iterator時行叠代的處理.

Spark原始碼系列:RDD repartition、coalesce 對比

在上一篇文章中 Spark原始碼系列:DataFrame repartition、coalesce 對比 對DataFrame的repartition、coalesce進行了對比,在這篇文章中,將會對RDD的repartition、coalesce進行對比。 RDD重新分割槽的手段與Da

Spark原始碼系列(九)Spark SQL初體驗之解析過程詳解

首先宣告一下這個版本的程式碼是1.1的,之前講的都是1.0的。 Spark支援兩種模式,一種是在spark裡面直接寫sql,可以通過sql來查詢物件,類似.net的LINQ一樣,另外一種支援hive的HQL。不管是哪種方式,下面提到的步驟都會有,不同的是具體的執行過程。下面

Spark入門實戰系列--2.Spark編譯與部署(中)--Hadoop編譯安裝

二進制包 1.10 不能 mapr 修復 att 機器 mave end 【註】該系列文章以及使用到安裝包/測試數據 能夠在《[傾情大奉送–Spark入門實戰系列] (http://blog.csdn.net/yirenboy/article/deta

spark源碼系列之累加器實現機制及自定義累加器

大數據 spark一,基本概念 累加器是Spark的一種變量,顧名思義該變量只能增加。有以下特點: 1,累加器只能在Driver端構建及並只能是Driver讀取結果,Task只能累加。 2,累加器不會改變Spark Lazy計算的特點。只會在Job觸發的時候進行相關累加操作。 3,現有累加器的類型。相信有很

Spark原始碼分析之Spark Shell(上)

https://www.cnblogs.com/xing901022/p/6412619.html 文中分析的spark版本為apache的spark-2.1.0-bin-hadoop2.7。 bin目錄結構: -rwxr-xr-x. 1 bigdata bigdata 1089 Dec

編譯spark原始碼的方法,及編譯、案例測試問題總結

一、編譯spark方法 1.編譯環境 首先,需要安裝jdk、maven,相關安裝教程請參考:http://blog.csdn.net/u012829611/article/details/77651855 http://blog.csdn.net/u012829611/artic

解決Spark Arrays.toString(Dataset.collect())報錯

以前用java.util.Arrays.toString(Dataset.collect())的時候是可以輸出的,不知為何,今天編譯的時候報了錯誤: [INFO] -------------------------------------------------------------

zookeeper原始碼閱讀系列

1;github 下載 zookeeper原始碼 2:修改build.xml檔案和ivy.xml a:build.xml 將地址: get src=”http://downloads.sourceforge.net/project/ant-eclipse/ant-eclipse/1

Spark 原始碼簡單跟蹤

本文介紹下Spark 到底是如何執行sc.TextFile(...).map(....).count() 這種程式碼的,從driver端到executor端。 另外還有pid,iter都是哪來的呢? 如果你照著原始碼點進去你會很困惑。為莫名其妙怎麼就有了這些iterator呢? Transf

CAS原始碼追蹤系列一:Filter的初始化

目錄 程式碼跟蹤 Spring-web:DelegatingFilterProxy CAS:AuthenticationFilter 總結 最近研究了一下SSO(Single Sign On:單點登入)原理。 於是想借助CAS(基於SSO原理的實現框架)加深一下

Java| Java 7 原始碼學習系列--String

String表示字串,Java中所有字串的字面值都是String類的例項,例如“ABC”。字串是常量,在定義之後不能被改變,字串緩衝區支援可變的字串。因為 String 物件是不可變的,所以可以共享它們。例如: String str = "abc"; 相當於 char data[] =

[Mybatis原始碼分析系列]]03 TypeAliasRegistry

TypeAliasRegistry 負責註冊,儲存,獲取MyBatis別名的類 typeAliases(別名介紹) 類型別名是為 Java 型別設定一個短的名字。它只和 XML 配置有關,存在的意義僅在於用來減少類完全限定名的冗餘。例如: <typeAliases>

[Mybatis原始碼分析系列] 01 解析mybatis-config.xml配製檔案並返回SqlSessionFactory的類SqlSessionFactoryBuilder

前言 公司一直在使用Jpa + Hibernate那一套東西,但是這套技術封裝的太過後重。不利於開發人員掌握,而在優化sql方面也是蛋疼的很。所以在後臺不是特別重要的專案中引入了MyBatis。當然光會使用,是滿足不了本吊絲的胃口,所以走上了分析MyBatis原始碼的道路,並有了這一系列

[MyBatis原始碼分析系列] ResolverUtil

ResolverUtil ResolverUtil用於查詢在類路徑可用並滿足任意條件的類。最常見的兩種情況是一個類繼承或實現了另一個類,或者此類被指定的註解標記了。然而,通過使用Test類,可以滿足任意條件的搜尋。 類載入器用於定位類路徑下指定包下面的必要類,然後載入並檢驗他們。預設

Dubbo 原始碼分析系列之三 —— 架構原理

1 核心功能 首先要了解Dubbo提供的三大核心功能: Remoting:遠端通訊 提供對多種NIO框架抽象封裝,包括“同步轉非同步”和“請求-響應”模式的資訊交換方式。 Cluster: 服務框架 提供基於介面方法的透明遠端過程呼叫,包括多協議支援,以及