pyspark - Spark Programming Guide
阿新 · Published 2019-01-08
References:
1、http://spark.apache.org/docs/latest/rdd-programming-guide.html
2、https://github.com/apache/spark/tree/v2.2.0
Spark Programming Guide
Linking with Spark
from pyspark import SparkContext, SparkConf
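A standalone application that starts with this import is usually launched through spark-submit; the file name MyScript.py below is only the example name used later in this guide:

$ ./bin/spark-submit --master local[4] MyScript.py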
Initializing Spark
from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)
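As a minimal concrete sketch (the app name "MyApp" and the master URL local[2] are placeholder values, not from the original guide):

from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName("MyApp").setMaster("local[2]")  # placeholder name and local master
sc = SparkContext(conf=conf)
# ... build and act on RDDs with sc ...
sc.stop()  # shut the context down when the job is finished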
Using the Shell
$ ./bin/pyspark --master local[4]
$ ./bin/pyspark --master local[4] --py-files code.py
Resilient Distributed Datasets (RDDs)
Parallelized Collections
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
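parallelize also accepts the number of partitions to cut the dataset into; the partition count of 10 below is only an illustrative choice:

data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data, 10)   # 10 partitions; Spark would otherwise pick a default
distData.getNumPartitions()           # -> 10
distData.reduce(lambda a, b: a + b)   # -> 15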
External Datasets
# Text file RDDs can be created using SparkContext's textFile method.
# This method takes a URI for the file (either a local path on the machine,
# or a hdfs://, s3n://, etc URI) and reads it as a collection of lines.
>>> distFile = sc.textFile("data.txt")

# Directories, wildcards and compressed files also work:
# textFile("/my/directory"), textFile("/my/directory/*.txt"), and textFile("/my/directory/*.gz")

# SparkContext.wholeTextFiles lets you read a directory containing multiple small text files.
# RDD.saveAsPickleFile and SparkContext.pickleFile support saving an RDD in a simple format
# consisting of pickled Python objects.
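A short sketch of the two APIs mentioned above; the directory /my/small-files and the output path tmp_pickle are made-up examples:

# wholeTextFiles returns one (filename, content) pair per file in the directory
pairs = sc.wholeTextFiles("/my/small-files")

# round-trip an RDD through the pickled-object format
rdd = sc.parallelize(range(5))
rdd.saveAsPickleFile("tmp_pickle")
sorted(sc.pickleFile("tmp_pickle").collect())   # -> [0, 1, 2, 3, 4]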
Saving and Loading SequenceFiles
>>> rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x))
>>> rdd.saveAsSequenceFile("path/to/file")
>>> sorted(sc.sequenceFile("path/to/file").collect())
[(1, u'a'), (2, u'aa'), (3, u'aaa')]
Saving and Loading Other Hadoop Input/Output Formats
$ ./bin/pyspark --jars /path/to/elasticsearch-hadoop.jar

>>> conf = {"es.resource" : "index/type"}  # assume Elasticsearch is running on localhost defaults
>>> rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",
...                          "org.apache.hadoop.io.NullWritable",
...                          "org.elasticsearch.hadoop.mr.LinkedMapWritable",
...                          conf=conf)
>>> rdd.first()  # the result is a MapWritable that is converted to a Python dict
(u'Elasticsearch ID',
 {u'field1': True,
  u'field2': u'Some Text',
  u'field3': 12345})
RDD Operations
lines = sc.textFile("data.txt")
lineLengths = lines.map(lambda s: len(s))
totalLength = lineLengths.reduce(lambda a, b: a + b)
lineLengths.persist()
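The map above is lazy: nothing is computed until the reduce action runs, and persist() only helps if it is requested before the first action. A small sketch of that ordering (the second reduce is just an assumed re-use of the data):

lines = sc.textFile("data.txt")
lineLengths = lines.map(lambda s: len(s))                 # lazy, nothing runs yet
lineLengths.persist()                                     # mark for caching before any action
totalLength = lineLengths.reduce(lambda a, b: a + b)      # first action: computes and caches
maxLength = lineLengths.reduce(lambda a, b: max(a, b))    # second action: served from the cache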
Passing Functions to Spark
"""MyScript.py""" if __name__ == "__main__": def myFunc(s): words = s.split(" ") return len(words) sc = SparkContext(...) sc.textFile("file.txt").map(myFunc) class MyClass(object): def func(self, s): return s def doStuff(self, rdd): return rdd.map(self.func) class MyClass(object): def __init__(self): self.field = "Hello" def doStuff(self, rdd): return rdd.map(lambda s: self.field + s) def doStuff(self, rdd): field = self.field return rdd.map(lambda s: field + s)
Understanding Closures
Example
counter = 0
rdd = sc.parallelize(data)

# Wrong: Don't do this!!
def increment_counter(x):
    global counter
    counter += x

rdd.foreach(increment_counter)

print("Counter value: ", counter)
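In local mode this may appear to work, but on a cluster each executor increments its own copy of counter and the driver's value stays 0. The documented fix is an accumulator; a minimal sketch (the sample data list is assumed):

data = [1, 2, 3, 4, 5]                    # assumed sample data
rdd = sc.parallelize(data)

counter = sc.accumulator(0)               # driver-visible accumulator instead of a global
rdd.foreach(lambda x: counter.add(x))
print("Counter value: ", counter.value)   # -> 15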
Working with Key-Value Pairs
lines = sc.textFile("data.txt")
pairs = lines.map(lambda s: (s, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)
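The resulting pair RDD supports the usual follow-up actions; the two calls below (ordering by key and taking the most frequent lines) are illustrative additions, not part of the original snippet:

counts.sortByKey().collect()               # (line, count) pairs in key order
counts.sortBy(lambda kv: -kv[1]).take(5)   # the five most frequent lines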
Shared Variables
Broadcast Variables
>>> broadcastVar = sc.broadcast([1, 2, 3])
<pyspark.broadcast.Broadcast object at 0x102789f10>
>>> broadcastVar.value  # a plain Python list
[1, 2, 3]
>>> da = sc.parallelize(broadcastVar.value)  # --> back to an RDD
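Broadcast values are meant to be read inside tasks instead of being captured in every closure; the small lookup table below is an assumed illustration, not from the guide:

lookup = sc.broadcast({"a": 1, "b": 2, "c": 3})     # shipped to each executor once

words = sc.parallelize(["a", "b", "c", "a"])
words.map(lambda w: lookup.value.get(w, 0)).sum()   # -> 7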
Accumulators
>>> accum = sc.accumulator(0)
>>> accum
Accumulator<id=0, value=0>

>>> sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

>>> accum.value  # <type 'int'>
10
from pyspark.accumulators import AccumulatorParam

class VectorAccumulatorParam(AccumulatorParam):
    def zero(self, initialValue):
        return Vector.zeros(initialValue.size)
    def addInPlace(self, v1, v2):
        v1 += v2
        return v1

# Then, create an Accumulator of this type:
vecAccum = sc.accumulator(Vector(...), VectorAccumulatorParam())
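Vector above is a hypothetical class; the following self-contained variant keeps the same structure but uses plain Python lists (an illustration, not from the original guide):

from pyspark.accumulators import AccumulatorParam

class ListAccumulatorParam(AccumulatorParam):
    def zero(self, initialValue):
        # a zero vector of the same length as the initial value
        return [0.0] * len(initialValue)
    def addInPlace(self, v1, v2):
        # element-wise addition of two equally sized lists
        return [a + b for a, b in zip(v1, v2)]

vecAccum = sc.accumulator([0.0, 0.0, 0.0], ListAccumulatorParam())
sc.parallelize([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]).foreach(lambda v: vecAccum.add(v))
vecAccum.value   # -> [5.0, 7.0, 9.0]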
accum = sc.accumulator(0)

def g(x):
    accum.add(x)
    return f(x)

data.map(g)
# Here, accum is still 0 because no actions have caused the `map` to be computed.
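Once any action runs, the map is evaluated and the accumulator is updated; a continuation of the snippet, with data and f assumed for illustration:

data = sc.parallelize([1, 2, 3, 4])   # assumed input
accum = sc.accumulator(0)

def f(x):
    return x                          # placeholder for the real f

def g(x):
    accum.add(x)
    return f(x)

mapped = data.map(g)   # still lazy: accum.value is 0 here
mapped.count()         # an action forces the map to run
accum.value            # -> 10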