
Learning Notes: Common PySpark RDD Operations


# -*- coding:utf-8 -*-
from pyspark import SparkContext, SparkConf

appName = "jhl_spark_1"  # 你的應用程序名稱
master = "local"  # 設置單機
conf = SparkConf().setAppName(appName).setMaster(master)  # 配置SparkContext
sc = SparkContext(conf=conf)

# parallelize: distribute a local collection into an RDD
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data, numSlices=10)  # numSlices is the number of partitions; choose it based on the cluster size
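# As a quick sanity check, getNumPartitions() (a standard RDD method) reports
# how many partitions parallelize created; with numSlices=10 the five elements
# are spread across 10 partitions, some of them empty.
print(distData.getNumPartitions())
# 10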

# textFile: read an external file line by line into an RDD
#rdd = sc.textFile("./c2.txt")
#print(rdd.collect())
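# A minimal sketch of inspecting a text-file RDD, assuming the same local file
# "./c2.txt" as above actually exists; kept commented out so the script still
# runs without it:
#lines = sc.textFile("./c2.txt")
#print(lines.count())  # number of lines in the file
#print(lines.first())  # first line of the file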


# map: apply a function to each element of the RDD
def my_add(l):
    return (l, l)


data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)  # parallelize the dataset
result = distData.map(my_add)
print(result.collect())  # collect() brings the transformed elements back as a local list
#[(1, 1), (2, 2), (3, 3), (4, 4), (5, 5)]
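# The same transformation written with an inline lambda instead of a named function:
print(distData.map(lambda l: (l, l)).collect())
# [(1, 1), (2, 2), (3, 3), (4, 4), (5, 5)]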

# filter: keep only the elements for which the predicate returns True
def greater_than_two(l):
    return l > 2


data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)  # parallelize the dataset into partitions
result = distData.filter(greater_than_two)
print(result.collect())  # collect() brings the filtered elements back as a local list
#[3, 4, 5]
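# The same filter written with an inline lambda predicate:
print(distData.filter(lambda l: l > 2).collect())
# [3, 4, 5]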
# zip: pair up corresponding elements of two RDDs into tuples
x = sc.parallelize(range(0, 5))
y = sc.parallelize(range(1000, 1005))
print(x.zip(y).collect())
#[(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]
# union: concatenate two RDDs
print(x.union(x).collect())
#[0, 1, 2, 3, 4, 0, 1, 2, 3, 4]
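# union keeps duplicates; chaining distinct() removes them (element order in
# the result is not guaranteed):
print(x.union(x).distinct().collect())
# e.g. [0, 1, 2, 3, 4]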
# Action operations

# collect: return all elements of the RDD to the driver as a list
rdd = sc.parallelize(range(1, 10))
print(rdd)
print(rdd.collect())
#ParallelCollectionRDD[9] at parallelize at PythonRDD.scala:475
#[1, 2, 3, 4, 5, 6, 7, 8, 9]
# collectAsMap: for an RDD of key-value tuples, return the data to the driver as a dict keyed by the first element of each tuple
m = sc.parallelize([('a', 2), (3, 4)]).collectAsMap()
print(m['a'])
print(m[3])
#2
#4
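# collectAsMap builds an ordinary Python dict, so if two tuples share a key
# the later value overwrites the earlier one:
print(sc.parallelize([('a', 1), ('a', 2)]).collectAsMap())
# {'a': 2}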
# groupBy: group the elements of the RDD by the key computed by the supplied function
rdd = sc.parallelize([1, 1, 2, 3, 5, 8])


def fun(i):
    return i % 2  # group by parity: 0 for even, 1 for odd


result = rdd.groupBy(fun).collect()
print([(x, sorted(y)) for (x, y) in result])
#[(0, [2, 8]), (1, [1, 1, 3, 5])]
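# groupBy returns (key, iterable) pairs; mapValues(list) converts each group
# into a plain list before collecting:
print(rdd.groupBy(fun).mapValues(list).collect())
# e.g. [(0, [2, 8]), (1, [1, 1, 3, 5])] -- order within each group may vary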
# reduce: aggregate the elements of the RDD with a binary function
rdd = sc.parallelize(range(1, 10))
result = rdd.reduce(lambda a, b: a + b)
print(result)
#45
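# reduce accepts any commutative and associative binary function; for example,
# taking the maximum instead of the sum:
print(rdd.reduce(lambda a, b: a if a > b else b))
# 9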

  
