Pyspark 官方介面文件翻譯
由於網上關於Pyspark的資料太過於零散,官方文件也沒有中文版,所以只能自己嘗試來翻譯,第一次翻譯文件,肯定會有很多謬誤,希望大家能多評論指正,共同學習spark!
核心內容:
SparkContext:
Spark功能主要介面
RDD:
彈性分散式資料集,是Spark的基礎概念
streaming.StramingContext:
Spark Streaming功能的主要接入口
streaming.DStream:
離散流,是Spark Streaming中的基本概念
sql.SQLContext:
DataFrame和sql功能的主要介面
sql.DataFrame:
已分組資料指定列的分散式資料集合
SparkConf
一個Spark應用的配置,用於設定key-value形式的Spark變數
大多數時候,你建立SparkConf時候用 SparkConf()的形式從spark.*Java系統屬性中載入,在這種情況下
任何你直接設定的引數都會優先於系統屬性。
對於單元測試,你可以直接呼叫SparkConf來跳過設定來得到相同的配置而不需要關心繫統屬性。
所有的設定方法都支援鏈式連線,如下:
conf.setMaster(“local”).setAppName(“My app”).
注意:一旦一個SparkConf已經傳遞給Spark,它就被複制且不能再被使用者更改
contains(key)
檢視配置是否有key
get(key, defaultValue=None)
拿到一個已經配置好的key的值,否則返回一個預設值
getAll()
拿到所有配置好的屬性值
set(key, value)
設定屬性
setAll(pairs)
批量設定引數,外部以列表形式,內部是key-value形式
setAppName(value)
設定應用名字
setExecutorEnv(key=None,value=None,pairs=None)
設定環境變數傳送給執行緒池
setIfMissing(key,value)
若不存在則設定配置屬性
setMaster(value)
設定主節點URL去連線
setSparkHome(value)
設定Spark在工作節點上的安裝路徑
setDebugString()
返回並列印配置資訊
測試部分介面如下:
SparkContext
SparkContext 可以連線一個Spark叢集,能夠建立RDD和傳送變數到叢集
包擴充套件 = ('.zip', '.egg', '.jar')
accumulator(value,accum_param=None)
建立一個給定初始變數的累加器,用一個給定的累加器引數幫助物件(AccumulatorParam helper object)來
定義如何新增資料型別值。預設累加器引數只適用於整形和浮點型,如果需要其他型別的可以自己定義
addFile(path, recursive=False)
新增一個檔案供Spark任務的每個節點下載使用,path可以是本地檔案路徑也可以是HDFS中檔案(或者是
其他Hadoop支援的檔案系統),也可以是http,https,ftp 地址。
為了將檔案新增到到Spark任務中,使用檔名來尋找它的下載位置:
L{SparkFiles.get(fileName)<pyspark.files.SparkFiles.get>}
如果引數recusive(遞迴)設定為True,可以使用資料夾,但目前只支援Hadoop類的檔案系統
操作示例如下:
addPyFile(path)
新增要給.py或者.zip檔案給所有在SparkContext中將要執行的任務執行,Path可以是本地路徑,HDFS或者是其他Hadoop支援的檔案系統,http,https,ftp連結。
applicationId
一個Spark應用的唯一認證,格式依賴於排程器實現。
>>> sc.applicationId
'local-...'
binaryFiles(path,minPartitions=None)
讀取二進位制檔案,可以是所有節點的本地檔案系統,HDFS...每個檔案作為單獨記錄,並返回鍵值對,keys是檔案路徑,
value是每個檔案的內容
注意:推薦是小檔案,大檔案也可以,但是可能會效率較差
binaryRecords(path, recordLength)
從二進位制檔案中讀取資料,假設每個記錄都是一串指定數字格式的數字,每個記錄的數字不變,path是檔案路徑
而recordLength是分割記錄的長度。
broadcast(value)
向叢集群發要給只讀變數,返回一個用於在分散式物件中讀取的物件,這個變數只會給每個叢集發一次
cancelAllJobs()
取消所有已新增的或者正在執行的任務
setJobGroup(groupId,descriptiom,interruptOnCancel=False)
給所有在一個執行緒中任務分配一個groupId,經常會有單個應用的一個執行單元由多個Spark任務組成,應用開發者能用
這個方法來給所有任務分組和新增描述資訊,一旦設定好,Spark UI會將任務與組聯絡起來
>>> import threading
>>> from time import sleep
>>> result = "Not Set"
>>> lock = threading.Lock()
>>> def map_func(x):
... sleep(100)
... raise Exception("Task should have been cancelled")
>>> def start_job(x):
... global result
... try:
... sc.setJobGroup("job_to_cancel", "some description")
... result = sc.parallelize(range(x)).map(map_func).collect()
... except Exception as e:
... result = "Cancelled"
... lock.release()
>>> def stop_job():
... sleep(5)
... sc.cancelJobGroup("job_to_cancel")
>>> supress = lock.acquire()
>>> supress = threading.Thread(target=start_job, args=(10,)).start()
>>> supress = threading.Thread(target=stop_job).start()
>>> supress = lock.acquire()
>>> print(result)
Cancelled
cancelJobGroup(groupId)
取消組內所有任務執行
defaultMinPartitions
預設HadoopRDDS的最小分塊數
defaultParallelism
parallelism預設的level(如:reduce 任務)
dump_profiles(path)
將概要統計資訊轉儲到目錄路徑中
emptyRDD()
建立一個沒有分割槽和元素的RDD
getConf()
拿到該SparkContext的SparkConf物件
hadoopFile(path, inputFormatClass, keyClass, valueClass, keyConverter=None,
valueConverter=None, conf=None, batchSize=0)
以HDFS(本地檔案系統)中任意一對key/value讀取一箇舊的Hadoop輸入格式,原理和sc.sequenceFile
相同。
一個Hadoop配置能被以Python字典的形式傳輸,這將轉化為Java中的配置
Path:Hadoop檔案目錄
inputFormatClass:Hadoop輸入格式的全描述類名(eg:“org.apache.hadoop.mapred.TextInputFormat”)
keyClass:key可寫類的全描述類名(e.g. “org.apache.hadoop.io.Text”)
valueClass:value可寫類的全描述類名(e.g. “org.apache.hadoop.io.LongWritable”)
hadoopRDD(inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None,
conf=None, batchSize=0)
同上類似
parallelize(c, numSlices=None)
分配本地python集合來生成一個RDD,如果輸入型式是一個範圍可以用xrange
>>> sc.parallelize([0, 2, 3, 4, 6], 5).glom().collect()
[[0], [2], [3], [4], [6]]
>>> sc.parallelize(xrange(0, 6, 2), 5).glom().collect()
[[], [0], [], [2], [4]]
pickleFile(name,minPartits=None)
載入之前用RDD.saveAsPickleFile方法儲存的RDD
tmpFile = NamedTemporaryFile(delete=True)
>>> tmpFile.close()
>>> sc.parallelize(range(10)).saveAsPickleFile(tmpFile.name, 5)
>>> sorted(sc.pickleFile(tmpFile.name, 3).collect())
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
range(start,end=None,step=1,numSlices=None)
看程式碼自然懂
>>> sc.range(5).collect()
[0, 1, 2, 3, 4]
>>> sc.range(2, 4).collect()
[2, 3]
>>> sc.range(1, 7, 2).collect()
[1, 3, 5]
runJob(rdd,partitionFunc,partitions=None,allowLocal=False)
在指定的分割槽裡執行partionFunc函式,以元素陣列的形式返回結果,如果沒有指定,就會一直輪詢執行。
如上程式碼,test是分了3個分割槽的RDD,分別是0,2,4,函式中引數制定了只執行第二分割槽,得到結果是4
文件程式碼如下;
myRDD = sc.parallelize(range(6), 3)
>>> sc.runJob(myRDD, lambda part: [x * x for x in part], [0, 2], True)
[0, 1, 16, 25]
sequenceFile
(path, keyClass=None, valueClass=None, keyConverter=None, valueConverter=None,
minSplits=None, batchSize=0)
...
setLocalProperty(key,value)
設定一個影響從該執行緒提交的作業的本地屬性,例如Spark fair排程器池
setLogLevel(logLevel)
控制日誌級別,級別包括:ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN
sparkUser()
獲取當前在執行SparkContext的系統使用者名稱
startTime
返回SparkContext的啟動時間
stop()
中斷停止SparkContext
textFile(name, minPartitions=None, use_unicode=True)
從HDFS,本地檔案系統(所有節點可用),返回一個字串RDD,如果use_unicode是False,字串將是 str型別(utf-8編碼)。
>>> path = os.path.join(tempdir, "sample-text.txt")
>>> with open(path, "w") as testFile:
... _ = testFile.write("Hello world!")
>>> textFile = sc.textFile(path)
>>> textFile.collect()
['Hello world!']
union(rdds)
建立RDD列表的合併集,程式碼如下:
>>> path = os.path.join(tempdir, "union-text.txt")
>>> with open(path, "w") as testFile:
... _ = testFile.write("Hello")
>>> textFile = sc.textFile(path)
>>> textFile.collect()
['Hello']
>>> parallelized = sc.parallelize(["World!"])
>>> sorted(sc.union([textFile, parallelized]).collect())
['Hello', 'World!']
wholeTextFiles(dirPath)
每個檔案都被讀為單獨一份記錄,返回一個鍵值對,key是檔案目錄,value是檔案內容
>>> dirPath = os.path.join(tempdir, "files")
>>> os.mkdir(dirPath)
>>> with open(os.path.join(dirPath, "1.txt"), "w") as file1:
... _ = file1.write("1")
>>> with open(os.path.join(dirPath, "2.txt"), "w") as file2:
... _ = file2.write("2")
>>> textFiles = sc.wholeTextFiles(dirPath)
>>> sorted(textFiles.collect())
[('.../1.txt', '1'), ('.../2.txt', '2')]
pyspark.RDD
aggreate(zeroValue, seqOp, combOp)
aggregate先對每個分割槽的元素做聚集,然後對所有分割槽的結果做聚集,聚集過程中,使用的是給定的聚集函式以及初始值”zero value”。這個函式能返回一個與原始RDD不同的型別U,因此,需要一個合併RDD型別T到結果型別U的函式,還需要一個合併型別U的函式。這兩個函式都可以修改和返回他們的第一個引數,而不是重新新建一個U型別的引數以避免重新分配記憶體。
引數zeroValue:seqOp
運算子的每個分割槽的累積結果的初始值以及combOp
運算子的不同分割槽的組合結果的初始值 - 這通常將是初始元素(例如“Nil”表的列表 連線或“0”表示求和)
引數seqOp: 每個分割槽累積結果的聚集函式。
引數combOp: 一個關聯運算子用於組合不同分割槽的結果
>>> seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
>>> combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
>>> sc.parallelize([1, 2, 3, 4]).aggregate((0, 0), seqOp, combOp)
(10, 4)
>>> sc.parallelize([]).aggregate((0, 0), seqOp, combOp)
(0, 0)
x是(0,0),y是[1,2,3,4]
執行過程:
首先定義要給初始值(0,0)
x[0]+1, x[1]+1
1+2,1+1
3+3,2+1
6+4,3+1
不斷地將第一個計算結果當作第二個的zeroValue(類似reduce的操作形式)
如果分割槽了就各自計算,然後再相加
checkpoint
()
標記這個RDD作為檢查標記點, 儲存在
這個函式必須先在這個RDD執行任何任務之前先被喚起,強烈建議存在記憶體中不然存在檔案中會要求重新計算。
coalesce
(numPartitions, shuffle=False)
>>> sc.parallelize([1, 2, 3, 4, 5], 3).glom().collect() [[1], [2, 3], [4, 5]] >>> sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect() [[1, 2, 3, 4, 5]]
collect()
返回一個包含RDD所有元素的列表
collectAdMap()
返回一個key-value對。
>>> m = sc.parallelize([(1, 2), (3, 4)]).collectAsMap() >>> m[1] 2 >>> m[3] 4
上兩個方法都是隻能建立在目標資料量不大的情況下,因為資料都會被寫入記憶體。