1. 程式人生 > >Spark RDD API 參考示例(三)

Spark RDD API 參考示例(三)

28、getCheckpointFile

原型
def getCheckpointFile: Option[String]

含義
getCheckpointFile 返回RDD的checkpoint 檔案的路徑,主要用於對大型計算中恢復到指定的節點

示例

//設定CheckPoint的路徑,前提是路徑一定要存在
sc.setCheckpointDir("hdfs://192.168.10.71:9000/wc")
val a = sc.parallelize(1 to 500, 5)
val b = a++a++a++a++a

//獲取b的歷史 checkpoint 檔案路徑
b.getCheckpointFile
//目前沒有checkpoint檔案
res5: Option[String] = None //設定checkpoint,但不會立馬提交,rdd具有延遲的特點 b.checkpoint b.getCheckpointFile res10: Option[String] = None //使用action運算元時,才會真正提交checkpoint b.collect //獲取上面提交的checkpoint檔案路徑 b.getCheckpointFile res15: Option[String] = Some(hdfs://192.168.10.71:9000/wc/e7f2340a-b37b-4d97-8b48-58253e6e4464/rdd-133)

29、getStorageLevel

原型
def getStorageLevel

含義
getStorageLevel 返回RDD當前的儲存級別,儲存級別一旦確定,就不能再修改了。

示例

val a = sc.parallelize(1 to 100000, 2)
//表示目前RDD使用的儲存級別是儲存在記憶體中,未序列化,儲存1份
a.getStorageLevel
res1: org.apache.spark.storage.StorageLevel = StorageLevel(memory, deserialized, 1 replicas)

//可以事先指定儲存級別
val a = sc.parallelize(1 to 100000, 2) a.persist(org.apache.spark.storage.StorageLevel.DISK_ONLY) a.getStorageLevel //表示儲存在磁碟中,儲存1份 res2: org.apache.spark.storage.StorageLevel = StorageLevel(disk, 1 replicas)

30、glom

原型
def glom(): RDD[Array[T]]

含義
glom 將RDD的每一個分割槽作為一個單獨的包裝,然後分割槽之間再包裝起來

示例

val a = sc.parallelize(1 to 10, 3)
a.glom.collect
//每一個分割槽作為一個單獨的包裝,然後分割槽之間再包裝起來
res1:  Array(Array(1, 2, 3), Array(4, 5, 6), Array(7, 8, 9, 10))

31、groupBy

原型
def groupBy[K: ClassTag](f: T => K, numPartitions: Int): RDD[(K, Iterable[T])]

含義
groupBy 將RDD中的資料按照指定的函式和分割槽數量,來進行分組。

示例

val a = sc.parallelize(1 to 9, 3)
//groupBy的第一個引數是一個函式,用於指定分組條件。分類標籤由條件返回值給定
//這裡會根據條件返回 "even" 和 "odd"
a.groupBy(x => { if (x % 2 == 0) "even" else "odd" }).collect
res1: Array((even,CompactBuffer(2, 8, 4, 6)), (odd,CompactBuffer(5, 1, 3, 7, 9)))

//這裡的返回標籤為 0 ,1 ,2
a.groupBy(x =>(x % 3)).collect
res2:Array((0,CompactBuffer(3, 9, 6)), (1,CompactBuffer(4, 1, 7)), (2,CompactBuffer(2, 8, 5)))

//自定義函式進行分組
val a = sc.parallelize(1 to 9, 3)
def myfunc(a: Int) : Int =
{
  a % 2
}
//groupBy中的第二個引數是指定,分組後將結果儲存在幾個分割槽中,預設分割槽數量和RDD元素分割槽數量相等
a.groupBy(x => myfunc(x), 3).collect
res2: Array((0,ArrayBuffer(2, 4, 6, 8)), (1,ArrayBuffer(1, 3, 5, 7, 9)))
a.groupBy(x => myfunc(x), 3).partitions.length
res4: Int = 3

//指定結果分割槽數量為1
a.groupBy(myfunc(_), 1).collect
res3: Array((0,ArrayBuffer(2, 4, 6, 8)), (1,ArrayBuffer(1, 3, 5, 7, 9)))
a.groupBy(myfunc(_), 1).partitions.length
res5: Int = 1

