1. 程式人生 > >資料探勘工具---pyspark使用方法練習

資料探勘工具---pyspark使用方法練習

來源,官網spark2.2.1版本
pyspark不同函式的形象化解釋
SparkSession是Spark 2.0引入的新概念。SparkSession為使用者提供了統一的切入點,來讓使用者學習spark的各項功能。 在spark的早期版本中,SparkContext是spark的主要切入點,由於RDD是主要的API,我們通過sparkcontext來建立和操作RDD。對於每個其他的API,我們需要使用不同的context。例如,對於Streming,我們需要使用StreamingContext;對於sql,使用sqlContext;對於hive,使用hiveContext。但是隨著DataSet和DataFrame的API逐漸成為標準的API,就需要為他們建立接入點。所以在spark2.0中,引入SparkSession作為DataSet和DataFrame API的切入點。SparkSession實質上是SQLContext和HiveContext的組合(未來可能還會加上StreamingContext),所以在SQLContext和HiveContext上可用的API在SparkSession上同樣是可以使用的。SparkSession內部封裝了SparkContext,所以計算實際上是由SparkContext完成的。

1、pyspark函式介面

pyspark.SparkConf()

是對spark應用進行設定的類

pyspark.SparkContext()

是spark應用的入口,也可以稱為驅動
SparkContext.broadcast(value)函式:廣播變數
廣播變數允許程式設計師將一個只讀的變數快取在每臺機器上,而不用在任務之間傳遞變數。廣播變數可被用於有效地給每個節點一個大輸入資料集的副本。Spark還嘗試使用高效地廣播演算法來分發變數,進而減少通訊的開銷。
Spark的動作通過一系列的步驟執行,這些步驟由分散式的洗牌操作分開。Spark自動地廣播每個步驟每個任務需要的通用資料。這些廣播資料被序列化地快取,在執行任務之前被反序列化出來。這意味著當我們需要在多個階段的任務之間使用相同的資料,或者以反序列化形式快取資料是十分重要的時候,顯式地建立廣播變數才有用。
通過在一個變數v上呼叫SparkContext.broadcast(v)可以建立廣播變數。廣播變數是圍繞著v的封裝,可以通過value方法訪問這個變數。舉例如下:

from pyspark import SparkContext
from numpy import array
sc=SparkContext()
broadcast_var= sc.broadcast(array([1,2,3,4]))
print(broadcast_var.value)
#執行結果 [1 2 3 4]

在建立了廣播變數之後,在叢集上的所有函式中應該使用它來替代使用v.這樣v就不會不止一次地在節點之間傳輸了。另外,為了確保所有的節點獲得相同的變數,物件v在被廣播之後就不應該再修改。

SparkContext.parallelize(c,numSlice=none)

將一個本地的python物件叢集分佈以建立一個RDD。如果是將一個範圍實現RDD化,推薦使用xrange(),xrange與range的區別在於xrange是生成一個列表物件,而不是列表本身,因而不需要一上來就開闢一塊記憶體區域,但是python3已經沒有xrange了。

sc=SparkContext()
print(sc.parallelize([1,2,3,4,5,6],3).glom().collect())
print(sc.parallelize([0,6,2],5).glom().collect())

結果如下:

[[1, 2], [3, 4], [5, 6]]
[[], [0], [], [6], [2]]

上面的程式中,parallelize的第二個引數是RDD的切片數目(這個跟分割槽有區別?),
同時可以看到,如果元素個數少於分割槽數,則會產生空的列表。

SparkContext.range(start,end=none,step=1,numSlice=none)

產生某個範圍內元素組成的RDD

sc=SparkContext()
print(sc.range(1,7,2).collect())

結果如下:

[1, 3, 5]

SparkContext.textFile(name, minPartitions=None, use_unicode=True)

從hdfs上,從本地等讀取txt檔案,並轉換成RDD

SparkContext.wholeTextFiles(path, minPartitions=None, use_unicode=True)

讀取整個資料夾下的txt檔案

SparkContext.union(rdds)

將不同的RDD組合在一起

rdd.map(f)

將rdd中的每一個元素作一個一對一的對映,對映後的一可以是一個元素或一個組合,比如元組、列表等。如果輸入是一個dataframe,比如使用sparksql讀取hive資料表,此時rdd中的每一個元素對應原資料庫中的一行。這樣我們可以做針對行的組合操作。
比如使用map實現(key,values)操作:

