spark部分運算元的彙總大全(包含Transformations類運算元,action類運算元,持久化運算元等 )【文字說明+Scala程式碼+程式碼連結】
一.Spark中的運算元總結(原理)
Spark運算元
1).Transformations ,轉換運算元,懶執行,需要Action類運算元觸發。
map/mapToPair,flatMap,filter,reduceByKey,sample,sortBy/sortByKey,groupByKey,join,leftOutJoin,rightOuterJoin,fullOuterJoin,distinct,union,intersection,subtract,repartition,coalesce,zip,zipWithIndex,mapPartitions,
mapPartitionWithIndex,cogroup,mapValues,aggreagateByKey,combineByKey
2).Action,行動運算元,觸發Action類運算元執行。Spark應用程式中(Spark Application)有一個Action運算元就有了一個job。
take,frist,foreach,count,collect,reduce,foreachPartition,countByKey,countByValue
3).持久化運算元。
a).cache
預設將資料持久化到記憶體,cache()=persist()=persist(StorageLevel.MEMORY_ONLY)
b).persist
可以手動指定資料持久化級別。
MEMORY_ONLY
MEMORY_ONLY_SER
MEMORY_AND_DISK
MEMORY_AND_DISK_SER
"_2"代表有副本數,儘量避免使用"DISK_ONLY"級別。
c).checkpoint
將資料可以持久化到磁碟,指定的checkpoint目錄中,切斷checkpointRDD之前的依賴關係,使之後的RDD依賴於checkpoint目錄中的資料。需要設定checkpoint路徑。
RDD lineage 非常長,每一個RDD之間邏輯複雜,計算耗時。對一個RDD進行checkpoint之前最好先cache下。
注意:
a).cache和persist注意事項:
i).cache和persist是懶執行,需要Action運算元觸發。
ii).對一個RDD進行cache/persist之後,可以賦值給一個變數,下次直接使用這個變數就是使用的持久化的資料。
iii).cache/persist之後不能緊跟Action類運算元。
b).checkpoint執行流程:
i).Spark任務執行完成之後,會從後往前回溯,找到CcheckpointRDD做標記。
ii).回溯完成之後,重新計算標記RDD的資料,將資料放入checkpoint目錄中。
iii).切斷RDD之間的依賴關係。
————————————————————————————————————————————————————————
二.運算元例項
package com.bjsxt import org.apache.spark.SparkConf import org.apache.spark.SparkContext import scala.collection.mutable.ListBuffer object Suanzi { def main(args: Array[String]): Unit = { val conf=new SparkConf().setAppName("test").setMaster("local"); val sc=new SparkContext(conf) val rdd2=sc.parallelize(Array( "love1","love2","love3","love4", "love5","love6","love7","love8", "love9","love10","love11","love12" ),3) rdd2.foreach(println) println("_________________________________--") /** * repartition重新分割槽 * 增加分割槽 * 是一個shuffle類的運算元 */ val repartition1=rdd2.repartition(5) repartition1.foreach(println) println("+++++++++++++++++++++++++++++++++++++") /** * repartition重新分割槽 * 減少分割槽 * 是一個shuffle類的運算元 */ val repartition2=rdd2.repartition(1) repartition2.foreach(println) /** * Coalesce * 可以增多,也可以減少分割槽 * 底層有shuffle,但是需要設定,預設是false關閉的 * */ println("##########################################") val coalesce1=rdd2.coalesce(6, false) coalesce1.foreach(println) /** * Coalesce * 可以增多,也可以減少分割槽 * 底層有shuffle,但是需要設定,預設是false關閉的 * */ println("%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%") val coalesce2=rdd2.coalesce(6, true) coalesce2.foreach(println) /** * Coalesce * 可以增多,也可以減少分割槽 * 底層有shuffle,但是需要設定,預設是false關閉的 * */ println("PPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPPP") val coalesce3=rdd2.coalesce(2, true) coalesce3.foreach(println) println("*****************************************************") /** * mapPartitionsWithIndex * 根據索引進行分割槽 */ val mapPartitionsWithIndex = rdd2.mapPartitionsWithIndex((index,iter)=>{ val list=ListBuffer[String]() while(iter.hasNext){ val next=iter.next() println("parallelize index=["+index+"],value=["+next+"]") list.+=(next) } list.iterator },false).collect() println("^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^") /** * reduce,將value的值進行累計求和 */ val rdd1=sc.parallelize(Array(("a",200),("b",200),("a",300),("d",400),("a",200),("b",200),("a",300),("d",400))) val result=rdd1.reduce((t1,t2)=>{ val key=t1._1+"~"+t2._1 val value=t1._2+t2._2 (key,value) }) println(result) println("?????????????????????????????????????????????????") /** * countbyvalue * 根據value值不同進行求和計算 */ val result1=rdd1.countByValue() result1.foreach(println) /** * countbykey * 根據value值不同進行求和計算 */ println("^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^") val result2=rdd1.countByKey() result2.foreach(println) /** * groupByKey * 按照key的值不同進行分組 * 將value的值放在一起 */ println("""""""""""""""""""""""""""""""""""""""""""""""""") rdd1.groupByKey().foreach(println) /** * zip的意思就是合併成(k,v)格式 * 即第一個rdd的值為key,第二個RDD的值為value */ println("zip$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$") val rdd3=sc.parallelize(Array( "love1","love2","love3","love4", "love5","love6","love7","love8", "love9","love10","love11","love12" )) val rdd4=sc.parallelize(Array( "love1","love2","love3","love4", "love5","love6","love7","love8", "love9","love10","love11","love12" )) val zip=rdd3.zip(rdd4) zip.foreach(println) /** * zipWithIndex */ println("parallelize##########@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@") val parallelize=sc.parallelize(Array( "love1","love2","love3","love4", "love5","love6","love7","love8", "love9","love10","love11","love12" ),3) val zipWithIndex= parallelize.zipWithIndex() zipWithIndex.foreach(println) /** * mapPartitionsWithIndex和coalesce的混合使用 */ println("mapPartitionsWithIndex和coalesce的混合使用") val mapPartitionsWithIndex1 = rdd3.mapPartitionsWithIndex((index,iter)=>{ val list=ListBuffer[String]() while(iter.hasNext){ val next=iter.next() println("parallelize index=["+index+"],value=["+next+"]") list.+=(next) } list.iterator },false) val coalesce=mapPartitionsWithIndex1.coalesce(7,true) val result0=coalesce.mapPartitionsWithIndex((index,iter)=>{ val list=ListBuffer[String]() while(iter.hasNext){ val next=iter.next() println("parallelize index=["+index+"],value=["+next+"]") list.+=(next) } list.iterator },true).collect() result0.foreach(println) } }
用心做事,只為方便你我,鼓勵一下我唄!
————————————————————————————————————————————————————————
Repartitions運算元:
重新分割槽
重新分割槽,可以增多分割槽,可以減少分割槽
是一個有shuffle類的運算元
重新分割槽,可以增多分割槽,可以減少分割槽
是一個有shuffle類的運算元
Coalesce也是重新分割槽,可以增多,也可以減少分割槽,還可以有一個過載的方法
可以產生shuffle,也可以不產生shuffle,預設是不產生shuffle的
Repartition底層封裝的是coalesce
Coalesce叢小的分割槽到大的分割槽,同時不讓產生shuffle,這樣是不起作用的。
增多分割槽經常repartition
減少分割槽經常用coalesce
用coalesce,同時不設定true產生shuffle,這樣子是不產生分割槽的。
Repartition重新分割槽,可多可少,預設有shuffle
coalesce重新分割槽,可多可少,可以設定分割槽是否產生shuffle,coalesce(numpartition,shuffle=true/false),預設是false
由少的分割槽到多的分割槽,不讓產生shuffle,是不起作用的
Mappartition with index
Groupbykey:
將tuple型別的list轉換成用(K,V)格式的RDD用parallelizepares
依據key的值,將所有的value的值放在一起。
zipwithIndex:轉換成(k,v)格式的RDD
Zip就是將兩個RDD壓縮成一個RDD
Tuple型別的RDD也可以壓縮成k,v
Groupbykey
Reduce:累加求總數
Countbykey:數一數相同的key有幾個,按照pairRDD,統計相同的key有幾個
Countbyvalue,可以作用到(k,v)格式的RDD上,也可以作用在非(K,V)格式的RDD上
相關推薦
spark部分運算元的彙總大全(包含Transformations類運算元,action類運算元,持久化運算元等 )【文字說明+Scala程式碼+程式碼連結】
一.Spark中的運算元總結(原理) Spark運算元 1).Transformations ,轉換運算元,懶執行,需要Action類運算元觸發。 map/mapToPair,flatMap,filter,reduceByKey,s
基於開源專案OpenCV的人臉識別Demo版整理(不僅可以識別人臉,還可以識別眼睛鼻子嘴等)【模式識別中的翹楚】
最近對人臉識別的程式非常感興趣,但是苦於沒有選修多媒體方向,看了幾篇關於人臉識別的論文,大概也沒看懂多少,什麼灰度處理啊,切割識別啊,雲裡霧裡,傻傻看不明白啊。各種苦惱。 於是就在網上找找,看有木有神馬開原始碼啊,要是有個現成的原始碼就更好了,百度it ,那些原始碼都憂傷的躲在CSDN中,老衲還
Android View的繼承體系大全(包含125個view的所有子類)(by 星空武哥)
安卓的view是一個龐大是繼承體系,今天花了一個晚上我總結一下view的繼承體系。view的直接繼承子類有12個,間接繼承子類有113個。今天我就總結一下所有的view的子類,希
Java版運算元彙總(包括filter,collect,take,first,sample等)【Java純程式碼】
package com.bjsxt; import java.util.List; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache
ROS知識(16)----如何編譯時自動鏈接同一個工作空間的其他包的頭文件(包含message,srv,action自動生成的頭文件)
logs package fin 空間 依賴庫 osc div build 知識 catkin_make編譯時,往往需要自動鏈接同一個工作空間的其他包的頭文件。否則會出現類似如下的錯誤: /home/xx/xx_ws/srcA_package/src/db.hpp:13:
Java基礎------生成一個六位數的驗證碼(包含大寫字母、小寫字母、數字,並且不允許重復)?
參考 數組 rand 定義 ole ava length log post 問題描述:生成一個六位數的驗證碼(包含大寫字母、小寫字母、數字,並且不允許重復)? 參考代碼如下: import java.util.Arrays;import java.util.Random
Gradle打可執行Jar包(包含依賴第三方庫中的類)
使用Gradle來打Jar包,在引入Gradle的java外掛後,直接就能實現 在build.gradle檔案中引入java外掛 plugins { id 'java' } 然後配置maifest主類 jar { manifest { attributes "M
C語言~巨集操作大全(巨集定義、內建巨集、__FILE__、__LINE__、##用法)
當然巨集定義非常重要的,它可以幫助我們防止出錯,提高程式碼的可移植性和可讀性等。 下面列舉一些成熟軟體中常用得巨集定義 1,防止一個頭檔案被重複包含 #ifndef COMDEF_H #define COMDEF_H //標頭檔案內容 … #endif
SparkStreaming部分的學習(包括:sparkStreaming與storm的區別, Sparkstreaming處理資料的過程等)【業務邏輯圖及文字說明】
sparkStreaming與storm的區別: Sparkstreaming處理資料的過程: sparkstreaming:資料是一段時間處理的,是一個微批處理,這個時間是由自己人為設定的。sparkstreaming的吞吐量高。 Storm:是純實時處理資料的,
ios中常用的小技巧大全(總有你不知道的和你會用到的)
/*1*/ tempString = [tempString stringByReplacingOccurrencesOfString:@" " withString:@""]; /*2 */tempString = [tempString stringByReplacingOccurrencesOfStri
Spark的RDD連續轉換操作有時需要注意強行觸發action執行操作,否則(Tansformation)的惰性(lazy)機制會導致結果錯誤
最近通過spark做一些資料處理,遇到一些詭異的現象 我開發了一個隨機生成海量資料點的程式,因為要保證這些點具有自增序號,不適合直接map分散式做(幾十億的資料,map計算需要分割槽(不主動分割槽估計也會自動分割槽,spark自帶的資料累加邏輯只能對單個partitio
eclipse中使用spring boot 入門開發(包含:與jsp頁面和資料庫互動,cmd打包執行war包)
突然想到自己有一段時間沒使用spring boot了,熟悉了一下之後決定記錄一下這次使用的注意點 一:使用springBoot搭出來一個架子(從前端到資料庫) 1.eclipse已經整合了maven,所以新建一個maven專案,然後針對專案修改下jdk相關點 2.目錄如
Android------視訊播放器(包含全屏播放,快退,快進,騰訊新聞的列表播放等)
前段時間做了一個新聞APP,涉及到了列表視訊播放,和騰訊新聞APP差不多,總結了一下程式碼,寫了一個Demo來分享給大家。用了 TabLayout+RecylerView+自定義視訊控制元件 完成的 列表中支援全屏播放來看看效果圖: 列表類程式碼:public clas
(19)ASP.NET Core EF建立模型(包含屬性和排除屬性、主鍵、生成的值)
1.什麼是Fluent API? EF中內嵌的約定將POCO類對映到表。但是,有時您無法或不想遵守這些約定,需要將實體對映到約定指示外的其他物件,所以Fluent API和註解都是一種方法,這兩種方法是用來配置EF在對映屬性時繞開約定。Code first fluent API最常訪問通過重寫OnModel
Cmake新手使用日記(1)【C++11下的初體驗】
pen 如何 其他 err ++ targe 使用 可執行文件 使用教程 第一次使用Cmake,搜索了很多使用教程,包括《Cmake實踐》、《Cmake手冊》等,但是在針對最新的C++11條件下編程還是會存在一點點問題,需要實驗很多次錯誤並搜索大量文章才能解決問題。這裏
純CSS畫的基本圖形(矩形、圓形、三角形、多邊形、愛心、八卦等)
技術分享 部分 fin display 三角形 spl back transform 純css 今天在css-tricks上看到一篇文章,那篇文章讓我不禁心頭一震,強大的CSS啊,居然能畫出這麽多基本的圖形。圖形包括基本的矩形、圓形、橢圓、三角形、多邊形,也包括稍微復雜一點
ArcGIS API for JavaScript3.x 學習筆記[4] 加載底圖(三)【Open Street Map開放街道地圖】
asc 裏的 指定 訪問 utf-8 gis sca utf 同方 Open Street Map OpenStreetMap(簡稱OSM,中文是開放街道地圖)是一個網上地圖協作計劃,目標是創造一個內容自由且能讓所有人編輯的世界地圖。 OSM是一款由網絡大眾共同打造的免費開
語音識別學習筆記(二)【基於向量量化的識別技術】
語音識別學習筆記(二)【基於向量量化的識別技術】 概述 量化分為標量量化和向量量化(Vector Quantization,VQ)。標量量化是將取樣後的訊號值逐個進行量化,而適量量化是將若干個取樣訊號分成一組,即構成一個向量,然後對此向量一次進行量化。向量量化
04 -pandas索引的堆(行列操作,交換行列)、聚合操作(求和、最大值、最小值、平均值等)
引入模組 import pandas as pd from pandas import Series,DataFrame import matplotlib.pyplot as plt 建立示例DataFrame # 用作案例 不要刪 !!! data=np.random.ra
PAT 1123—— Is It a Complete AVL Tree(平衡二叉樹)【左旋右旋各種旋】
#include <cstdio> #include <algorithm> #include <vector> #include <iostream> #include <queue> using namespace std;