32、groupByKey [Pair]

原型
def groupByKey(): RDD[(K, Iterable[V])]
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]

含義
groupByKeygroupBy 非常相似,不提供函式功能,只是按照key來進行分組,相同的key分在一組,相比於groupBy 要簡單

示例

val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "spider", "eagle"), 2)
//生成一個以單詞長度作為key,單詞作為value的 元組
val b = a.keyBy(_.length)
//groupByKey不提供函式功能,直接按照Key進行分類
b.groupByKey.collect
res1: Array[(Int, Seq[String])] = Array((4,ArrayBuffer(lion)), (6,ArrayBuffer(spider)), (3,ArrayBuffer(dog, cat)), (5,ArrayBuffer(tiger, eagle)))

33、histogram [Double]

原型
def histogram(bucketCount: Int): Pair[Array[Double], Array[Long]]
def histogram(buckets: Array[Double], evenBuckets: Boolean = false): Array[Long]

含義
histogram 根據RDD中的資料生成一個隨機的直方圖,RDD中的資料作為橫座標,系統自動生成一個縱座標,有兩種方式生成橫座標,第一種指定需要幾個柱,第二種,給定橫座標個數。

示例

//根據給定的柱子數量來確定座標
val a = sc.parallelize(List(9.1, 1.0, 1.2, 2.1, 1.3, 5.0, 2.0, 2.1, 7.4, 7.5, 7.6, 8.8, 10.0, 8.9, 5.5), 3)
a.histogram(6)
//表示需要7個橫座標點,生成6個柱
res1: (Array[Double], Array[Long]) = (Array(1.0, 2.5, 4.0, 5.5, 7.0, 8.5, 10.0),Array(6, 0, 1, 1, 3, 4))

//根據使用者指定的橫座標來確定
val a = sc.parallelize(List(1.1, 1.2, 1.3, 2.0, 2.1, 7.4, 7.5, 7.6, 8.8, 9.0), 3)
a.histogram(Array(0.0, 3.0, 8.0))
res2: Array[Long] = Array(5, 3)

34、id

原型
val id: Int

含義
id 獲取系統分配給RDD的編號,這個編號可以用於查詢指定的的RDD

示例

val y = sc.parallelize(1 to 10, 10)
y.id
res1: Int = 19

35、intersection

原型
def intersection(other: RDD[T], numPartitions: Int): RDD[T]
def intersection(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
def intersection(other: RDD[T]): RDD[T]

含義
intersection 求兩個集合中相同的元素,也就是求二者的交集

示例

//普通元素求交集
val x = sc.parallelize(1 to 20)
val y = sc.parallelize(10 to 30)
val z = x.intersection(y)
//求兩個集合的交集
z.collect
res1: Array[Int] = Array(16, 12, 20, 13, 17, 14, 18, 10, 19, 15, 11)

//兩個元組求交集
val x = sc.parallelize(List(("cat",2),("wolf",1),("gnu",1)))
val y = sc.parallelize(List(("cat",1),("wolf",1),("mouse",1)))
val z = x.intersection(y)
z.collect
//只有完全相同的元組才算相同元素
res2: Array[(String, Int)] = Array((wolf,1))

36、isCheckpointed

原型
def isCheckpointed: Boolean

含義
isCheckpointed 檢測一個RDD是否已經存在檢查點

示例

//設定檢查點
val c = sc.parallelize(1 to 10)
sc.setCheckpointDir("hdfs://192.168.10.71:9000/wc")
c.isCheckpointed
res1: Boolean = false

//延遲執行,只有執行action運算元時,才會執行checkpoint
c.checkpoint
c.isCheckpointed
res2: Boolean = false

//執行action運算元,生成checkpoint
c.collect
c.isCheckpointed
res3: Boolean = true

37、join [Pair]

原型
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]

含義
join 用於兩個key-value型別的RDD的內連線操作,類似於資料庫中的內連線。只有兩者的key相同時,才會連線

示例