#我們首先建立這樣的一個rdd
input_data=sc.parallelize([['a',1],['b',2],['b',3],['c',4],['a',5],['b',6]],2)
#檢視結果:
print(input_data.glom().collect())
#結果如下:
#[[['a', 1], ['b', 2], ['b', 3]], [['c', 4], ['a', 5], ['b', 6]]]
#可以看到這樣rdd有2個分割槽,每個分割槽中包含3個元素,每個元素均為一個列表
#我們假如第一個是鍵,第二個是值,下面我們來構造鍵值對rdd,並對每個鍵進行求和。
#首先利用map(f)構造鍵值對
input_key_value=input_data.map(lambda x:(x[0],x[1]))
#檢視一下結果:
#[[('a', 1), ('b', 2), ('b', 3)], [('c', 4), ('a', 5), ('b', 6)]]
#從上面的結果可以看到,分割槽未發生變化。但每個元素的形式發生了變化
#下面利用reduceByKey(f)來求和
sum_value=input_key_value.reduceByKey(lambda x,y:x+y)
#上式中lambda傳入的引數x,y代表元素的值兩兩間進行,返回x+y表示兩兩求和,按鍵逐步聚合。
print(sum_value.glom().collect())
#結果如下:
#[[('c', 4), ('b', 11)], [('a', 6)]]
#上面的結果比較有意思的是,求reduceByKey求和後,仍然是兩個分割槽。很顯示這樣存在分割槽間的資料傳遞。所以從效率的角度考慮這樣的分割槽方式並不合理。

下面用map實現函式對映,這裡以kmeans為例.程式碼主體來源網上。

#生成多類單標籤資料集
import numpy as np
import matplotlib.pyplot as plt
from sklearn.datasets.samples_generator import make_blobs
center=[[1,1],[-1,-1],[1,-1]]
cluster_std=0.3
X,labels=make_blobs(n_samples=200,centers=center,n_features=2,
                    cluster_std=cluster_std,random_state=0)
print('X.shape',X.shape)
print("labels",set(labels))

unique_lables=set(labels)
colors=plt.cm.Spectral(np.linspace(0,1,len(unique_lables)))
for k,col in zip(unique_lables,colors):
    x_k=X[labels==k]
    plt.plot(x_k[:,0],x_k[:,1],'o',markerfacecolor=col,markeredgecolor="k",
             markersize=14)
plt.title('data by make_blob()')
plt.show()

生成的資料
X.shape (200, 2)
labels {0, 1, 2}
其影象如下所示:
這裡寫圖片描述
下面是基於map和reduceByKey完成的kmeans聚類

#生成多類單標籤資料集
import numpy as np
import matplotlib.pyplot as plt
from sklearn.datasets.samples_generator import make_blobs
center=[[1,1],[-1,-1],[1,-1]]
cluster_std=0.3
X,labels=make_blobs(n_samples=200,centers=center,n_features=2,
                    cluster_std=cluster_std,random_state=0)
# print('X.shape',X.shape)
# print("labels",set(labels))
#
unique_lables=set(labels)
colors=plt.cm.Spectral(np.linspace(0,1,len(unique_lables)))
for k,col in zip(unique_lables,colors):
    x_k=X[labels==k]
    plt.plot(x_k[:,0],x_k[:,1],'o',markerfacecolor=col,markeredgecolor="k",
             markersize=14)
# plt.title('data by make_blob()')
# plt.show()

from pyspark import SparkContext

def closestPoint(p,centers):#計算某個元素的類別
    bestIndex = 0#初始化類別
    closest = float("+inf")#初始化某點與任意一聚類中心的最小距離為無窮大
    for i in range(len(centers)):
        tempDist = np.sum((p - centers[i]) ** 2)#某個元素到某個聚類中心的距離
        if tempDist < closest:
            closest = tempDist#更新與任意一聚類中心的最小距離
            bestIndex = i#更新類別
    return bestIndex
sc=SparkContext()
inputData=sc.parallelize(X).cache()#將原資料轉換成rdd,每一個元素對應於一個樣本點,並將資料快取。這裡每個元素是array([x1,x2])的形式。
K=3#初始化類的數目
convergeDist=0.01#初始化相臨兩次聚類中心的最小收斂距離,即如果新的聚類中心與上一次聚類中心距離很小了,就可以不用再繼續優化了。
tempDist = 1#初始化相臨兩次聚類中心的距離
kPoints=inputData.takeSample(False,K,1)#隨機抽取K個類中心,即初始化聚類中心。
#print(kPoints)
#結果[array([ 1.04321307,  1.43628205]), array([ 0.85610326, -0.81389251]), array([-1.42721827, -1.14799597])]
while tempDist > convergeDist:
    closest = inputData.map(lambda p: (closestPoint(p, kPoints), (p, 1)))#將元素對映到這樣的鍵值對,元素---->(類別,(元素,1))
    #print(closest.collect())
    #結果[(0, (array([ 1.68092639,  0.5636903 ]), 1)), ..., (2, (array([-1.36763066, -0.74669111]), 1))]
    pointStats = closest.reduceByKey(lambda p1_c1, p2_c2: (p1_c1[0] + p2_c2[0], p1_c1[1] + p2_c2[1]))#(類別,(元素,1))---->(類別,(array(sum(維度1),sum(維度2)), n))
    #print(pointStats.collect())
    #結果[(0, (array([ 67.85854692,  71.4189192 ]), 67)), (1, (array([ 62.88505036, -68.0744321 ]), 67)), (2, (array([-69.06467775, -68.44964606]), 66))]
    newPoints = pointStats.map(lambda st: (st[0], st[1][0] / st[1][1])).collect()#計算新的聚類中心
    #print(newPoints)
    #結果[(0, array([ 1.01281413,  1.06595402])), (1, array([ 0.93858284, -1.0160363 ])), (2, array([-1.04643451, -1.03711585]))]
    tempDist = sum(np.sum((kPoints[iK] - p) ** 2) for (iK, p) in newPoints)#計算相臨兩次聚類中心的距離
    #print(tempDist)
    #結果0.343022614898
    for (iK, p) in newPoints:#更新聚類中心
        kPoints[iK] = p
