Learning Notes: Common PySpark RDD Operations
阿新 • Published: 2018-08-27
# -*- coding:utf-8 -*-
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext  # not used in the examples below
import math                                     # not used in the examples below

appName = "jhl_spark_1"  # name of your application
master = "local"         # run in local (single-machine) mode
conf = SparkConf().setAppName(appName).setMaster(master)  # configure the SparkContext
sc = SparkContext(conf=conf)

# parallelize: distribute a local collection to form an RDD
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data, numSlices=10)  # numSlices is the number of partitions, chosen according to the cluster size

# textFile: read external data
# rdd = sc.textFile("./c2.txt")  # read an external file line by line into an RDD
# print(rdd.collect())

# map: apply a function to each element of the dataset independently
def my_add(l):
    return (l, l)

data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)  # parallelize the dataset
result = distData.map(my_add)
print(result.collect())  # returns a distributed dataset
# [(1, 1), (2, 2), (3, 3), (4, 4), (5, 5)]

# filter: keep only the elements that satisfy a predicate
def my_add(l):
    result = False
    if l > 2:
        result = True
    return result

data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)  # parallelize (and partition) the dataset
result = distData.filter(my_add)
print(result.collect())  # returns a distributed dataset
# [3, 4, 5]

# zip: pair up the corresponding elements of two RDDs as tuples
x = sc.parallelize(range(0, 5))
y = sc.parallelize(range(1000, 1005))
print(x.zip(y).collect())
# [(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]

# union: combine two RDDs
print(x.union(x).collect())
# [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]

# Action operations
# collect: return the RDD's data to the driver
rdd = sc.parallelize(range(1, 10))
print(rdd)
print(rdd.collect())
# ParallelCollectionRDD[9] at parallelize at PythonRDD.scala:475
# [1, 2, 3, 4, 5, 6, 7, 8, 9]

# collectAsMap: treat each RDD element as a (key, value) tuple and return the data as a dict keyed by the first item
m = sc.parallelize([('a', 2), (3, 4)]).collectAsMap()
print(m['a'])
print(m[3])
# 2
# 4

# groupBy: group the RDD's elements by the key produced by the supplied function
# (strictly a transformation; the collect() below is what materialises the groups)
rdd = sc.parallelize([1, 1, 2, 3, 5, 8])

def fun(i):
    return i % 2

result = rdd.groupBy(fun).collect()
print([(x, sorted(y)) for (x, y) in result])
# [(0, [2, 8]), (1, [1, 1, 3, 5])]

# reduce: aggregate the dataset with a binary function
rdd = sc.parallelize(range(1, 10))
result = rdd.reduce(lambda a, b: a + b)
print(result)
# 45
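The numSlices comment above only states how many partitions parallelize() was asked for. As a small complement (not part of the original notes), the sketch below uses getNumPartitions() to confirm the partition count and then stops the SparkContext; it assumes the same local sc created at the top of the script is still active.

# Complementary sketch (assumption: the local SparkContext `sc` from above is still running).
distData = sc.parallelize([1, 2, 3, 4, 5], numSlices=10)
print(distData.getNumPartitions())  # check how many partitions the RDD actually has
# 10  (some partitions are empty, since there are only 5 elements)

sc.stop()  # release the local Spark resources once the experiments are done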