1. 程式人生 > >spark中的Dataset和DataFrame

spark中的Dataset和DataFrame

參考《Spark與Hadoop大資料分析》Venkat Ankam和官方文件

利用DataFrame進行分析

建立DataFrame

從spark2.0及更高版本開始,SparkSession成為了關係型功能的入口點。當使用Hive時,SparkSession必須使用enableHiveSupport方法建立。

從結構化資料檔案中讀取

#pyspark
df = spark.read.parquet("xxx.parquet")
df2 = spark.read.json("xxx.json") 

使用列表、模式和預設資料型別建立DataFrame

myList = [(50
, "DataFrame"), (60, "pandas")]
myschema = ['col_1', 'col_2'] df = spark.createDataFrame(myList,myschema)

RDD轉換成DataFrame來建立DataFrame。

myList = [(50, "DataFrame"), (60, "pandas")]
myschema = ['col_1', 'col_2']
df = sc.parallelize(myList).toDF(myschema)
df.printSchema()

Hive表中建立DataFrame

sample_07 = spark.table
("sample_07") sample_07.show() spark.sql("select * from xxx").show()

外部資料庫

首先把/usr/lib/hive/mysql-connector-java.jar複製到Spark的JAR包目錄,

df = spark.read.format('jdbc').options(url='jdbc:mysql://地址', dbtable='表名', user='使用者名稱', password='密碼').loads
df.show()

使用.rdd將DataFrame轉換為RDD

df_rdd = df.rdd
df_rdd.show
()

常用的Dataset和DataFrame操作

輸入輸出資料

read:為任何資料提供通用讀取功能
write:為任何資料來源提供通用寫功能

基本函式

1,As[U]:返回把記錄對映到特定型別的新Dataset。
2,toDF:返回帶有重新命名列的新DataFrame。
3,explain:將(邏輯或物理)計劃列印到控制檯,用於除錯。
4,printSchema:以樹形格式把模式打印出來。
5, createTempView:使用給定的名稱將當前DataFrame註冊為臨時表。
6,cache()或persist():根據指定的持久化級別把Dataset持久化。

DSL函式domain-specific-language (DSL)

域特定語言函式,是用於分析的。這些函式是延遲的,不會啟動執行。
agg:以不分組的方式在整個Dataset/DataFrame上進行聚合。
distinct:返回一個帶有唯一行的新的Dataset/DataFrame。
drop:返回一個刪除了一列的新Dataset/DataFrame。
filter:利用給定的條件對行進行過濾。
join:利用給定的join表示式與另一個Dataset/DataFrame進行join操作。
limit:僅僅提取前n行,返回一個新的Dataset/DataFrame。
sort:返回一個按升序對指定的列進行排序後的新Dataset/DataFrame。
groupby:利用指定的列對Dataset/DataFrame進行分組。
unionAll:返回一個包含對兩個DataFrame的行進行union產生的新Dataset/DataFrame。
na:用於處理缺失值或空值。
示例:

#pyspark shell
#建立一個DataFrame
df = spark.createDataFrame([(1, 2, 3), (4, 5, 6)], ['a', 'b', 'c'])
df.show()
#結果:含有a,b,c三列的DataFrame, 和pandas中的DataFrame非常像。
+---+---+---+
|  a|  b|  c|
+---+---+---+
|  1|  2|  3|
|  4|  5|  6|
+---+---+---+

#以列表的形式返回df的欄位名: ['a', 'b', 'c']
df.columns
#統計資料表的行數:2
df.count()
#選擇a列
df.a
#協方差cov(),計算a列和b列的協方差covariance:4.5
df.cov('a', 'b')

#建立臨時表:createGlobalTempView(name) 和createTempView(name),兩個臨時表的週期不一樣,前者和spark application相關,後者和spark sessione 有關。
df.createGlobalTempView("people")
df_2 = spark.sql('select * from global_temp.people')
df_2.show()
+---+---+---+
|  a|  b|  c|
+---+---+---+
|  1|  2|  3|
|  4|  5|  6|
+---+---+---+
df.createTempView('dd')
spark.sql('select * from dd').show()

#篩選filter(),仍返回一個DataFrame
df_2 = df.filter(df.a>1)
#也可以用pandas中的用法,等價於:df_2 = df[df['a']>1]
df_2.show()
+---+---+---+
|  a|  b|  c|
+---+---+---+
|  4|  5|  6|
+---+---+---+
#按列篩選select(),仍返回一個DataFrame
df.select('a', 'c').show()
#或者用pandas中的方式:df[['a', 'c']].show()
+---+---+
|  a|  c|
+---+---+
|  1|  3|
|  4|  6|
+---+---+
#df.dtypes,檢視每一列的資料型別,和pandas中的用法一樣。
[('a', 'bigint'), ('b', 'bigint'), ('c', 'bigint')]