print("Final centers: " + str(kPoints))
newInput=closest.collect()

#這裡先這樣畫
for p in newInput:
    if p[0]==0:
        plt.plot(p[1][0][0], p[1][0][1], '*', markerfacecolor='g', markeredgecolor='b',
                 markersize=14)
    elif p[0]==1:
        plt.plot(p[1][0][0], p[1][0][1], '*', markerfacecolor='b', markeredgecolor='r',
                 markersize=14)
    else:
        plt.plot(p[1][0][0], p[1][0][1], '*', markerfacecolor='r', markeredgecolor='g',
                 markersize=14)
plt.title('kmeansByPySpark')
plt.show()

得到的結果如下:
這裡寫圖片描述
圖中,圓圈是原始類別,五角星是通過聚類學習到的類別,可以看到聚類完全正確。

flatMap(f)

flatMap的用法與map類似,只不過map是一個輸入元素對應一個輸出元素,而flatMap一個輸入元素對應多個輸出元素。

from pyspark import SparkContext
sc=SparkContext()
x=sc.parallelize([1,2,3])
xFM1=x.flatMap(lambda x:(x,x*100,x**2))
xFM2=x.flatMap(lambda x:[x,x*100,x**2])
xFM3=x.flatMap(lambda x:(x,(x*100,x**2)))
print(x.collect())
print(xFM1.collect())
print(xFM2.collect())
print(xFM3.collect())
#結果如下:
#[1, 2, 3]
#[1, 100, 1, 2, 200, 4, 3, 300, 9]
#[1, 100, 1, 2, 200, 4, 3, 300, 9]
#[1, (100, 1), 2, (200, 4), 3, (300, 9)]
#所以不管是組合成一個元組,還是組合成一個列表,最後都會把裡面的每一個單元當成是輸出rdd的一個元素,這就是一對多;但是裡面的單元只深入到第一層,不會繼續拆分,否則就無法控制。

mapValues(f)

將鍵值對形式的rdd的值用函式f來作用,而保持鍵不變,同時分割槽不變。

flatMapValues(f)[source]

綜合flatMap和mapValues的特點。

mapPartitions(f, preservesPartitioning=False)

與map不同,map是對每一個元素用函式作用;而mapPartitions是對每一個分割槽用一個函式去作用,每一個分割槽的元素先構成一個迭代器iterator,iterator是一個像列表,但裡面的元素又保持分散式特點的一類物件;輸入的引數就是這個iterator,然後對iterator進行運算,iterator支援的函式不是太多,sum,count等一些spark定義的基本函式應該都是支援的。但如果要進行更為複雜的一些個性化函式運算,可以就用不了。實踐中發生可以通過[x for i in iterator]的方式,將iterator轉換為列表,然後就可以進行各種操作。但是這樣在分割槽內部或分組內部就失去了分散式運算的特點。

x=sc.parallelize([1,2,3],2)
def f(iterator):yield sum(iterator)
xMP=x.mapPartitions(f)
print(x.glom().collect())
print(xMP.glom().collect())
#結果為:
#[[1], [2, 3]]
#[[1], [5]]

mapPartitionsWithIndex

與mapPartition相比,mapPartitionWithIndex能夠保留分割槽索引,函式的傳入引數也是分割槽索引和iterator構成的鍵值對。給了我們操作分割槽索引的機會,至於最後的結果要不要保留分割槽索引那是另一回事。

x=sc.parallelize([1,2,3],2)
def f1(partitionIndex,iterator):yield (partitionIndex,sum(iterator))
def f2(partitionIndex,iterator):yield sum(iterator)
xMP1=x.mapPartitionsWithIndex(f1)
xMP2=x.mapPartitionsWithIndex(f2)
print(x.glom().collect())
print(xMP1.glom().collect())
print(xMP2.glom().collect())
#結果為:
#[[1], [2, 3]]
#[[(0, 1)], [(1, 5)]]
#[[1], [5]]

partitionBy(numPartitions, partitionFunc=)

按給定的分割槽數和對映方法進行分割槽

pairs = sc.parallelize([1, 2, 3, 4, 2, 4, 1]).map(lambda x: (x, x))
print(pairs.collect())
sets = pairs.partitionBy(2).glom().collect()
print(sets)

結果如下:

