1. 程式人生 > >spark——詳解rdd常用的轉化和行動操作

spark——詳解rdd常用的轉化和行動操作

本文始發於個人公眾號:TechFlow,原創不易,求個關注


今天是spark第三篇文章,我們繼續來看RDD的一些操作。

我們前文說道在spark當中RDD的操作可以分為兩種,一種是轉化操作(transformation),另一種是行動操作(action)。在轉化操作當中,spark不會為我們計算結果,而是會生成一個新的RDD節點,記錄下這個操作。只有在行動操作執行的時候,spark才會從頭開始計算整個計算。

而轉化操作又可以進一步分為針對元素的轉化操作以及針對集合的轉化操作。

針對元素的轉化操作

針對元素的轉化操作非常常用,其中最常用的就是map和flatmap。從名字上看這兩者都是map操作,map操作我們都知道,在之前的MapReduce文章以及Python map、reduce用法的文章當中都有提及。簡而言之就是可以將一個操作對映在每一個元素上。

比如假設我們有一個序列[1, 3, 4, 7],我們希望將當中每一個元素執行平方操作。我們當然可以用for迴圈執行,但是在spark當中更好的辦法是使用map。

nums = sc.parallelize([1, 3, 4, 7])
spuare = nums.map(lambda x: x * x)

我們知道map是一個轉化操作,所以square仍然是一個RDD,我們直接將它輸出不會得到結果,只會得到RDD的相關資訊:

內部RDD的轉化圖是這樣的:

我們想看結果就必須要執行行動操作,比如take,我們take一下檢視一下結果:

和我們的預期一致,對於之前一直關注的同學來說map操作應該已經很熟悉了,那麼這個flatmap又是什麼呢?

差別就在這個flat,我們都知道flat是扁平的意思,所以flatmap就是說map執行之後的結果扁平化。說白了也就是說如果map執行之後的結果是一個數組的話,那麼會將陣列拆開,把裡面的內容拿出來組合到一起。

我們一起來看一個例子:

texts = sc.parallelize(['now test', 'spark rdd'])
split = texts.map(lambda x: x.split(' '))

由於我們執行map的物件是一個字串,一個字串執行split操作之後會得到一個字串陣列。如果我們執行map,得到的結果會是:

如果我們執行flatmap呢?我們也可以試一下:

對比一下,有沒有注意到差別?

是了,map執行的結果是一個array的array,因為每一個string split之後就是一個array,我們把array拼接到一起自然是一個array的array。而flatMap會把這些array攤平之後放在一起,這也是兩者最大的差別。

針對集合的轉化操作

上面介紹了針對元素的轉化操作,下面來看看針對集合的轉化操作。

針對集合的操作大概有union,distinct,intersection和subtract這幾種。我們可以先看下下圖有一個直觀地感受,之後我們再一一分析:

首先來看distinct,這個顧名思義,就是去除重複。和SQL當中的distinct是一樣的,這個操作的輸入是兩個集合RDD,執行之後會生成一個新的RDD,這個RDD當中的所有元素都是unique的。有一點需要注意,執行distinct的開銷很大,因為它會執行shuffle操作將所有的資料進行亂序,以確保每個元素只有一份。如果你不明白shuffle操作是什麼意思,沒有關係,我們在後序的文章當中會著重講解。只需要記住它的開銷很大就行了。

第二種操作是union,這個也很好理解,就是把兩個RDD當中的所有元素合併。你可以把它當成是Python list當中的extend操作,同樣和extend一樣,它並不會做重複元素的檢測,所以如果合併的兩個集合當中有相同的元素並不會被過濾,而是會被保留。

第三個操作是intersection,它的意思是交集,也就是兩個集合重疊的部分。這個應該蠻好理解的,我們看下下圖:

下圖當中藍色的部分,也就是A和B兩個集合的交集部分就是A.intersection(B)的結果,也就是兩個集合當中共有的元素。同樣,這個操作也會執行shuffle,所以開銷一樣很大,並且這個操作會去掉重複的元素。

最後一個是subtract,也就是差集,就是屬於A不屬於B的元素,同樣我們可以用圖來表示:

上圖當中灰色陰影部分就是A和B兩個集合的差集,同樣,這個操作也會執行shuffle,非常耗時。

除了以上幾種之外,還有cartesian,即笛卡爾積,sample抽樣等集合操作,不過相對而言用的稍微少一些,這裡就不過多介紹了,感興趣的同學可以瞭解一下,也並不複雜。

行動操作