#describe(), 返回一個DataFrame,用於檢視資料表的描述情況,包括最佳,計數等 
df.describe().show()
+-------+------------------+------------------+------------------+
|summary|                 a|                 b|                 c|
+-------+------------------+------------------+------------------+
|  count|                 2|                 2|                 2|
|   mean|               2.5|               3.5|               4.5|
| stddev|2.1213203435596424|2.1213203435596424|2.1213203435596424|
|    min|                 1|                 2|                 3|
|    max|                 4|                 5|                 6|
+-------+------------------+------------------+------------------+

#drop(),刪除特定列,返回新的DataFrame
df.drop('a', 'b').show()
+---+
|  c|
+---+
|  3|
|  6|
+---+
#distinct()去掉重複行, 返回一個DataFrame, 考慮所有的列。
df.distinct().show()
#drop_duplicates()(等價於dropDuplicates()),可以對特定的列進行去重,列名放在列表中。
df.drop_duplicates(['a', 'b'])#只考慮a, b列相同的行數,保留第一行。

#刪除含有空值的行,並返回一個去空值後的DataFrame,可以只考慮特定的列。
dropna(how='any', thresh=None, subset=None)
how 為'any'表示刪除含有空值的行,'all'表示刪除全為空值的行。
df.dropna('any', None, ['a']).show()

#df.fillna()填充空缺值,返回一個DataFrame。如果引數為單個值,即所有的空缺值用該值填充。引數也可以是字典,對不同欄位中的空缺值填充不同的值。
df.fillna(0)
df.fillna({'a':0, 'b':'null'})

#連線join(), 第三個引數:inner, left, right。類似mysql中的連線操作。
df.join(df_2, df.a == df_2.a, 'left').show()
+---+---+---+----+----+----+                                                    
|  a|  b|  c|   a|   b|   c|
+---+---+---+----+----+----+
|  1|  2|  3|null|null|null|
|  4|  5|  6|   4|   5|   6|
+---+---+---+----+----+----+
#crossJoin():笛卡爾積cartesian product
df.crossJoin(df).show()
+---+---+---+---+---+---+
|  a|  b|  c|  a|  b|  c|
+---+---+---+---+---+---+
|  1|  2|  3|  1|  2|  3|
|  1|  2|  3|  4|  5|  6|
|  4|  5|  6|  1|  2|  3|
|  4|  5|  6|  4|  5|  6|
+---+---+---+---+---+---+

#foreach(),類似於很多程式語言中foreach(),傳入一個函式,但作用於DataFrame中所有的行
#foreachPartition(), 作用於DataFrame中的partition。
df.foreach(lambda x:print(x))
結果:
Row(a=1, b=2, c=3)
Row(a=4, b=5, c=6)

df.foreachPartition(lambda x : print(x))
結果:
<itertools.chain object at 0x7fe0db8e4518>

內建函式、聚合函式、視窗函式

內建函式:和使用者自定義函式一樣,對單個行進行操作,計算出單個值。如substr、round。
聚合函式:對若干行進行操作,計算出單個值。如:min、max、sum、mean、first、last、avg、count、countDistinct和approxCountDistinct。
視窗函式:對與當前行相關的若干行進行操作。如:rank、denseRank、percentRank、ntile和rowNumber。

動作

collect:返回一個包含該DataFrame中所有行的陣列。
count:對該 DataFrame 中的行數進行計數。
describe:對數值列計算其統計資料。
show:將前n行列印到控制檯。
take:以行的列表的形式返回前n行。

RDD操作

也可以在DataFrame上運用RDD操作,如:map、flatMap、coalesce、repartition、foreach、toJson和toJavaRDD, 這些在DSL中沒有的。

Data Source API

Data Source API提供了一個使用Spark SQL載入和儲存資料的單一介面。通過spark.sql.sources.default配置屬性,設定預設資料來源為parquet,可以根據需要修改。

內建資料來源

包括:text, json, parquet, orc, jdbc和csv。

#讀
df = spark.read.json("xxx.json")#或者像下面這樣寫
df = spark.read.load("xxx.json", format='json')#預設是parquet,如果是parquet,可以不寫format。
#寫
df.write.json("newjson_dir")#或者像下面這樣寫
df.write.format("json").save("newjson_dir")

外部資料來源

外部資料來源在spark packages中可用,不包括在spark中。
spark packages 提供了用於從NoSQL資料庫(HBase、Cassandra等)讀取不同檔案格式和資料的元件包。當你想要在應用程式中包含一個spark元件包時,需要使用–packages命令列選項,加上以逗號分隔的jar包的maven座標(coordinate)列表, 把它包含在驅動程序和執行程序的classpath上。

AVRO

XML

pandas

基於DataFram的Spark-on-HBase聯結器

Hive on Spark

Hive on Spark 是為了使現有Hive使用者能夠直接在Spark執行引擎,而不是在MapReduce上執行Hive查詢而建立的。在Hive或beeline客戶端中將執行引擎更改為spark。

hive>> set hive.execution.engine=spark;#hive命令列