[(1, 1), (2, 2), (3, 3), (4, 4), (2, 2), (4, 4), (1, 1)]
[[(4, 4), (2, 2), (4, 4), (2, 2)], [(1, 1), (3, 3), (1, 1)]]

rdd.coalesce(numPartitions,shuffle=none)

按時新的分割槽數重新分割槽

print(sc.parallelize([1,2,3,4,5,6],3).glom().collect())
print(sc.parallelize([1,2,3,4,5,6],3).coalesce(2).glom().collect())

結果如下:

[[1, 2], [3, 4], [5, 6]]
[[1, 2], [3, 4, 5, 6]]

rdd.repartition(numPartitions)

按時新的分割槽數重新分割槽

rdd.zip(other)

將第一個rdd的元素作用鍵,第二個rdd的元素作為值,組成新rdd的元素。

x=sc.parallelize(['B','A','A'])
y=x.map(lambda x:ord(x))
z=x.zip(y)
print(x.collect())
#結果為:['B', 'A', 'A']
print(y.collect())
#結果為:[66, 65, 65]
print(z.collect())
#結果為:[('B', 66), ('A', 65), ('A', 65)]

rdd.zipWithIndex()

將rdd的元素作鍵,索引(可以理解為看到的位置索引)作為值,組成新rdd的元素。

x=sc.parallelize(['B','A','A'],2)
y=x.zipWithIndex()
print(x.collect())
print(y.collect())
#結果為:[['B'], ['A', 'A']]
#結果為:[[('B', 0)], [('A', 1), ('A', 2)]]

rdd.zipWithUniqueId()

將rdd的元素作鍵,按公式算出的值作為值,組成新rdd的元素。
Zips this RDD with generated unique Long ids.
Items in the kth partition will get ids k, n+k, 2*n+k, …, where n is the number of partitions. So there may exist gaps, but this method won’t trigger a spark job, which is different from zipWithIndex

rdd.keyBy(f)

為rdd中的每個元素按照函式f生成一個鍵,新rdd的元素以元組形式存在。

x=sc.parallelize([1,2,3])
y=x.keyBy(lambda x:x**2)
print(x.collect())
#結果為[1, 2, 3]
print(y.collect())
#結果為[(1, 1), (4, 2), (9, 3)]

rdd.foreach(f)

對RDD中的每個元素使用函式來作用,由於是直接對每個元素操作併產生結果,所以得到的結果不是rdd,而是普通python物件。這與foreachPartition不同。

from pyspark import SparkContext
sc=SparkContext()
rdd_data=sc.parallelize([1,2,3,4,5],2)
print(rdd_data.glom().collect())
def f(x):
    print(x)
list_new=rdd_data.foreach(f)

結果如下:

[[1, 2], [3, 4, 5]]
3
4
5
1
2

一個將rdd元素逐個寫到檔案中的例子:

inputData=sc.parallelize([1,2,3])
def f(x):#定義一個將內容追加於檔案末尾的函式
    with open('./example.txt','a+') as fl:
        print(x,file=fl)

open('./example.txt','w').close()#操作之前先關閉之前可能存在的對該檔案的寫操作
y=inputData.foreach(f)
print(y)
#結果為:None,因為函式f沒有返回值
#檢視寫檔案的結果
with open('./example.txt') as fl:
    print(fl.read())
#結果為:
'''
1
2
3
'''
#說明每一個元素都被寫到檔案'./example.txt'中去了。

rdd.foreachpartition

對RDD的每一個分割槽使用函式來作用

from pyspark import SparkContext
sc=SparkContext()
rdd_data=sc.parallelize([1,2,3,4,5],2)
print(rdd_data.glom().collect())
def f(iterator):
    for x in iterator:
        print(x)
list_new=rdd_data.foreachPartition(f)

結果如下:

[[1, 2], [3, 4, 5]]
1
3
2
4
5

寫分割槽內容的例子

inputData=sc.parallelize([1,2,3],5)
print(inputData.glom().collect())
#結果為:
#[[], [1], [], [2], [3]]
def f(x):#定義一個將內容追加於檔案末尾的函式
    with open('./example.txt','a+') as fl:
        print(list(x),file=fl)#先對分割槽序列進行轉化再寫入到檔案中

open('./example.txt','w').close()#操作之前先關閉之前可能存在的對該檔案的寫操作
y=inputData.foreachPartition(f)
print(y)
#結果為:None,因為函式f沒有返回值
#檢視寫檔案的結果
with open('./example.txt') as fl:
    print(fl.read())
#結果為:
'''
[]
[]
[2]
[1]
[3]
'''
#說明每一個分割槽序列都被寫到檔案'./example.txt'中去了。

rdd.groupByKey(numPartitions=None, partitionFunc=)

原rdd為鍵值對,groupByKey()則將原rdd的元素相同鍵的值編進一個sequence(不知道與list和iterator的不同有多大,可以暫時當成iterator看)

rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
rddGp=rdd.groupByKey()
print(rdd.collect())
print(rddGp.collect())
#結果如下:
#[('a', 1), ('b', 1), ('a', 1)]
#[('a', <pyspark.resultiterable.ResultIterable object at 0x7fe1e3f88710>), ('b', <pyspark.resultiterable.ResultIterable object at 0x7fe1e3f88668>)]
#從結果看,確實是將鍵相同的值編到一個序列裡了,但型別很奇怪。這樣看沒有什麼用處。但是後面可以接其他函式,一般都接mapValues(f),這樣就可以完成按對值的一些操作。
def f(x):
    a=list(x)#直接使用x會報錯,說明sequence並不能用for
    for i in range(len(a)):
        a[i]=a[i]*2
    return a

gpMp1=rddGp.mapValues(len)
gpMp2=rddGp.mapValues(list)
gpMp3=rddGp.mapValues(f)
print(gpMp1.collect())
print(gpMp2.collect())
print(gpMp3.collect())
#結果如下:
#[('a', 2), ('b', 1)],按鍵計算對應值的個數
#[('a', [1, 1]), ('b', [1])],按鍵將值轉換成列表形式
#[('a', [2, 2]), ('b', [2])],通過自定義函式可以按鍵對值實現更復雜的操作。
#groupByKey()+mapValues()與reduceByKey()的過程很像,但兩者執行效率相差很大,在能夠用reduceByKey()或aggregateByKey的時候,儘量不要用groupByKey()

groupByKey()和flatPapValues()綜合應用

rdd.groupBy(f, numPartitions=None, partitionFunc=)

groupBy()的用法與groupByKey相似,但傳入引數多了f,傳入的函式f可以把它當成用來生成新的key的。它也圍繞這個潛在的key將值編進一個序列。可以看得出來,它比groupByKey更靈活。

rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
result=rdd.groupBy(lambda x:x%2)#按餘數來分組
#後面緊接著一般是mapValues函式做進一步處理,這裡我們直接獲取該資料而不做進一步處理。
print(result.collect())
#結果如下
[(0, <pyspark.resultiterable.ResultIterable object at 0x7f336e26f550>), (1, <pyspark.resultiterable.ResultIterable object at 0x7f336e26f6a0>)]
#所以要用for函式或在mapValues內部用list將pyspark.resultiterable.ResultIterable object轉換出來
resultGp=[(x,sorted(y)) for (x,y) in result.collect()]
print(resultGp)
#結果如下:
[(0, [2, 8]), (1, [1, 1, 3, 5])]

rdd.reduce(f)

reduce函式是將rdd中的每個元素兩兩之間按函式f進行操作,然後再結果再兩兩之間按f進行操作,一直進行下去,即所謂的shuffle過程。reduce得到的結果是普通的python物件,而不是rdd.

rdd.keys()

原rdd的元素為鍵值對,返回原rdd元素的鍵為元素的rdd

rdd.values()

原rdd的元素為鍵值對,返回原rdd元素的值為元素的rdd

rdd.reduceByKey(func, numPartitions=None, partitionFunc=)

reduceByKey函式與reduce相似,但它是按key分組,在組內,將元素兩兩之間按函式f操作。可以看成是將value引數傳入了,但最終結果又不丟失key的資訊。更詳細的使用,可參考上面介紹map函式時的例子。reduceByKey得到的結果是普通的python物件,而不是rdd.

rdd.reduceByKeyLocally(f)

其他與reduceByKey一樣,只不過聚合後立即將鍵,值對以字典的形式傳給到叢集master

from operator import add
inputRdd=sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
rddRBKL=inputRdd.reduceByKeyLocally(add)
print(inputRdd.collect())
#結果為:[('a', 1), ('b', 1), ('a', 1)]
print(rddRBKL)
#結果為:{'a': 2, 'b': 1}

fold(zeroValue, op)[source]

與partitionBy很像,只不過有一個起始值。fold函式是按分割槽對每個元素進行操作,即先每個元素與起始值按op進行操作,得到的結果再兩兩之間按op操作,一直進行下去得到分割槽結果,然後再將分割槽結果按op操作。

>>> from operator import add
>>> x=sc.parallelize([1,2,3],2)
>>> y=x.fold(1,lambda valueInitial,accumulated:accumulated+valueInitial)#這裡的valueInitial,accumulated可以隨便取名字,只表示輸入
print(x.collect())
#結果為:[1, 2, 3]
>>> print(y)
#結果為9

關於python內建操作op有哪些,可以使用help(‘operator’)進行查詢,operator是一個專門的模組。這個模組是mapValues(f),reduce(),aggregate()等相似功能函式都能使用的。詳細列舉如下:

Help on module operator:

NAME
    operator - Operator interface.

DESCRIPTION
    This module exports a set of functions implemented in C corresponding
    to the intrinsic operators of Python.  For example, operator.add(x, y)
    is equivalent to the expression x+y.  The function names are those
    used for special methods; variants without leading and trailing
    '__' are also provided for convenience.