val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
val b = a.keyBy(_.length)
val c = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
//相同的key,就能連線在一起
val d = c.keyBy(_.length)
b.join(d).collect 

res0: Array[(Int, (String, String))] = Array((6,(salmon,salmon)), (6,(salmon,rabbit)), (6,(salmon,turkey)), (6,(salmon,salmon)), (6,(salmon,rabbit)), (6,(salmon,turkey)), (3,(dog,dog)), (3,(dog,cat)), (3,(dog,gnu)), (3,(dog,bee)), (3,(rat,dog)), (3,(rat,cat)), (3,(rat,gnu)), (3,(rat,bee)))

38、keyBy

原型
def keyBy[K](f: T => K): RDD[(K, T)]

含義
keyBy 指定一個函式產生特定的資料作為RDD的key,這個函式可以自定義,主要目的是產生一個元組。

示例

val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
//指定每個單詞的長度作為RDD中元素的Key
val b = a.keyBy(_.length)
b.collect
res1: Array[(Int, String)] = Array((3,dog), (6,salmon), (6,salmon), (3,rat), (8,elephant))

39、keys [Pair]

原型
def keys: RDD[K]

含義
keys 獲取RDD中元組的key,這些key可以重複出現

示例

val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
val b = a.map(x => (x.length, x))
b.keys.collect
//可以重複出現
res2: Array[Int] = Array(3, 5, 4, 3, 7, 5)

40、leftOuterJoin [Pair]

原型
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
def leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))]
def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))]

含義
leftOuterJoin 類似於資料庫中的左外連線,以左邊作為標準,右邊沒有的填缺失值,左邊沒有的右邊有,捨棄掉。

示例

val a = sc.parallelize(List(("dog",2),("salmon",2),("rat",1),("elephant",10)),3)
val b = sc.parallelize(List(("dog",2),("salmon",2),("rabbit",1),("cat",7)), 3)
a.leftOuterJoin(b).collect

//左邊有的,在結果集中都有,左邊沒有的,右邊都捨棄掉。以左邊作為參考標準
res1:Array((rat,(1,None)), (salmon,(2,Some(2))), (elephant,(10,None)), (dog,(2,Some(2))))

41、lookup

原型
def lookup(key: K): Seq[V]

含義
lookup 檢視指定key的value值,通過全表掃描來實現

示例

val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
val b = a.map(x => (x.length, x))
b.lookup(5)
//通過全表掃描來查詢 key=5 的值
res1: Seq[String] = WrappedArray(tiger, eagle)

相關推薦

Spark RDD API 參考示例

28、getCheckpointFile 原型 def getCheckpointFile: Option[String] 含義 getCheckpointFile 返回RDD的che

Spark RDD API 參考示例

1、aggregate 原型 def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U 含義 aggregate是一個聚

Spark RDD API 參考示例

57、sample 原型 def sample(withReplacement: Boolean, fraction: Double, seed: Int): RDD[T] 含義 sa

Spring Boot參考教程內部應用監控Actuator

使用方式 實現類 pat igp pid localhost aid 倉庫 默認 3. 內部應用監控(Actuator) 如上2.4中所述,傳統spring工程中工程的初始化過程,bean的生命周期,應用的內部健康情況均無法監控,為了解決這個問題,spring boot提供

WebDriver API 元素定位

IE sele webdriver 否則 IV HR 元素 content pytho 將元素滾定到可見區域 iframe切換 將元素滾定到可見區域   web頁面不能一次顯示全部全部顯示,需要借助滾定來查看相應的顯示;selenium進行操作時,需要在可視範圍內進

【從零開始搭建自己的.NET Core Api框架】集成輕量級ORM——SqlSugar:3.3 自動生成實體類

i++ 點運算 自己的 yui content project style ref 數據庫表 系列目錄 一. 創建項目並集成swagger   1.1 創建   1.2 完善 二. 搭建項目整體架構 三. 集成輕量級ORM框架——SqlSugar   3.1 搭建環境  

Linux shell示例