RDD中最常用的行動操作應該就是獲取結果的操作了,畢竟我們算了半天就是為了拿結果,只獲取RDD顯然不是我們的目的。獲取結果的RDD主要是take,top和collect,這三種沒什麼特別的用法,簡單介紹一下。

其中collect是獲取所有結果,會返回所有的元素。take和top都需要傳入一個引數指定條數,take是從RDD中返回指定條數的結果,top是從RDD中返回最前面的若干條結果,top和take的用法完全一樣,唯一的區別就是拿到的結果是否是最前面的。

除了這幾個之外,還有一個很常用的action是count,這個應該也不用多說,計算資料條數的操作,count一下就可以知道有多少條資料了。

reduce

除了這些比較簡單的之外,再介紹另外兩個比較有意思的,首先,先來介紹reduce。reduce顧名思義就是MapReduce當中的reduce,它的用法和Python當中的reduce幾乎完全一樣,它接受一個函式來進行合併操作。我們來看個例子:

在這個例子當中,我們的reduce函式是將兩個int執行加和,reduce機制會重複執行這個操作將所有的資料合併,所以最終得到的結果就是1 + 3 + 4 + 7 = 15.

fold

除了reduce之外還有一個叫做fold的action,它和reduce完全一樣,唯一不同的是它可以自定義一個初始值,並且是針對分割槽的,我們還拿上面的例子舉例:

直接看這個例子可能有點懵逼,簡單解釋一下就明白了,其實不復雜。我們注意到我們在使用parallelize創造資料的時候多加了一個引數2,這個2表示分割槽數。簡單可以理解成陣列[1, 3, 4, 7]會被分成兩部分,但是我們直接collect的話還是原值。

現在我們使用fold,傳入了兩個引數,除了一個函式之外還傳入了一個初始值2。所以整個計算過程是這樣的:

對於第一個分割槽的答案是1 + 3 + 2 = 6,對於第二個分割槽的答案是4 + 7 + 2 = 13,最後將兩個分割槽合併:6 + 13 + 2 = 21。

也就是說我們對於每個分割槽的結果賦予了一個起始值,並且對分割槽合併之後的結果又賦予了一個起始值。

aggregate

老實講這個action是最難理解的,因為它比較反常。首先,對於reduce和fold來說都有一個要求就是返回值的型別必須和rdd的資料型別相同。比如資料的型別是int,那麼返回的結果也要是int。

但是對於有些場景這個是不適用的,比如我們想求平均,我們需要知道term的和,也需要知道term出現的次數,所以我們需要返回兩個值。這個時候我們初始化的值應該是0, 0,也就是對於加和與計數而言都是從0開始的,接著我們需要傳入兩個函式,比如寫成這樣:

nums.aggregate((0, 0), lambda x, y: (x[0] + y, x[1] + 1), lambda x, y: (x[0] + y[0], x[1] + y[1]))

看到這行程式碼會懵逼是必然的,不用擔心,我們一點一點解釋。

首先是第一個lambda函式,這裡的x不是一個值而是兩個值,或者說是一個二元組,也就是我們最後返回的結果,在我們的返回預期裡,第一個返回的數是nums的和,第二個返回的數是nums當中數的個數。而這裡的y則是nums輸入的結果,顯然nums輸入的結果只有一個int,所以這裡的y是一維的。那麼我們要求和當然是用x[0] + y,也就是說把y的值加在第一維上,第二維自然是加一,因為我們每讀取一個數就應該加一。

這點還比較容易理解,第二個函式可能有些費勁,第二個函式和第一個不同,它不是用在處理nums的資料的,而是用來處理分割槽的。當我們執行aggregate的時候,spark並不是單執行緒執行的,它會將nums中的資料拆分成許多分割槽,每個分割槽得到結果之後需要合併,合併的時候會呼叫這個函式。

和第一個函式類似,第一個x是最終結果,而y則是其他分割槽運算結束需要合併進來的值。所以這裡的y是二維的,第一維是某個分割槽的和,第二維是某個分割槽當中元素的數量,那麼我們當然要把它都加在x上。

上圖展示了兩個分割槽的時候的計算過程,其中lambda1就是我們傳入的第一個匿名函式,同理,lambda2就是我們傳入的第二個匿名函式。我想結合圖應該很容易看明白。