CLASSES
    builtins.object
        attrgetter
        itemgetter
        methodcaller

    class attrgetter(builtins.object)
     |  attrgetter(attr, ...) --> attrgetter object
     |  
     |  Return a callable object that fetches the given attribute(s) from its operand.
     |  After f = attrgetter('name'), the call f(r) returns r.name.
     |  After g = attrgetter('name', 'date'), the call g(r) returns (r.name, r.date).
     |  After h = attrgetter('name.first', 'name.last'), the call h(r) returns
     |  (r.name.first, r.name.last).
     |  
     |  Methods defined here:
     |  
     |  __call__(self, /, *args, **kwargs)
     |      Call self as a function.
     |  
     |  __getattribute__(self, name, /)
     |      Return getattr(self, name).
     |  
     |  __new__(*args, **kwargs) from builtins.type
     |      Create and return a new object.  See help(type) for accurate signature.
     |  
     |  __reduce__(...)
     |      Return state information for pickling
     |  
     |  __repr__(self, /)
     |      Return repr(self).

    class itemgetter(builtins.object)
     |  itemgetter(item, ...) --> itemgetter object
     |  
     |  Return a callable object that fetches the given item(s) from its operand.
     |  After f = itemgetter(2), the call f(r) returns r[2].
     |  After g = itemgetter(2, 5, 3), the call g(r) returns (r[2], r[5], r[3])
     |  
     |  Methods defined here:
     |  
     |  __call__(self, /, *args, **kwargs)
     |      Call self as a function.
     |  
     |  __getattribute__(self, name, /)
     |      Return getattr(self, name).
     |  
     |  __new__(*args, **kwargs) from builtins.type
     |      Create and return a new object.  See help(type) for accurate signature.
     |  
     |  __reduce__(...)
     |      Return state information for pickling
     |  
     |  __repr__(self, /)
     |      Return repr(self).

    class methodcaller(builtins.object)
     |  methodcaller(name, ...) --> methodcaller object
     |  
     |  Return a callable object that calls the given method on its operand.
     |  After f = methodcaller('name'), the call f(r) returns r.name().
     |  After g = methodcaller('name', 'date', foo=1), the call g(r) returns
     |  r.name('date', foo=1).
     |  
     |  Methods defined here:
     |  
     |  __call__(self, /, *args, **kwargs)
     |      Call self as a function.
     |  
     |  __getattribute__(self, name, /)
     |      Return getattr(self, name).
     |  
     |  __new__(*args, **kwargs) from builtins.type
     |      Create and return a new object.  See help(type) for accurate signature.
     |  
     |  __reduce__(...)
     |      Return state information for pickling
     |  
     |  __repr__(self, /)
     |      Return repr(self).

