PySpark中RDD與DataFrame
1. 彈性資料集RDD
RDD是一個抽象的分散式資料集合,它提供了一系列轉化操作(例如基本的map()
、flatMap()
、filter()
,類集合操作union()
、intersection()
、subtract()
)和行動操作(例如collect()
、count()
、take()
、top()
、reduce()
、foreach()
)。可以說,RDD是非常靈活的資料集合,其中可以存放型別相同或者互異的資料,同時可以指定任何自己期望的函式對其中的資料進行處理。
建立一個RDD:
# 從list中建立
rdd = sc.parallelize([1, '2', (3, 4), ['5', '6' ]])
# 從檔案中讀取
rdd = sc.textFile('\path\to\file')
還有一類RDD是key-value Pair RDD,即規定RDD每個元素都是一個二元組,其中第一個值是key,第二個值為value,key一般選取RDD中每個元素的一個欄位。
建立一個Pair RDD:
# 建立一個普通RDD
rdd = sc.parallelize([('a', 1, 2), ('b', 3, 4), ('c', 5, 6)])
# 提取每個元素的第一個元素作為key剩餘元素作為value建立Pair RDD
pair_rdd = rdd.map(lambda x: (x[0 ], x[1:]))
可以看到Pair RDD實質上仍然是一個普通的RDD,那為什麼它要單獨拿出來講呢?
這是因為,Pair RDD由於有key的存在,與普通的RDD相比更加格式化,這種特性就會給Pair RDD賦予一些特殊的操作,例如groupByKey()
可以將具有相同key進行分組,其結果仍然得到Pair RDD,然後利用mapValues()
對相同key的value進行函式計算;reduceByKey()
、countByKey()
和sortByKey()
等一系列“ByKey()”操作同理。
另外,兩個Pair RDD具有像SQL一樣的連線操作,例如兩個Pair RDD進行join()
leftOuterJoin()
、rightOuterJoin()
、fullOuterJoin()
等操作同理。
2. Spark SQL中的DataFrame
Pair RDD已經被一定程度的格式化了,它的每個元素會具有key,但是value仍然具有很大的靈活性。DataFrame是一種完全格式化的資料集合,和資料庫中的表的概念比較接近,它每列資料必須具有相同的資料型別。也正是由於DataFrame知道資料集合所有的型別資訊,DataFrame可以進行列處理優化而獲得比RDD更優的效能。
在內部實現上,DataFrame是由Row
物件為元素組成的集合,每個Row
物件儲存DataFrame的一行,Row
物件中記錄每個域=>值
的對映,因而Row
可以被看做是一個結構體型別。可以通過建立多個tuple/list
、dict
、Row
然後構建DataFrame。
注:用dict
構建DataFrame已經廢棄了,推薦用Row
。
# 建立list的list
lists = [['a', 1], ['b', 2]]
# 構建具有預設生成的列_1、_2的DataFrame
dataframe = spark.createDataFrame(lists)
# 建立dict的list
dicts = [{'col1':'a', 'col2':1}, {'col1':'b', 'col2':2}]
# 構建具有列col1、col2的DataFrame
dataframe = spark.createDataFrame(dicts)
# 建立Row的list
rows = [Row(col1='a', col2=1), Row(col1='b', col2=2)]
# 構建具有列col1、col2的DataFrame
dataframe = spark.createDataFrame(rows)
雖然DataFrame被完全格式化了,但是其中每列可以儲存的型別仍然是非常豐富的,包括基本的資料型別、list、tuple、dict和Row,這也就意味著所有的複雜資料型別都可以相互巢狀,從而解除了完全格式化的限制。例如,你可以在一列中儲存list型別,而每行list按需儲存不定長的資料。
那麼,RDD和DataFrame還有哪些使用上的區別呢?
- RDD:沒有列名稱,只能使用數字來索引;具有
map()
、reduce()
等方法並可指定任意函式進行計算; - DataFrame:一定有列名稱(即使是預設生成的),可以通過
.col_name
或者['col_name']
來索引列;具有表的相關操作(例如select()
、filter()
、where()
、join
),但是沒有map()
、reduce()
等方法。
3. RDD轉換為DataFrame
什麼樣的RDD可以轉換為DataFrame?
RDD靈活性很大,並不是所有RDD都能轉換為DataFrame,而那些每個元素具有一定相似格式的時候才可以。
為什麼RDD需要轉換為DataFrame?
當RDD進行類似表的相應操作時,都需要指定相應的函式,轉換為DataFrame書寫更簡單,並且執行效率高。
怎麼樣將RDD轉換為DataFrame?
就像之前的例子一樣,可以利用
dataframe = spark.createDataFrame(rdd, schema=None, samplingRatio=None)
來將RDD轉換為DataFrame,其中的引數設定需要注意:
schema:DataFrame各列型別資訊,在提前知道RDD所有型別資訊時設定。例如
schema = StructType([StructField('col1', StringType()),
StructField('col2', IntegerType())])
samplingRatio:推測各列型別資訊的取樣比例,在未知RDD所有型別資訊時,spark需要根據一定的資料量進行型別推測;預設情況下,spark會抽取前100的RDD進行推測,之後在真正將RDD轉換為DataFrame時如果遇到型別資訊不符會報錯 Some of types cannot be determined by the first 100 rows, please try again with sampling 。同理取樣比例較低,推測型別資訊也可能錯誤。
4. DataFrame轉換為RDD
有時候DataFrame的表相關操作不能處理一些問題,例如需要對一些資料利用指定的函式進行計算時,就需要將DataFrame轉換為RDD。DataFrame可以直接利用.rdd
獲取對應的RDD物件,此RDD物件的每個元素使用Row
物件來表示,每列值會成為Row
物件的一個域=>值
對映。例如
dataframe = spark.createDataFrame([Row(col1='a', col2=1), Row(col1='b', col2=2)])
>>>
+----+----+
|col1|col2|
+----+----+
| a| 1|
| b| 2|
+----+----+
rdd = dataframe.rdd
>>> [Row(col1=u'a', col2=1), Row(col1=u'b', col2=2)]
DataFrame轉化後的RDD如果需要和一般形式的RDD進行操作(例如join),還需要做索引將數值從Row中取出,比如轉化為Pair RDD可以這樣操作
rdd = rdd.map(lambda x: [x[0], x[1:]])
>>> [[u'a', (1,)], [u'b', (2,)]]
注意:DataFrame轉化的RDD可能包含Row(col1='a')
,它和'a'
是不同的物件,所以如果與一般的RDD進行join,還需要索引Row取出數值。