1. 程式人生 > >Spark RDD程式設計(Python和Scala版本)

Spark RDD程式設計(Python和Scala版本)

Spark中的RDD就是一個不可變的分散式物件集合,是一種具有相容性的基於記憶體的叢集計算抽象方法,Spark則是這個方法的抽象。

Spark的RDD操作分為轉化操作(transformation)和行動操作(action),兩者的區別在於:

       a.轉化操作返回一個新的RDD物件

       b.行動操作則會對RDD產生一個計算結果,並把結果返回到驅動器程式中或者把結果儲存到外部儲存系統(如HDFS)

常見的轉化操作有:map,filter,flatMap,sample,union,distinct,

                                    groupByKey,reduceByKey,sortByKey,join,cogroup,cartesian  ......

常見的行動操作有:reduce,collect,count,first,take,taksSample,

                                    saveAsTextFile,saveAsSequenceFile,countByKey,foreach ......

下面我們以例項說明Saprk的RDD程式設計

1:建立RDD

      有兩種方式:讀取外部資料集,以及在驅動器程式中對一個集合進行並行化

       python:

>>> nums =sc.parallelize([1,2,3,4])
>>> nums
ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:423
>>> words = sc.textFile("file:///usr/local/hadoop/spark/README.md")
>>> words
file:///usr/local/hadoop/spark/README.md MapPartitionsRDD[2] at textFile at NativeMethodAccessorImpl.java:-2
>>> 
       Scala(兩種方法):
