spark中的Dataset和DataFrame
參考《Spark與Hadoop大資料分析》Venkat Ankam和官方文件。
利用DataFrame進行分析
建立DataFrame
從spark2.0及更高版本開始,SparkSession成為了關係型功能的入口點。當使用Hive時,SparkSession必須使用enableHiveSupport方法建立。
從結構化資料檔案中讀取
#pyspark
df = spark.read.parquet("xxx.parquet")
df2 = spark.read.json("xxx.json")
使用列表、模式和預設資料型別建立DataFrame
myList = [(50 , "DataFrame"), (60, "pandas")]
myschema = ['col_1', 'col_2']
df = spark.createDataFrame(myList,myschema)
RDD轉換成DataFrame來建立DataFrame。
myList = [(50, "DataFrame"), (60, "pandas")]
myschema = ['col_1', 'col_2']
df = sc.parallelize(myList).toDF(myschema)
df.printSchema()
Hive表中建立DataFrame
sample_07 = spark.table ("sample_07")
sample_07.show()
spark.sql("select * from xxx").show()
外部資料庫
首先把/usr/lib/hive/mysql-connector-java.jar複製到Spark的JAR包目錄,
df = spark.read.format('jdbc').options(url='jdbc:mysql://地址', dbtable='表名', user='使用者名稱', password='密碼').loads
df.show()
使用.rdd將DataFrame轉換為RDD
df_rdd = df.rdd
df_rdd.show ()
常用的Dataset和DataFrame操作
輸入輸出資料
read:為任何資料提供通用讀取功能
write:為任何資料來源提供通用寫功能
基本函式
1,As[U]:返回把記錄對映到特定型別的新Dataset。
2,toDF:返回帶有重新命名列的新DataFrame。
3,explain:將(邏輯或物理)計劃列印到控制檯,用於除錯。
4,printSchema:以樹形格式把模式打印出來。
5, createTempView:使用給定的名稱將當前DataFrame註冊為臨時表。
6,cache()或persist():根據指定的持久化級別把Dataset持久化。
DSL函式domain-specific-language (DSL)
域特定語言函式,是用於分析的。這些函式是延遲的,不會啟動執行。
agg:以不分組的方式在整個Dataset/DataFrame上進行聚合。
distinct:返回一個帶有唯一行的新的Dataset/DataFrame。
drop:返回一個刪除了一列的新Dataset/DataFrame。
filter:利用給定的條件對行進行過濾。
join:利用給定的join表示式與另一個Dataset/DataFrame進行join操作。
limit:僅僅提取前n行,返回一個新的Dataset/DataFrame。
sort:返回一個按升序對指定的列進行排序後的新Dataset/DataFrame。
groupby:利用指定的列對Dataset/DataFrame進行分組。
unionAll:返回一個包含對兩個DataFrame的行進行union產生的新Dataset/DataFrame。
na:用於處理缺失值或空值。
示例:
#pyspark shell
#建立一個DataFrame
df = spark.createDataFrame([(1, 2, 3), (4, 5, 6)], ['a', 'b', 'c'])
df.show()
#結果:含有a,b,c三列的DataFrame, 和pandas中的DataFrame非常像。
+---+---+---+
| a| b| c|
+---+---+---+
| 1| 2| 3|
| 4| 5| 6|
+---+---+---+
#以列表的形式返回df的欄位名: ['a', 'b', 'c']
df.columns
#統計資料表的行數:2
df.count()
#選擇a列
df.a
#協方差cov(),計算a列和b列的協方差covariance:4.5
df.cov('a', 'b')
#建立臨時表:createGlobalTempView(name) 和createTempView(name),兩個臨時表的週期不一樣,前者和spark application相關,後者和spark sessione 有關。
df.createGlobalTempView("people")
df_2 = spark.sql('select * from global_temp.people')
df_2.show()
+---+---+---+
| a| b| c|
+---+---+---+
| 1| 2| 3|
| 4| 5| 6|
+---+---+---+
df.createTempView('dd')
spark.sql('select * from dd').show()
#篩選filter(),仍返回一個DataFrame
df_2 = df.filter(df.a>1)
#也可以用pandas中的用法,等價於:df_2 = df[df['a']>1]
df_2.show()
+---+---+---+
| a| b| c|
+---+---+---+
| 4| 5| 6|
+---+---+---+
#按列篩選select(),仍返回一個DataFrame
df.select('a', 'c').show()
#或者用pandas中的方式:df[['a', 'c']].show()
+---+---+
| a| c|
+---+---+
| 1| 3|
| 4| 6|
+---+---+
#df.dtypes,檢視每一列的資料型別,和pandas中的用法一樣。
[('a', 'bigint'), ('b', 'bigint'), ('c', 'bigint')]
#describe(), 返回一個DataFrame,用於檢視資料表的描述情況,包括最佳,計數等
df.describe().show()
+-------+------------------+------------------+------------------+
|summary| a| b| c|
+-------+------------------+------------------+------------------+
| count| 2| 2| 2|
| mean| 2.5| 3.5| 4.5|
| stddev|2.1213203435596424|2.1213203435596424|2.1213203435596424|
| min| 1| 2| 3|
| max| 4| 5| 6|
+-------+------------------+------------------+------------------+
#drop(),刪除特定列,返回新的DataFrame
df.drop('a', 'b').show()
+---+
| c|
+---+
| 3|
| 6|
+---+
#distinct()去掉重複行, 返回一個DataFrame, 考慮所有的列。
df.distinct().show()
#drop_duplicates()(等價於dropDuplicates()),可以對特定的列進行去重,列名放在列表中。
df.drop_duplicates(['a', 'b'])#只考慮a, b列相同的行數,保留第一行。
#刪除含有空值的行,並返回一個去空值後的DataFrame,可以只考慮特定的列。
dropna(how='any', thresh=None, subset=None)
how 為'any'表示刪除含有空值的行,'all'表示刪除全為空值的行。
df.dropna('any', None, ['a']).show()
#df.fillna()填充空缺值,返回一個DataFrame。如果引數為單個值,即所有的空缺值用該值填充。引數也可以是字典,對不同欄位中的空缺值填充不同的值。
df.fillna(0)
df.fillna({'a':0, 'b':'null'})
#連線join(), 第三個引數:inner, left, right。類似mysql中的連線操作。
df.join(df_2, df.a == df_2.a, 'left').show()
+---+---+---+----+----+----+
| a| b| c| a| b| c|
+---+---+---+----+----+----+
| 1| 2| 3|null|null|null|
| 4| 5| 6| 4| 5| 6|
+---+---+---+----+----+----+
#crossJoin():笛卡爾積cartesian product
df.crossJoin(df).show()
+---+---+---+---+---+---+
| a| b| c| a| b| c|
+---+---+---+---+---+---+
| 1| 2| 3| 1| 2| 3|
| 1| 2| 3| 4| 5| 6|
| 4| 5| 6| 1| 2| 3|
| 4| 5| 6| 4| 5| 6|
+---+---+---+---+---+---+
#foreach(),類似於很多程式語言中foreach(),傳入一個函式,但作用於DataFrame中所有的行
#foreachPartition(), 作用於DataFrame中的partition。
df.foreach(lambda x:print(x))
結果:
Row(a=1, b=2, c=3)
Row(a=4, b=5, c=6)
df.foreachPartition(lambda x : print(x))
結果:
<itertools.chain object at 0x7fe0db8e4518>
內建函式、聚合函式、視窗函式
內建函式:和使用者自定義函式一樣,對單個行進行操作,計算出單個值。如substr、round。
聚合函式:對若干行進行操作,計算出單個值。如:min、max、sum、mean、first、last、avg、count、countDistinct和approxCountDistinct。
視窗函式:對與當前行相關的若干行進行操作。如:rank、denseRank、percentRank、ntile和rowNumber。
動作
collect:返回一個包含該DataFrame中所有行的陣列。
count:對該 DataFrame 中的行數進行計數。
describe:對數值列計算其統計資料。
show:將前n行列印到控制檯。
take:以行的列表的形式返回前n行。
RDD操作
也可以在DataFrame上運用RDD操作,如:map、flatMap、coalesce、repartition、foreach、toJson和toJavaRDD, 這些在DSL中沒有的。
Data Source API
Data Source API提供了一個使用Spark SQL載入和儲存資料的單一介面。通過spark.sql.sources.default配置屬性,設定預設資料來源為parquet,可以根據需要修改。
內建資料來源
包括:text, json, parquet, orc, jdbc和csv。
#讀
df = spark.read.json("xxx.json")#或者像下面這樣寫
df = spark.read.load("xxx.json", format='json')#預設是parquet,如果是parquet,可以不寫format。
#寫
df.write.json("newjson_dir")#或者像下面這樣寫
df.write.format("json").save("newjson_dir")
外部資料來源
外部資料來源在spark packages中可用,不包括在spark中。
spark packages 提供了用於從NoSQL資料庫(HBase、Cassandra等)讀取不同檔案格式和資料的元件包。當你想要在應用程式中包含一個spark元件包時,需要使用–packages命令列選項,加上以逗號分隔的jar包的maven座標(coordinate)列表, 把它包含在驅動程序和執行程序的classpath上。
AVRO
XML
pandas
基於DataFram的Spark-on-HBase聯結器
Hive on Spark
Hive on Spark 是為了使現有Hive使用者能夠直接在Spark執行引擎,而不是在MapReduce上執行Hive查詢而建立的。在Hive或beeline客戶端中將執行引擎更改為spark。
hive>> set hive.execution.engine=spark;#hive命令列