
Spark RDD Transformations in Detail --- Spark Learning Notes 7

Over the past few days I have been studying Spark RDD transformations and actions. These notes record what I learned, and I'm sharing them here.

1. Start spark-shell

SPARK_MASTER=local[4] ./spark-shell.sh
Welcome to
      ____              __  
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 0.8.1
      /_/                  


Using Scala version 2.9.3 (Java HotSpot(TM) 64-Bit Server VM, Java 1.6.0_20)
Initializing interpreter...
14/04/04 10:49:44 INFO server.Server: jetty-7.x.y-SNAPSHOT
14/04/04 10:49:44 INFO server.AbstractConnector: Started ...:5757
Creating SparkContext...
14/04/04 10:49:50 INFO slf4j.Slf4jEventHandler: Slf4jEventHandler started
14/04/04 10:49:50 INFO spark.SparkEnv: Registering BlockManagerMaster
14/04/04 10:49:50 INFO storage.DiskBlockManager: Created local directory at /tmp/spark-local-20140404104950-5dd2

2. We'll use the CHANGES.txt and README.md files in the Spark root directory as examples.
scala> sc
res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@...

scala> val changes = sc.textFile("CHANGES.txt")
14/04/04 10:51:39 INFO storage.MemoryStore: ensureFreeSpace(44905) called with curMem=0, maxMem=339585269
14/04/04 10:51:39 INFO storage.MemoryStore: Block broadcast_0 stored as values to memory (estimated size 43.9 KB, free 323.8 MB)
changes: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at <console>:12

scala> changes foreach println
14/04/04 10:52:03 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/04/04 10:52:03 WARN snappy.LoadSnappy: Snappy native library not loaded
14/04/04 10:52:03 INFO mapred.FileInputFormat: Total input paths to process : 1
14/04/04 10:52:03 INFO spark.SparkContext: Starting job: foreach at <console>:15
14/04/04 10:52:03 INFO scheduler.DAGScheduler: Got job 0 (foreach at <console>:15) with 1 output partitions (allowLocal=false)
14/04/04 10:52:03 INFO scheduler.DAGScheduler: Final stage: Stage 0 (foreach at <console>:15)
14/04/04 10:52:03 INFO scheduler.DAGScheduler: Parents of final stage: List()
14/04/04 10:52:03 INFO scheduler.DAGScheduler: Missing parents: List()
14/04/04 10:52:03 INFO scheduler.DAGScheduler: Submitting Stage 0 (MappedRDD[1] at textFile at <console>:12), which has no missing parents
14/04/04 10:52:03 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 0 (MappedRDD[1] at textFile at <console>:12)
14/04/04 10:52:03 INFO local.LocalTaskSetManager: Size of task 0 is 1664 bytes
14/04/04 10:52:03 INFO executor.Executor: Running task ID 0
14/04/04 10:52:03 INFO storage.BlockManager: Found block broadcast_0 locally
14/04/04 10:52:03 INFO rdd.HadoopRDD: Input split: file:/app/home/hadoop/shengli/spark-0.8.1-incubating-bin-hadoop1/CHANGES.txt:0+65951
Spark Change Log

Release 0.8.1-incubating

  d03589d Mon Dec 9 23:10:00 2013 -0800
  Merge pull request #248 from colorant/branch-0.8
  [Fix POM file for mvn assembly on hadoop 2.2 Yarn]

  3e1f78c Sun Dec 8 21:34:12 2013 -0800
  Merge pull request #195 from dhardy92/fix_DebScriptPackage
  [[Deb] fix package of Spark classes adding org.apache prefix in scripts embeded in .deb]

  c14f373 Sat Dec 7 22:35:31 2013 -0800
  Merge pull request #241 from pwendell/master
  [Update broken links and add HDP 2.0 version string]

  9c9e71e Sat Dec 7 12:47:26 2013 -0800
  Merge pull request #241 from pwendell/branch-0.8
  [Fix race condition in JobLoggerSuite [0.8 branch]]

  92597c0 Sat Dec 7 11:58:00 2013 -0800
  Merge pull request #240 from pwendell/master
  [SPARK-917 Improve API links in nav bar]

  cfca70e Sat Dec 7 01:15:20 2013 -0800
  Merge pull request #236 from pwendell/shuffle-docs
  [Adding disclaimer for shuffle file consolidation]


filter

Now let's find all the lines that contain "Merge".

scala> val mergeLines = changes.filter(_.contains("Merge"))
mergeLines: org.apache.spark.rdd.RDD[String] = FilteredRDD[2] at filter at <console>:14


scala> mergeLines foreach println
14/04/04 10:54:52 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/04/04 10:54:52 WARN snappy.LoadSnappy: Snappy native library not loaded
14/04/04 10:54:52 INFO mapred.FileInputFormat: Total input paths to process : 1
14/04/04 10:54:52 INFO spark.SparkContext: Starting job: foreach at <console>:17
14/04/04 10:54:52 INFO scheduler.DAGScheduler: Got job 0 (foreach at <console>:17) with 1 output partitions (allowLocal=false)
14/04/04 10:54:52 INFO scheduler.DAGScheduler: Final stage: Stage 0 (foreach at <console>:17)
14/04/04 10:54:52 INFO scheduler.DAGScheduler: Parents of final stage: List()
14/04/04 10:54:52 INFO scheduler.DAGScheduler: Missing parents: List()
14/04/04 10:54:52 INFO scheduler.DAGScheduler: Submitting Stage 0 (FilteredRDD[2] at filter at <console>:14), which has no missing parents
14/04/04 10:54:52 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 0 (FilteredRDD[2] at filter at <console>:14)
14/04/04 10:54:52 INFO local.LocalTaskSetManager: Size of task 0 is 1733 bytes
14/04/04 10:54:52 INFO executor.Executor: Running task ID 0
14/04/04 10:54:52 INFO storage.BlockManager: Found block broadcast_0 locally
14/04/04 10:54:52 INFO rdd.HadoopRDD: Input split: file:/app/home/hadoop/shengli/spark-0.8.1-incubating-bin-hadoop1/CHANGES.txt:0+65951
  Merge pull request #248 from colorant/branch-0.8
  Merge pull request #195 from dhardy92/fix_DebScriptPackage
  Merge pull request #241 from pwendell/master
  Merge pull request #241 from pwendell/branch-0.8
  Merge pull request #240 from pwendell/master
  Merge pull request #236 from pwendell/shuffle-docs
  Merge pull request #237 from pwendell/formatting-fix
  Merge pull request #235 from pwendell/master
  Merge pull request #234 from alig/master
  Merge pull request #199 from harveyfeng/yarn-2.2
  Merge pull request #101 from colorant/yarn-client-scheduler
  Merge pull request #191 from hsaputra/removesemicolonscala
  Merge pull request #178 from hsaputra/simplecleanupcode
  Merge pull request #189 from tgravescs/sparkYarnErrorHandling
  Merge pull request #232 from markhamstra/FiniteWait
  Merge pull request #231 from pwendell/branch-0.8
  Merge pull request #228 from pwendell/master
  Merge pull request #227 from pwendell/master
  Merge pull request #223 from rxin/transient
  Merge pull request #95 from aarondav/perftest
  Merge pull request #218 from JoshRosen/spark-970-pyspark-unicode-error
  Merge pull request #181 from BlackNiuza/fix_tasks_number
  Merge pull request #219 from sundeepn/schedulerexception
  Merge pull request #201 from rxin/mappartitions
  Merge pull request #197 from aarondav/patrick-fix
  Merge pull request #200 from mateiz/hash-fix
  Merge pull request #193 from aoiwelle/patch-1
  Merge pull request #196 from pwendell/master
  Merge pull request #174 from ahirreddy/master
  Merge pull request #166 from ahirreddy/simr-spark-ui
  Merge pull request #137 from tgravescs/sparkYarnJarsHdfsRebase
  Merge pull request #165 from NathanHowell/kerberos-master
  Merge pull request #153 from ankurdave/stop-spot-cluster
  Merge pull request #160 from xiajunluan/JIRA-923
  Merge pull request #175 from kayousterhout/no_retry_not_serializable
  Merge pull request #173 from kayousterhout/scheduler_hang
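
A quick aside (a minimal sketch, not part of the original session): filter, like every transformation, is lazy. Defining mergeLines returns immediately; Spark only reads the file and applies the predicate when an action such as foreach, count, or take runs.

// sketch: the transformation only builds the lineage, the action runs the job
val lazyMergeLines = changes.filter(_.contains("Merge"))  // no Spark job starts here
lazyMergeLines.count()                                    // this action triggers the read + filter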

map

scala> mergeLines.map(line=>line.split(" "))
res2: org.apache.spark.rdd.RDD[Array[java.lang.String]] = MappedRDD[3] at map at <console>:17

scala> mergeLines.map(line=>line.split(" ")) take 10
14/04/04 11:05:24 INFO spark.SparkContext: Starting job: take at <console>:17
14/04/04 11:05:24 INFO scheduler.DAGScheduler: Got job 1 (take at <console>:17) with 1 output partitions (allowLocal=true)
14/04/04 11:05:24 INFO scheduler.DAGScheduler: Final stage: Stage 1 (take at <console>:17)
14/04/04 11:05:24 INFO scheduler.DAGScheduler: Parents of final stage: List()
14/04/04 11:05:24 INFO scheduler.DAGScheduler: Missing parents: List()
14/04/04 11:05:24 INFO scheduler.DAGScheduler: Computing the requested partition locally
14/04/04 11:05:24 INFO rdd.HadoopRDD: Input split: file:/app/home/hadoop/shengli/spark-0.8.1-incubating-bin-hadoop1/CHANGES.txt:0+65951
14/04/04 11:05:24 INFO spark.SparkContext: Job finished: take at <console>:17, took 0.010779038 s
res3: Array[Array[java.lang.String]] = Array(Array("", "", Merge, pull, request, #248, from, colorant/branch-0.8), Array("", "", Merge, pull, request, #195, from, dhardy92/fix_DebScriptPackage), Array("", "", Merge, pull, request, #241, from, pwendell/master), Array("", "", Merge, pull, request, #241, from, pwendell/branch-0.8), Array("", "", Merge, pull, request, #240, from, pwendell/master), Array("", "", Merge, pull, request, #236, from, pwendell/shuffle-docs), Array("", "", Merge, pull, request, #237, from, pwendell/formatting-fix), Array("", "", Merge, pull, request, #235, from, pwendell/master), Array("", "", Merge, pull, request, #234, from, alig/master), Array("", "", Merge, pull, request, #199, from, harveyfeng/yarn-2.2))

As you can see, map produces a nested result here: the output dataset is an Array of arrays, i.e. Array(array1, array2, ...).
scala> val splitedLine = mergeLines.map(line=>line.split(" "))
splitedLine: org.apache.spark.rdd.RDD[Array[java.lang.String]] = MappedRDD[5] at map at <console>:16

flatMap

flatMap flattens the result: instead of a nested collection, you get a single flat Seq/Array of elements.
scala> changes.flatMap(line=>line.split(" ")) take 10
14/04/04 11:18:26 INFO spark.SparkContext: Starting job: take at <console>:15
14/04/04 11:18:26 INFO scheduler.DAGScheduler: Got job 17 (take at <console>:15) with 1 output partitions (allowLocal=true)
14/04/04 11:18:26 INFO scheduler.DAGScheduler: Final stage: Stage 17 (take at <console>:15)
14/04/04 11:18:26 INFO scheduler.DAGScheduler: Parents of final stage: List()
14/04/04 11:18:26 INFO scheduler.DAGScheduler: Missing parents: List()
14/04/04 11:18:26 INFO scheduler.DAGScheduler: Computing the requested partition locally
14/04/04 11:18:26 INFO rdd.HadoopRDD: Input split: file:/app/home/hadoop/shengli/spark-0.8.1-incubating-bin-hadoop1/CHANGES.txt:0+65951
14/04/04 11:18:26 INFO spark.SparkContext: Job finished: take at <console>:15, took 0.003913527 s
res26: Array[java.lang.String] = Array(Spark, Change, Log, "", Release, 0.8.1-incubating, "", "", "", d03589d)
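
To make the map-vs-flatMap difference concrete, here is a minimal sketch (the variable names are mine, not from the original session):

val nested = changes.map(_.split(" "))      // RDD[Array[String]] -- one array per line
val flat   = changes.flatMap(_.split(" "))  // RDD[String]        -- one element per word
// nested.count() equals the number of lines, while flat.count() equals the total number of words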

union

changes.union(changes) foreach println
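
No output is shown above, so here is a minimal sketch of what union does: it simply concatenates the two RDDs and keeps duplicates, so unioning an RDD with itself doubles the element count.

val doubled = changes.union(changes)        // hypothetical name, for illustration only
// doubled.count() == 2 * changes.count()
// apply distinct afterwards if set-style union semantics are wanted:
// changes.union(changes).distinct.count() == changes.distinct.count()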

groupByKey, reduceByKey

scala> val wordcount = changes.flatMap(line=>line.split(" ")).map(word=>(word,1))
wordcount: org.apache.spark.rdd.RDD[(java.lang.String, Int)] = MappedRDD[22] at map at <console>:14
Printing the result of the map step gives output like this:
(#534,1)
(from,1)
(stephenh/removetrycatch,1)
(,1)
(,1)
([Remove,1)
(try/catch,1)
(block,1)
(that,1)
(can't,1)
(be,1)
(hit.],1)
(,1)
(,1)
(,1)

Use groupByKey to produce the grouped (shuffled) result:

wordcount.groupByKey() foreach println

Here each key is a word such as [Automatically or LogQuery,
and each value is a sequence list[v] holding the 1s emitted for that word:

([Automatically,ArrayBuffer(1))
(LogQuery,ArrayBuffer(1))
(2d3eae2,ArrayBuffer(1))
(#130,ArrayBuffer(1))
(8e9bd93,ArrayBuffer(1))
(8,ArrayBuffer(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1))

Use reduceByKey to compute the word count. Conceptually, reduceByKey = groupByKey + an aggregate function applied to each list[v]; the difference is that reduceByKey also combines values within each partition before the shuffle, which makes it more efficient. A sketch after the output below illustrates the equivalence.
wordcount.reduceByKey(_+_) foreach println
The results look like this:
(instructions,2)
(Adds,1)
(ssh,1)
(#691,1)
(#863,1)
(README],2)
(#676,1)
(javadoc,1)
(#571,1)
(0f1b7a0,1)
(shimingfei/joblogger,1)
(links,2)
(memory-efficient,1)
(pwendell/akka-standalone,1)
(hsaputra/update-pom-asf,1)
(method],1)
(mapPartitionsWithIndex],1)
(key-value,1)
(22:19:00,1)
(sbt,4)
(e5b9ed2,1)
(loss,1)
(stephenh/volatile,1)
(code,6)
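
As promised, a minimal sketch of the "groupByKey + aggregation" equivalence (the names are mine, not from the original session):

val viaReduce = wordcount.reduceByKey(_ + _)             // aggregates within partitions, then across the shuffle
val viaGroup  = wordcount.groupByKey().mapValues(_.sum)  // shuffles all the 1s first, then sums them
// both produce the same (word, count) pairs; prefer reduceByKey on large data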

distinct

distinct removes duplicates, much like converting to a Set in Java.
scala> wordcount.count()
res43: Long = 12222

wordcount.distinct.count()
res44: Long = 3354
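
Note that distinct de-duplicates whole elements; here the elements are (word, 1) pairs, so duplicates of the same word collapse into a single pair. The same idea applied to the raw words (a sketch, not from the original session):

val words = changes.flatMap(_.split(" "))   // hypothetical name, for illustration
words.count()                               // total number of tokens, duplicates included
words.distinct.count()                      // number of unique tokens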

sortByKey

Sorts by key; true sorts ascending, false sorts descending.
wordcount.sortByKey(true) take 10
res10: Array[(java.lang.String, Int)] = Array(("",1), ("",1), ("",1), ("",1), ("",1), ("",1), ("",1), ("",1), ("",1), ("",1))

wordcount.sortByKey(false) take 10
res11: Array[(java.lang.String, Int)] = Array(({0,1},1), (zookeeper,1), (zip,1), (zip,1), (zip,1), (zero-sized,1), (zero,1), (yarn],1), (yarn.version,1), (yarn.version,1))
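
sortByKey only orders by the key. To rank words by how often they occur, one option (a sketch, not from the original session) is to reduce first and then swap key and value before sorting:

val counts = wordcount.reduceByKey(_ + _)                      // (word, count) pairs
counts.map { case (w, c) => (c, w) }.sortByKey(false) take 10  // the 10 most frequent words, highest count first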

join

To keep empty strings from distorting the join, filter out the "" elements first:
val wordcount = changes.flatMap(line=>line.split(" ")).filter(_!="").map(word=>(word,1)).reduceByKey(_+_)

scala> val readme = sc.textFile("README.md")
scala> val readMeWordCount = readme.flatMap(_.split(" ")).filter(_!="").map(word=>(word,1)).reduceByKey(_+_)

scala> wordcount.join(readMeWordCount)
res3: org.apache.spark.rdd.RDD[(java.lang.String, (Int, Int))] = FlatMappedValuesRDD[36] at join at <console>:21

join matches values from the two RDDs that share the same key; each result is a pair (key, (v from wordcount, v from readMeWordCount)), with one output pair for every matching combination.
wordcount.join(readMeWordCount) take 20
res6: Array[(java.lang.String, (Int, Int))] = Array((at,(1,2)), (do,(1,1)), (by,(9,5)), (ASF,(2,1)), (adding,(3,1)), (all,(2,1)), (versions,(6,4)), (sbt/sbt,(1,6)), (under,(2,2)), (set,(7,1)))
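
The output above is hard to read against real data, so here is a tiny self-contained sketch of join semantics (hypothetical values): only keys present in both RDDs survive, and every matching combination yields one pair.

val left  = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)))
val right = sc.parallelize(Seq(("a", 10), ("a", 20), ("b", 30)))
left.join(right).collect()
// roughly: Array((a,(1,10)), (a,(1,20)), (b,(2,30)))  -- "c" is dropped, "a" appears twice; order may vary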

cogroup

cogroup groups the two datasets by key: for each key, the result holds two sequences, [seq of values from the left RDD] and [seq of values from the right RDD].
wordcount.cogroup(readMeWordCount,2).filter(_._1==("do")) take 10
res18: Array[(java.lang.String, (Seq[Int], Seq[Int]))] = Array((do,(ArrayBuffer(1),ArrayBuffer(1))))
Of the two ArrayBuffers, the first comes from the left RDD and the second from the right RDD.
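
A small sketch (hypothetical values) of how cogroup differs from join: keys that appear in only one RDD are kept, with an empty sequence on the missing side, and values are grouped per side rather than combined pairwise.

val left  = sc.parallelize(Seq(("a", 1), ("a", 2), ("c", 3)))
val right = sc.parallelize(Seq(("a", 10), ("b", 20)))
left.cogroup(right).collect()
// roughly: (a,(ArrayBuffer(1, 2),ArrayBuffer(10))), (b,(ArrayBuffer(),ArrayBuffer(20))), (c,(ArrayBuffer(3),ArrayBuffer()))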

cartesian

The Cartesian product. The result has m * n elements, so be careful about running it on large datasets.
wordcount.count
3353
readMeWordCount.count
res25: Long = 324

wordcount.cartesian(readMeWordCount).count
res23: Long = 1086372

3353 * 324 = 1086372
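
To see the shape of the output (and not just its size), here is a minimal sketch on tiny hand-made RDDs (hypothetical values):

val a = sc.parallelize(Seq(1, 2, 3))
val b = sc.parallelize(Seq("x", "y"))
a.cartesian(b).collect()
// roughly: Array((1,x), (1,y), (2,x), (2,y), (3,x), (3,y))  -- 3 * 2 = 6 pairs; order may vary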

Original content. If you repost, please credit the source: http://blog.csdn.net/oopsoom/article/details/22918991. Thanks.