FUNCTIONS
    __abs__ = abs(...)
        abs(a) -- Same as abs(a).

    __add__ = add(...)
        add(a, b) -- Same as a + b.

    __and__ = and_(...)
        and_(a, b) -- Same as a & b.

    __concat__ = concat(...)
        concat(a, b) -- Same as a + b, for a and b sequences.

    __contains__ = contains(...)
        contains(a, b) -- Same as b in a (note reversed operands).

    __delitem__ = delitem(...)
        delitem(a, b) -- Same as del a[b].

    __eq__ = eq(...)
        eq(a, b) -- Same as a==b.

    __floordiv__ = floordiv(...)
        floordiv(a, b) -- Same as a // b.

    __ge__ = ge(...)
        ge(a, b) -- Same as a>=b.

    __getitem__ = getitem(...)
        getitem(a, b) -- Same as a[b].

    __gt__ = gt(...)
        gt(a, b) -- Same as a>b.

    __iadd__ = iadd(...)
        a = iadd(a, b) -- Same as a += b.

    __iand__ = iand(...)
        a = iand(a, b) -- Same as a &= b.

    __iconcat__ = iconcat(...)
        a = iconcat(a, b) -- Same as a += b, for a and b sequences.

    __ifloordiv__ = ifloordiv(...)
        a = ifloordiv(a, b) -- Same as a //= b.

    __ilshift__ = ilshift(...)
        a = ilshift(a, b) -- Same as a <<= b.

    __imatmul__ = imatmul(...)
        a = imatmul(a, b) -- Same as a @= b.

    __imod__ = imod(...)
        a = imod(a, b) -- Same as a %= b.

    __imul__ = imul(...)
        a = imul(a, b) -- Same as a *= b.

    __index__ = index(...)
        index(a) -- Same as a.__index__()

    __inv__ = inv(...)
        inv(a) -- Same as ~a.

    __invert__ = invert(...)
        invert(a) -- Same as ~a.

    __ior__ = ior(...)
        a = ior(a, b) -- Same as a |= b.

    __ipow__ = ipow(...)
        a = ipow(a, b) -- Same as a **= b.

    __irshift__ = irshift(...)
        a = irshift(a, b) -- Same as a >>= b.

    __isub__ = isub(...)
        a = isub(a, b) -- Same as a -= b.

    __itruediv__ = itruediv(...)
        a = itruediv(a, b) -- Same as a /= b

    __ixor__ = ixor(...)
        a = ixor(a, b) -- Same as a ^= b.

    __le__ = le(...)
        le(a, b) -- Same as a<=b.

    __lshift__ = lshift(...)
        lshift(a, b) -- Same as a << b.

    __lt__ = lt(...)
        lt(a, b) -- Same as a<b.

    __matmul__ = matmul(...)
        matmul(a, b) -- Same as a @ b.

    __mod__ = mod(...)
        mod(a, b) -- Same as a % b.

    __mul__ = mul(...)
        mul(a, b) -- Same as a * b.

    __ne__ = ne(...)
        ne(a, b) -- Same as a!=b.

    __neg__ = neg(...)
        neg(a) -- Same as -a.

    __not__ = not_(...)
        not_(a) -- Same as not a.

    __or__ = or_(...)
        or_(a, b) -- Same as a | b.

    __pos__ = pos(...)
        pos(a) -- Same as +a.

    __pow__ = pow(...)
        pow(a, b) -- Same as a ** b.

    __rshift__ = rshift(...)
        rshift(a, b) -- Same as a >> b.

    __setitem__ = setitem(...)
        setitem(a, b, c) -- Same as a[b] = c.

    __sub__ = sub(...)
        sub(a, b) -- Same as a - b.

    __truediv__ = truediv(...)
        truediv(a, b) -- Same as a / b.

    __xor__ = xor(...)
        xor(a, b) -- Same as a ^ b.

    abs(...)
        abs(a) -- Same as abs(a).

    add(...)
        add(a, b) -- Same as a + b.

    and_(...)
        and_(a, b) -- Same as a & b.

    concat(...)
        concat(a, b) -- Same as a + b, for a and b sequences.

    contains(...)
        contains(a, b) -- Same as b in a (note reversed operands).

    countOf(...)
        countOf(a, b) -- Return the number of times b occurs in a.

    delitem(...)
        delitem(a, b) -- Same as del a[b].

    eq(...)
        eq(a, b) -- Same as a==b.

    floordiv(...)
        floordiv(a, b) -- Same as a // b.

    ge(...)
        ge(a, b) -- Same as a>=b.

    getitem(...)
        getitem(a, b) -- Same as a[b].

    gt(...)
        gt(a, b) -- Same as a>b.

    iadd(...)
        a = iadd(a, b) -- Same as a += b.

    iand(...)
        a = iand(a, b) -- Same as a &= b.

    iconcat(...)
        a = iconcat(a, b) -- Same as a += b, for a and b sequences.

    ifloordiv(...)
        a = ifloordiv(a, b) -- Same as a //= b.

    ilshift(...)
        a = ilshift(a, b) -- Same as a <<= b.

    imatmul(...)
        a = imatmul(a, b) -- Same as a @= b.

    imod(...)
        a = imod(a, b) -- Same as a %= b.

    imul(...)
        a = imul(a, b) -- Same as a *= b.

    index(...)
        index(a) -- Same as a.__index__()

    indexOf(...)
        indexOf(a, b) -- Return the first index of b in a.

    inv(...)
        inv(a) -- Same as ~a.

    invert(...)
        invert(a) -- Same as ~a.

    ior(...)
        a = ior(a, b) -- Same as a |= b.

    ipow(...)
        a = ipow(a, b) -- Same as a **= b.

    irshift(...)
        a = irshift(a, b) -- Same as a >>= b.

    is_(...)
        is_(a, b) -- Same as a is b.

    is_not(...)
        is_not(a, b) -- Same as a is not b.

    isub(...)
        a = isub(a, b) -- Same as a -= b.

    itruediv(...)
        a = itruediv(a, b) -- Same as a /= b

    ixor(...)
        a = ixor(a, b) -- Same as a ^= b.

    le(...)
        le(a, b) -- Same as a<=b.

    length_hint(...)
        length_hint(obj, default=0) -> int
        Return an estimate of the number of items in obj.
        This is useful for presizing containers when building from an
        iterable.

        If the object supports len(), the result will be
        exact. Otherwise, it may over- or under-estimate by an
        arbitrary amount. The result will be an integer >= 0.

    lshift(...)
        lshift(a, b) -- Same as a << b.

    lt(...)
        lt(a, b) -- Same as a<b.

    matmul(...)
        matmul(a, b) -- Same as a @ b.

    mod(...)
        mod(a, b) -- Same as a % b.

    mul(...)
        mul(a, b) -- Same as a * b.

    ne(...)
        ne(a, b) -- Same as a!=b.

    neg(...)
        neg(a) -- Same as -a.

    not_(...)
        not_(a) -- Same as not a.

    or_(...)
        or_(a, b) -- Same as a | b.

    pos(...)
        pos(a) -- Same as +a.

    pow(...)
        pow(a, b) -- Same as a ** b.

    rshift(...)
        rshift(a, b) -- Same as a >> b.

    setitem(...)
        setitem(a, b, c) -- Same as a[b] = c.

    sub(...)
        sub(a, b) -- Same as a - b.

    truediv(...)
        truediv(a, b) -- Same as a / b.

    truth(...)
        truth(a) -- Return True if a is true, False otherwise.

    xor(...)
        xor(a, b) -- Same as a ^ b.