\n str ons echo return NPU inux pre ech example No. 1作為shell編寫人員,要避免使用者輸入數據的格式不正確,也就是要在shell腳本發布前攔截可能的錯誤;本示例是對使用者輸入的數據(或位置變量)進行判斷是否可用,然後在

Zookeeper C API應用示例3——配置管理非同步API

場景描述同:https://blog.csdn.net/qq_41688455/article/details/83780854 服務端程式碼如下: #include <stdio.h> #include <unistd.h> #include <std

Zookeeper C API應用示例1——配置管理同步API

場景描述 服務端監控/configure目錄; 客戶端對/configure目錄讀/寫資料,建立/刪除子節點 服務端: 監控/configure目錄,有資料更新時,輸出/configure中的資料;子節點建立/刪除時,服務程式列出當前的子目錄列表。 程式碼如下: #include &

ArcGIS API For JavaScriptQueryTask&IdentifyTask實現空間查詢

ArcGIS API For JavaScript(三)QueryTask&IdentifyTask實現空間查詢 1、通過QueryTask()實現空間查詢 在地圖上畫一個多邊形,將和多邊形相交的要素找出並高亮顯示,之後動態新增表格,將要素的名稱依次顯示在表格中   <

Java呼叫C++ API完整示例dll

最近有一個和香港的對接專案。在通訊問題上出現了卡殼。港方提供的是一個java庫,需要和我們這邊進行交易策略對接。交易策略是以協議的方式,通過網路通訊傳送到我們的系統。由於我們缺少穩定可靠的java通訊元件,但我們具有一個非常可靠的C++通訊元件。因此就萌發了將現有的C++通訊元件封裝為可供

Spark Streaming狀態管理函式——MapWithState的使用scala版

MapWithState   關於mapWithState   注意事項   示例程式碼   執行   結論    關於mapWithState   需要自己寫一個匿名函式func來實現自己想要的功能。如果有初始化的值得需要,可以使用initia

balance transfer 解析及api深度追蹤加入通道

一 程式碼解析 var util = require(‘util’); var path = require(‘path’); var fs = require(‘fs’); var Peer = require(‘fabric-client/lib/Peer.

《大話設計模式》Java程式碼示例之裝飾模式

裝飾模式(Decorator):動態地給一個物件新增一些額外的職責,就增加功能來說,裝飾模式比生成子類更為靈活。 package decorator; /** * 裝飾模式(Decorator) * Person類 */ public class Perso

spark RDD的元素順序ordering測試

通過實驗發現: foreach()遍歷的順序是亂的 但: collect()取到的結果是依照原順序的 take()取到的結果是依照原順序的 為什麼呢???? 另外,可以發現: take()

spark機器學習筆記:Spark Python構建推薦系統

輸出結果: [[Rating(user=789, product=1012, rating=4.0), Rating(user=789, product=127, rating=5.0), Rating(user=789, product=475, rating=5.0), Rating(us

spark原始碼閱讀筆記DatasetstructField、structType、schame

StructType(fields: Seq[StructField]) 一個StructType物件,可以有多個StructField,同時也可以用名字(name)來提取,就想當於Map可以用key來提取value,但是他StructType提取的是整條欄位的資訊 在原始碼中structType是一個cas

API 系列教程:使用 API Resource 來建立自定義 JSON 格式的 API

上一篇教程中我們通過 jwt-auth 實現了 Laravel 的 API 認證。 使用者請求登入介面 http://apidemo.test/api/auth/login 登入成功後,獲取到 JSON 響應,響應頭會帶有 token 資訊。 Authorizatio

C#呼叫百度地圖API經驗分享

    這一篇我將跟大家分享一下我自己在開發過程中總結出的一些操作地圖的方法,屬性,及思路,希望可以讓大家少走彎路。 1.定位 一般百度的示例DEMO裡開始初始化地圖時用的都是map.centerAn

sqler sql 轉rest api 原始碼解析 rest協議

rest 服務說明 rest 協議主要是將配置檔案中的巨集暴露為rest 介面,使用了labstack/echo web 框架,同時基於context 模型 進行巨集管理物件的共享,同時進行了一些中介軟體的註冊 cors RemoveTrailingSlash gzip Recover rest 啟動