行動操作除了這幾個之外還有一些,由於篇幅原因我們先不贅述了,在後序的文章當中如果有出現,我們會再進行詳細解釋的。初學者學習spark比較抗拒的一個主要原因就是覺得太過複雜,就連操作還區分什麼轉化操作和行動操作。其實這一切都是為了惰性求值從而優化效能。這樣我們就可以把若干個操作合併在一起執行,從而減少消耗的計算資源,對於分散式計算框架而言,效能是非常重要的指標,理解了這一點,spark為什麼會做出這樣的設計也就很容易理解了。

不僅spark如此,TensorFlow等深度學習框架也是如此,本質上許多看似反直覺的設計都是有更深層的原因的,理解了之後其實也很容易猜到,凡是拿到最終結果的操作往往都是行動操作,如果只是一些計算,那麼十有八九是轉化操作。

持久化操作

Spark當中的RDD是惰性求值的,有的時候我們會希望多次使用同一個RDD。如果我們只是簡單地呼叫行動操作,那麼spark會多次重複計算RDD和它對應的所有資料以及其他依賴,這顯然會帶來大量開銷。我們很自然地會希望對於我們經常使用的RDD可以快取起來,在我們需要的時候隨時拿來用,而不是每次用到的時候都需要重新跑。

為了解決這個問題,spark當中提供了持久化的操作。所謂的持久化可以簡單理解成快取起來。用法也很簡單,我們只需要對RDD進行persist即可:

texts = sc.parallelize(['now test', 'hello world'])
split = texts.split(lambda x: x.split(' '))
split.persist()

呼叫完持久化之後,RDD會被快取進記憶體或磁碟當中,我們需要的時候可以隨時調出來使用,就不用把前面的整個流程全部跑一遍了。並且spark當中支援多種級別的持久化操作,我們可以通過StorageLevel的變數來控制。我們來看下這個StorageLevel的取值:

我們根據需要選擇對應的快取級別即可。當然既然有持久化自然就有反持久化,對於一些已經不再需要快取的RDD,我們可以呼叫unpersist將它們從快取當中去除。

今天的內容雖然看起來各種操作五花八門,但是有些並不是經常用到,我們只需要大概有個印象,具體操作的細節可以等用到的時候再做仔細的研究。希望大家都能忽略這些並不重要的細節,抓住核心的本質。

今天的文章就是這些,如果覺得有所收穫,請順手點個關注或者轉發吧,你們的舉手之勞對我來說很重要。

相關推薦

spark——rdd常用轉化行動操作

本文始發於個人公眾號:TechFlow,原創不易,求個關注 今天是spark第三篇文章,我們繼續來看RDD的一些操作。 我們前文說道在spark當中RDD的操作可以分為兩種,一種是轉化操作(transformation),另一種是行動操作(action)。在轉化操作當中,spark不會為我們計算結果,而是會

Spark RDD | RDD特性、lineage、快取、checkpoint、依賴關係

RDD(Resilient Distributed Datasets)彈性的分散式資料集,又稱Spark core,它代表一個只讀的、不可變、可分割槽,裡面的元素可分散式平行計算的資料集。 RDD是一個很抽象的概念,不易於理解,但是要想學好Spark,必須要掌握RDD,熟悉它的程式設計模型,這是學習Spark

POI操作Excel,讀取xlsxlsx格式的文件

shee xss split 類型 後綴 .sh lan xls lin package org.ian.webutil; import java.io.File; import java.io.FileInputStream; import java.io.FileN

跨域問題相關知識(原生jsjquery兩種方法實現jsonp跨域)

syn con 加載 developer 兩種方法 ray exe 編寫 分組 1、同源策略 同源策略(Same origin policy),它是由Netscape提出的一個著名的安全策略。同源策略是一種約定,它是瀏覽器最核心也最基本的安全功能,如果缺少了同源策略,則瀏覽

(轉)Java JVM 工作原理流程

移植 獲得 代碼 適配 調用 tac 階段 main方法 等待 作為一名Java使用者,掌握JVM的體系結構也是必須的。說起Java,人們首先想到的是Java編程語言,然而事實上,Java是一種技術,它由四方面組成:Java編程語言、Java類文件格式、Java虛擬機和Ja

Java JVM 工作原理流程

str literal 狀態 應用 流程 href ctu 局部變量 自定義 作為一名Java使用者,掌握JVM的體系結構也是必須的。說起Java,人們首先想到的是Java編程語言,然而事實上,Java是一種技術,它由四方面組成:Java編程語言、Java類文件格式、Jav

MyBatis之Mapper常用技巧

mybatis mapperselect先看一個簡單的案例:<select id="selectPerson" parameterType="int" resultType="hashmap"> SELECT * FROM PERSON WHERE ID = #{id} </selec