val lines = sc.parallelize(List(1,2,3,4))
lines: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:27
val rdd = sc.makeRDD(1 to 10,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at makeRDD at <console>:27

2:map()函式 和 take()函式

      RDD.map(func),map接受一個函式作為引數,作用於RDD中的每個物件,並將返回結果作為結果RDD中對應的元素的值

      RDD.take(num),用於取回num個value,在這裡結合map使用,方便檢視值

Python:

>>> nums = sc.parallelize([1,2,3,4])
>>> for num in nums.take(4):
...     print num
... 
1
2
3
4
>>> new_nums = nums.map(lambda x: x*2)
>>> for new_num in new_nums.take(4):
...     print new_num
... 
2
4
6
8

Scala:

scala> val nums = sc.parallelize(List(1,2,3,4))
nums: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:27

scala>nums.take(4).foreach(println)
1
2
3
4

3:flatMap()函式

      RDD.flatMap(func),和map類似,只不過map返回的是一個個元素,而flatMap返回的則是一個返回值序列的迭代器

Python:

>>> string = sc.parallelize(["i love you"])
>>> new_str = string.flatMap(lambda str:str.split(" "))</span>
>>> for str in new_str.take(3):
...     print str
... 
i
love
you

Scala:

scala> val string = sc.parallelize(List("i love you"))
string: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at parallelize at <console>:27

scala> val new_str = string.flatMap(line=>line.split(" "))
new_str: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[6] at flatMap at <console>:29

scala> new_str.take(3).foreach(println)
i
love
you

4:filter()函式和 first()函式

      RDD.filter(func),接受一個函式作為引數,並將RDD中滿足該函式的元素放入新的RDD中返回

      RDD.first(),返回的第一個

Python:

>>> string = sc.parallelize(["i love you"])
>>> new_str = string.filter(lambda line : "you" in line)
>>> new_str.first()
'i love you'

Scala:

scala> val string = sc.parallelize(List("i love you"))
string: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[7] at parallelize at <console>:27
scala> string.first()
res3: String = i love you

scala> 
<pre name="code" class="java">scala> val string = sc.parallelize(List("I love you"))
scala> val new_str = string.filter(line =>line.contains("love"))
new_str: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[6] at filter at <console>:23

scala> new_str.foreach(println)
I love you

5:union()函式

      RDD1.union(RDD2),操作物件為兩個RDD,返回一個新的RDD,轉化操作可以操作任意數量的輸入RDD

Python:

>>> num1 = sc.parallelize([1,2,3])
>>> num2 = sc.parallelize([4,5,6])
>>> num3 = num1.union(num2)
>>> for num in num3.take(6):
...     print num
... 
1
2
3
4
5
6

Scala:

scala> val num1 = sc.parallelize(List(1,2,3))
num1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:27

scala> val num2 = sc.parallelize(List(4,5,6))
num2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:27

scala> val num3 = num1.union(num2)
mum3: org.apache.spark.rdd.RDD[Int] = UnionRDD[2] at union at <console>:31

scala> num3.count()
res1: Long = 6                                                                  

scala> num3.foreach(println)
3
1
2
4
5
6

6:count()函式和collect()函式

      RDD.count(),是統計RDD中元素的個數,返回的是一個整數

      EDD.collect(),用來收集資料,儲存在一個新的資料結構中,用來持久化,需要注意的是collect不能用在大規模資料集上

Python:

>>> nums = sc.parallelize([1,2,3,4])
>>> nums.count()
[Stage 0:>                                                          (0 +[Stage 0:>                                                          (0 +[Stage 0:==============>  
 (1 +
 4       
>>> 
>>> new_nums = nums.collect()
>>> new_nums
[1, 2, 3, 4]
>>> 

Scala:

scala> val num1 = sc.parallelize(List(1,2,3))
num1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:27

scala> num1.count()
res3: Long = 3

scala> val num2=num1.collect()
num2: Array[Int] = Array(1, 2, 3)

scala> num2
res4: Array[Int] = Array(1, 2, 3)

scala> 

7:偽集合操作

(1):RDD.distinct,去重,但其操作的開銷大,因為它需要所有資料通過網路進行混洗

Python:
>>> nums1 = sc.parallelize([1,2,3,3])
>>> nums1.count()
4
>>> nums2=nums1.distinct()
>>> nums2.count()
3
>>>
Scala:
scala> val num1 = sc.parallelize(List(1,2,3,3))
num1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:27

scala> val num2 = num1.distinct()
num2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[7] at distinct at <console>:29

scala> num2.foreach(println)
2
3
1

(2):RDD1.intersection(RDD2),返回兩個RDD中都有的元素,類似於集合中的交集

Python:
>>> nums_1=sc.parallelize([1,2,3,4,5])
>>> nums_2=sc.parallelize([3,4,5,6,7])
>>> nums_3=nums_1.intersection(nums_2)
>>> nums_3.count()
[Stage 7:>                                                          (0 +                                                                        3       
>>> for num in nums_3.take(3):
...     print num
...
3
4
5
>>>
Scala:
scala> val num1 = sc.parallelize(List(1,2,3,4))
num1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at parallelize at <console>:27

scala> val num2 = sc.parallelize(List(3,4,5,6))
num2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at parallelize at <console>:27

scala> val num3 = num1.intersection(num2)
num3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[16] at intersection at <console>:31

scala> num3.foreach(println)
4
3


(3):RDD1.subtract(RDD2),接受一個RDD作為引數,返回一個由只存在第一個RDD1而不存在與第二個RDD2中的所有元素組成的RDD

Python:
>>> nums_4 = nums_1.subtract(nums_2)
>>> nums_4.count()
2
>>> for num in nums_4.take(2):
...     print num
...
1
2
>>>
Scala:
scala> val num1 = sc.parallelize(List(1,2,3,4))
num1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[17] at parallelize at <console>:27

scala> val num2 = sc.parallelize(List(3,4,5,6))
num2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[18] at parallelize at <console>:27

scala> val num3 = num1.subtract(num2)
num3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[22] at subtract at <console>:31

scala> num3.foreach(println)
2
1


(4):RDD1.cartesian(RDD2),求笛卡爾積,求出所有可能的(a,b)對

Python:
>>> nums_5 = nums_1.cartesian(nums_2)
>>> nums_5
[email protected]
>>> nums_5.first()
(1, 3)
>>>

Scala:

scala> val num1 = sc.parallelize(List(1,2,3,4))
num1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[23] at parallelize at <console>:27

scala> val num2 = sc.parallelize(List(3,4,5,6))
num2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[24] at parallelize at <console>:27

scala> val num3 = num1.cartesian(num2)
num3: org.apache.spark.rdd.RDD[(Int, Int)] = CartesianRDD[25] at cartesian at <console>:31

scala> num3.foreach(println)
(1,3)
(1,5)
(1,6)
(1,4)
(2,3)
(2,4)
(3,3)
(2,5)
(2,6)
(3,4)
(3,6)
(4,3)
(3,5)
(4,5)
(4,4)
(4,6)

8:reduce()函式

  RDD.reduce(func),接受一個函式作為引數,操作兩個RDD的元素型別的資料並返回一個同樣型別的新元素

Python:

>>> nums=sc.parallelize([1,2,3,4,5,6])
>>> nums.reduce(lambda x,y:x+y)
21
>>> 
Scala:
scala> val num1 = sc.parallelize(List(1,2,3,4))
num1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[27] at parallelize at <console>:27

scala> val num2 = num1.reduce((x,y)=>x+y)
num2: Int = 10

9:aggregate()函式

aggregate()函式需要我們提供期待返回的型別的初始值,然後通過一個函式把RDD中的元素合併起來放入累加器,考慮到每個節點是在本地累加的,最終,還需要通過第二個函式把累加器兩兩合併

Python:

>>> nums = sc.parallelize([1,2,3,4])
>>> sumCount = nums.aggregate( (0,0),
... (lambda acc,value:(acc[0]+value,acc[1]+1)),
... (lambda acc1,acc2:(acc1[0]+acc2[0],acc1[1]+acc2[1])))
>>> sumCount[0]/float(sumCount[1])
2.5
>>>
Scala:
scala> val num1 = sc.parallelize(List(1,2,3,4))
num1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[30] at parallelize at <console>:27

scala> val result = num1.aggregate((0,0))(
     | (acc,value) => (acc._1 + value,acc._2+1),
     | (acc1,acc2) =>(acc1._1+acc2._1,acc1._2+acc2._2)
     | )
result: (Int, Int) = (10,4)

scala> val avg = result._1/result._2.toDouble
avg: Double = 2.5

scala> 

10:top()函式和 foreach()函式

        RDD.top(num),從RDD中返回前邊的num個元素

Python:

>>> nums = sc.parallelize([1,2,3,4])
>>> new_nums = nums.top(3)
>>> new_nums
[4, 3, 2]
Scala:
scala> val num1 = sc.parallelize(List(1,2,3,4))
num1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[31] at parallelize at <console>:27

scala> num1.top(2)
res10: Array[Int] = Array(4, 3)

scala> 
        RDD.foreach(func),對RDD中的每個元素使用給定的函式

Python:

>>> nums = sc.parallelize([1,2,3])
>>> def add(x):
...     print "\n","x+2:",x+2
... 
>>> nums.foreach(add)

x+2: 5

x+2: 3

x+2: 4

Scala:

scala> def add(x:Int)={
     |  println (x+2)
     | }
add: (x: Int)Unit

scala> val num1 = sc.parallelize(List(1,2,3,4))
num1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[36] at parallelize at <console>:27

scala> num1.foreach(add)
6
5
3
4

11:sample()函式 和 takeSample()函式

            sample(withReplacement,traction,[send]):對RDD取樣以及是否轉換

Python:

>>> nums = sc.parallelize([1,2,3,4,5,6,7])
>>> new_nums = nums.sample(False,0.5)
>>> new_nums
PythonRDD[106] at RDD at PythonRDD.scala:43
>>> new_nums.count()
5
>>> for n in new_nums.take(5):
...     print n
... 
1
3
5
6
7
Scala:
scala> val num1 = sc.parallelize(List(1,2,3,4))
num1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[37] at parallelize at <console>:27

scala> val num2 = num1.sample(false,0.5)
num2: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[38] at sample at <console>:29

scala> num2.foreach(println)
2
3
          RDD.takeSample(withReplacement,num,[send]),從RDD中返回任意一些元素

Python:

>>> nums = sc.parallelize([1,2,3,4,5,6,7])
>>> new_nums= nums.takeSample(False,5)
>>> new_nums
[5, 3, 4, 6, 7]

Scala:

scala> val num1 = sc.parallelize(List(1,2,3,4))
num1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[39] at parallelize at <console>:27

scala> val num2 = num1.takeSample(false,2)
num2: Array[Int] = Array(3, 4)

12:persist  和 unpersist

        RDD.persist(),不帶引數預設把資料以序列化的形式快取在JVM的堆空間中

        RDD.unpersist(),手動把持久化的RDD從記憶體中刪除

>>> nums = sc.parallelize([1,2,3,4,5,6,7])
>>> new_nums = nums.persist()
>>> new_nums
ParallelCollectionRDD[124] at parallelize at PythonRDD.scala:423
>>> new_nums.unpersist()
ParallelCollectionRDD[124] at parallelize at PythonRDD.scala:423
>>> 
Scala:
scala> val num1 = sc.parallelize(List(1,2,3,4))
num1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[41] at parallelize at <console>:27

scala> val num2 = num1.persist()
num2: num1.type = ParallelCollectionRDD[41] at parallelize at <console>:27

scala> num2.foreach(println)
3
1
2
4

scala> num2.unpersist()
res17: num2.type = ParallelCollectionRDD[41] at parallelize at <console>:27


相關推薦

Spark RDD程式設計PythonScala版本

Spark中的RDD就是一個不可變的分散式物件集合,是一種具有相容性的基於記憶體的叢集計算抽象方法,Spark則是這個方法的抽象。Spark的RDD操作分為轉化操作(transformation)和行動操作(action),兩者的區別在於:       a.轉化操作返回一個新

Spark-RDD轉Dataset及簡單的SparkSql操作javascala版本

一、程式設計式方法 (一)java版本 public class WordPro { private static SparkSession gloableSpark; private static Logger logger = LoggerFactory.getL

Scala中使用函數語言程式設計函式高階函式

                                            圖示,這是一個普通

解決CentOS67版本,/etc/sysconfig/下沒有iptables的問題

name 命令 install star spa lib cep pro centos 6 一、Centos 6版本解決辦法: 1.任意運行一條iptables防火墻規則配置命令: iptables -P OUTPUT ACCEPT 2.對iptables服務進行保存:

python入門PythonPycharm安裝

不能 速查 ins ase 技術 ati scroll env 按鈕 Python簡介 Python是一種計算機程序設計語言,它結合了解釋性、編譯性、互動性和面向對象的腳本語言,非常簡單易用。Python 的設計具有很強的可讀性,相比其他語言經常使用英文關鍵字,其他語言

ID過濾靚號寫法PHPNodejs版本

log aabb CA tro 運算 代碼 ice for brush 1 前言 例如某APP的用戶ID,需要按照一定規則把靚號先存取來,然後慢慢按要求釋放靚號 2 代碼 PHP版本如下: function genUserId(){ $id = "";

Leetcode 929 獨特的電子郵件PythonC++實現

每封電子郵件都由一個本地名稱和一個域名組成,以 @ 符號分隔。 例如,在 [email protected]中, alice 是本地名稱,而 leetcode.com 是域名。 除了小寫字母,這些電子郵件還可能包含 ','

C++瀏覽目錄下所有檔案windowlinux版本

原本以為這麼常用的功能應該是標準C支援的,試了一下才發現不同平臺差異挺大。 參考部落格:https://blog.csdn.net/u012005313/article/details/50687297 上程式碼 test_dir.cpp #include <vector>

Leetcode 929 獨特的電子郵件PythonC++實現

每封電子郵件都由一個本地名稱和一個域名組成,以 @ 符號分隔。 例如,在 [email protected]中, alice 是本地名稱,而 leetcode.com 是域名。 除了小寫字母,這些電子郵件還可能包含 ',' 或 '+'。 如果在電子郵件地址的本

Java初學 面向物件程式設計介面內部類

Java初學 面向物件程式設計(介面和內部類) 1、定義一個Phone介面,其中包含String GetPrice()方法和double GetWeight()方法;(1)在主類中設計void PrintPhone(Phone p)方法,呼叫Phone介面中的兩

2.二維陣列中的查詢pythonjava實現

題目:在一個二維陣列中,每一行都是按照從左到右遞增的順序排序,每一列都是安裝從上到下遞增的順序排序。請完成一個函式,輸入這樣的一個二維陣列和一個整數,判斷陣列中書否含有該整數。 演算法思想:我們知道每一行都是遞增排序的,每一列也是從上到下遞增排序的,所以左上角的數是最小的,

C語言之網路程式設計伺服器客戶端

1、 套接字:源IP地址和目的IP地址以及源埠號和目的埠號的組合稱為套接字。其用於標識客戶端請求的伺服器和服務。 常用的TCP/IP協議的3種套接字型別如下所示。 (1)流套接字(SOCK_STREAM): 流套接字用於提供面向連線、可靠的資料傳輸服務。該服務將保證資料能夠實現無差錯、無重複傳送,並按順序接

列印自身的程式pythonc版

python版 me='me=%r\nprint me %% me' print me % me 網上流傳的c版 #include <stdio.h> char* recurse = "#include <stdio.h>%cchar* rec

SparkRDD轉換成DataFrame的兩種方式分別用Javascala實現

 一:準備資料來源       在專案下新建一個student.txt檔案,裡面的內容為: print? <code class="language-java">1,zhangsan,20   2,lisi,21   3,wanger,1

關於Spark scala 版本衝突的問題

關於Spark 和 scala 版本的問題 最近用 scala 寫 Spark 的時候出現了兩種問題,最後排查都是版本的問題,使用的是IDEA本地執行的時候出現 尤其第二種,他並不是每次都會出現,可能只在執行某幾行有衝突的程式碼時才會出現 先把兩種異常貼出

關於sparkscala版本衝突問題

在pom.xml 中進行配置相應版本的配置,例如spark1.6.2 相容scala2.10.5 下面,我提供一個相容的pom.xml檔案,我以上傳pom.xml檔案 4.0.0 com.aura.bigdata meituan-log-analysis 1.

自學Python day6--------面向物件程式設計例項

自學Python day6——–面向物件程式設計(類和例項) 1.類和例項 面向物件最重要的概念就是類(Class)和例項(Instance),必須牢記類是抽象的模板,比如Student類,而例項是根據類創建出來的一個個具體的“物件”,每個物件都擁有相同的方

SparkSpark RDD程式設計

目錄: 3、RDD程式設計 3.1、RDD基礎 3.2、建立RDD 3.3、RDD操作 3.3.1、轉化操作 3.3.2、行動操作 3.3.3、惰性求值 3.4、向Spark傳遞函式 3.5、常見的轉化操作和行動操作 3.5.1、基本RDD 3.5.2、在

Python篇----面向物件程式設計物件篇

1 概述     簡稱:OOP(Oriented Object Programming)。這是一種以構建物件,程式設計實現為方向的語言。現實世界中,許多問題過於複雜,需要拆分,所以用不同的物件代替各

Faster R-CNN的安裝及測試Python版本Matlab版本

rbg的Python版本 一、拉取原始碼 git clone --recursive https://github.com/rbgirshick/py-faster-rcnn.git 拉取完成後,在/home/cmwang/目錄下增加了py-fas