DATA
    __all__ = ['abs', 'add', 'and_', 'attrgetter', 'concat', 'contains', '...

FILE
    c:\users\csw\miniconda2\envs\guanwang\lib\operator.py

foldByKey(zeroValue, func, numPartitions=None, partitionFunc=)[source]

foldByKey的操作與fold幾乎一樣,只不過前者按鍵來分組,後者按分割槽分組,前者不同鍵不再進一步操作,後者不同分割槽結果還會進一步操作。foldByKey最終結果也是普通python物件,而不是rdd。一個鍵佔一個元組的空間,存放在list中。

>>> from operator import add
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> rddFB=sorted(rdd.foldByKey(1, add).collect())
>>> print(rdd.collect())
#結果為:[('a', 1), ('b', 1), ('a', 1)]
>>> print(rddFB)
#結果為:[('a', 4), ('b', 2)]

RDD.aggregate(zeroValue, seqOp, combOp)

aggregate與fold相似又很不同。
seqOp操作會聚合各分割槽中的元素,然後combOp操作把所有分割槽的聚合結果再次聚合,兩個操作的初始值都是zeroValue. seqOp的操作是遍歷分割槽中的所有元素(T),zeroValue跟第一個T做操作,結果再作為與第二個T做操作的zeroValue,直到遍歷完整個分割槽。combOp操作是把各分割槽聚合的結果,再聚合,zeroValue與第一個分割槽結果聚合,聚合結果相當於新的zeroValue,再與第二個分割槽結果聚合,一直進行下去。aggregate函式返回一個跟RDD不同型別的值。因此,需要一個操作seqOp來把分割槽中的元素T合併成一個U,另外一個操作combOp把所有U聚合。

seqOp=(lambda x,y:(x[0]+y,x[1]+1))
combOp=(lambda x,y:(x[0]+y[0],x[1]+y[1]))
x=sc.parallelize([1,2,3,4,5,6],2)
print(x.glom().collect())
#結果為:[[1, 2, 3], [4, 5, 6]]
y=x.aggregate((1,2),seqOp,combOp)
print(y)
#結果為:(24, 12)

#計算過程如下:
#(1,2)--》(1+1,2+1)-->(2+2,3+1)-->(4+3,4+1)-->(7,5);
#(1,2)--》(1+4,2+1)-->(5+5,3+1)-->(10+6,4+1)-->(16,5);
#(1,2)--》(1+7,2+5)-->(8+16,7+5)-->(24,12);

RDD.aggregateByKey(zeroValue, seqFunc, combFunc, numPartitions=None, partitionFunc=)

aggregate是按分割槽進行,而aggregateByKey是按鍵來進行,但是zeroValue與aggregate中的用法很不一樣,這裡的zeroValue是一個值,它即可以跟這樣鍵聚合,也可以跟那個鍵聚合,而且zeroValue必須與鍵內聚合時定義的形式一致。

x=sc.parallelize([('B', 1), ('B', 2), ('B', 3), ('A', 4), ('A', 5),('A', 6)])
zeroValue=[7]
mergeVal=(lambda aggregated,el:aggregated+[(el,el**2)])
mergeComb=(lambda agg1,agg2:agg1+agg2)
y=x.aggregateByKey(zeroValue,mergeVal,mergeComb)
print(x.collect())
#結果為:[('B', 1), ('B', 2), ('B', 3), ('A', 4), ('A', 5), ('A', 6)]
print(y.collect())
#結果為:[('A', [7, (4, 16), (5, 25), (6, 36)]), ('B', [7, (1, 1), (2, 4), (3, 9)])]
#計算過程如下:
#('B', [7]);('B', (1,1**2))-->('B', [7,(1,1)])-->('B', [7,(1,1)]);('B', (2,2**2))-->('B', [7,(1,1),(2,4)])...-->[('B', [7, (1, 1), (2, 4), (3, 9)])]
同時'A'也進行這樣的過程
#[('B', [7, (1, 1), (2, 4), (3, 9)])];[('A', [7, (4, 16), (5, 25), (6, 36)])]-->[('A', [7, (4, 16), (5, 25), (6, 36)]), ('B', [7, (1, 1), (2, 4), (3, 9)])]

combineByKey(createCombiner, mergeValue, mergeCombiners, numPartitions=None, partitionFunc=)

與foldByKey()的做法很相似,但是它沒有初始值,而且可以定義多個環節的操作函式,所以功能會更加靈活。
createCombiner: V => C ,這個函式把各元素的值作為引數,此時