[js高手之路] es6系列教程 - Map以及常用api

.com size style 系列教程 image clear rsquo images div ECMAScript 6中的Map類型是一種存儲著許多鍵值對的有序列表。鍵值對支持所有的數據類型. 鍵 0 和 ‘0’會被當做兩個不同的鍵,不會發生強

MySQL基準測試sysbench工具

threads 組件 程序 原因 str 前言 全面 無法連接 不同 前言 作為一名後臺開發,對數據庫進行基準測試,以掌握數據庫的性能情況是非常必要的。本文介紹了MySQL基準測試的基本概念,以及使用sysbench對MySQL進行基準測試的詳細方法。 文章有疏漏之處,歡迎

MySQL基準測試sysbench工具(轉)

lua 增刪 後臺 simple ads 執行時間 bench 進行 響應 前言 作為一名後臺開發,對數據庫進行基準測試,以掌握數據庫的性能情況是非常必要的。本文介紹了MySQL基準測試的基本概念,以及使用sysbench對MySQL進行基準測試的詳細方法。 文章有疏漏

最大子段

最大 負數 nbsp 端點 關於 一段 描述 計數器 曾經 題目名稱:最大子段和 題目描述:給出一段序列,選出其中連續且非空的一段使得這段和最大。 輸入格式: 第一行是一個正整數N,表示了序列的長度。 第2行包含N個絕對值不大於10000的整數A[i],描述了這段序列。

分區工具parted的常用分區使用方法【轉】

ima nbsp 磁盤 很好 main adding ext3 當前 padding 來源:http://blog.51cto.com/zhangmingqian/1068779 分區工具parted的詳解及常用分區使用方法 一、 parted的用途

java類型轉換(自動轉換強制轉換)

代碼 oid 高精 log 相加 println 類型轉換詳解 範圍 void 自動轉換 class Hello { public static void main(String[] args) { //自動轉換 int a = 5; byte b = 6

datetime 模塊 -- 基本的日期時間類型

日歷 cell max 5-0 .cn 賦值 RR 意義 struct 轉自:https://www.cnblogs.com/fclbky/articles/4098204.html datetime 模塊提供了各種類用於操作日期和時間,該模塊側重於高效率的格式化輸出在 P

如何在Centos6Centos7兩個版本上,執行Cobbler無人值守安裝!

信息 cgroup 內容 manager 外網 oar 關聯 done 足夠 Cobbler介紹: Cobbler是一個Linux服務器快速網絡安裝的服務,而且在經過調整也可以支持網絡安裝windows。該工具使用python開發,小巧輕便(才15k行python代碼),可

syslog、日誌服務器安裝、卸載、如何安裝卸載EventLog Analyzer

技術 src 如何 RoCE analyze sys ESS watermark 詳解 syslog、日誌服務器安裝、卸載詳解、如何安裝和卸載EventLog Analyzer

1U服務器2U服務器哪個更好?

體積 cto 發揮 使用 服務器 承擔 個數 最大 lar 機架式服務器的外形看來不像計算機,而像交換機,有1U(1U=1.75英寸)、2U、4U等規格。 在很多企業在選擇機架式服務器的時候都比較糾結的是,到底是選1U還是2U好呢? 現在我們就來詳細了解一下1U機架式服務

基於接口回調JUC中CallableFutureTask實現原理

cnblogs blog 異步編程 但是 迷糊 對象 extend href 增加 Callable接口和FutureTask實現類,是JUC(Java Util Concurrent)包中很重要的兩個技術實現,它們使獲取多線程運行結果成為可能。它們底層的實現,就是基於接口

一篇文章大資料技術應用場景

什麼是大資料 說起大資料,估計大家都覺得只聽過概念,但是具體是什麼東西,怎麼定義,沒有一個標準的東西,因為在我們的印象中好像很多公司都叫大資料公司,業務形態則有幾百種,感覺不是很好理解,所以我建議還是從字面上來理解大資料,在維克托邁爾-舍恩伯格及肯尼斯庫克耶編寫的《大資料時代》提到了大資料的4個特徵:

SELECT INTO INSERT INTO SELECT 兩種表複製語句(SQL資料庫Oracle資料庫的區別)

https://www.cnblogs.com/mq0036/p/4155136.html 我們經常會遇到需要表複製的情況,如將一個table1的資料的部分欄位複製到table2中,或者將整個table1複製到table2中,這時候我們就要使用SELECT INTO 和 INSER