spark dataframe和spark sql
阿新 • • 發佈:2022-04-08
三、掌握對spark dataframe和spark sql的認識和使用(包括建立、各種常用操作,具體到程式碼的編寫使用);
1、DataFrame介紹
在Spark中,Spark DataFrame和Spark SQL是SparkRDD高層次的封裝,Spark DataFrame以RDD為基礎,是一種與傳統資料庫中的二維表格相類似的分散式資料集。
DataFrame與RDD的主要區別:前者包含每一列的名稱和型別
2、建立Spark DataFrame
(1)通過本地資料結構(list或tuple)建立Spark DataFrame
from pyspark import SparkContext from pyspark import SQLContext # 例項化一個SparkContext物件sc(如果是pyspark命令列,預設有一個sc物件,無需例項化sc) sc = SparkContext() # 例項化一個SQLContext物件sqlContext ,需要將sc作為引數傳入(如果是pyspark命令列,預設有一個sqlContext物件,無需例項化sqlContext ) sqlContext = SQLContext(sc) a_list = [("Jack", 20, "male"), ("Rose", 18, "female"), ("Tom", 19, "male")] df = sqlContext. createDataFrame (a_list, [“name”, “age”, “gender”]) # 第一個引數為序列物件,第二個引數指定列名 df.show() # show方法展示當前Spark DataFrame的資料
(2)通過RDD建立Spark DataFrame
from pyspark import SparkContext from pyspark import SQLContext sc = SparkContext() sqlContext = SQLContext(sc) a_list = [("Jack", 20, "male"), ("Rose", 18, "female"), ("Tom", 19, "male")] rdd = sc.parallelize(a_list) df = sqlContext.createDataFrame(rdd, ["name", "age", "gender"]) # 第一個引數為RDD物件,第二個引數指定列名 df.show() # show方法展示當前Spark DataFrame的資料
(3)通過Pandas DataFrame建立Spark DataFrame
from pyspark import SparkContext from pyspark import SQLContext import pandas as pd pd_df = pd.read_csv("student_info.csv") sc = SparkContext() sqlContext = SQLContext(sc) df = sqlContext.createDataFrame(pd_df) # pandas dataframe已經是行列結構的資料,自帶列名,不需要再設定列名,第二個引數若傳則有,不傳則預設 df.show() # show方法展示當前Spark DataFrame的資料
注意: createDataFrame方法的引數,請參考上一頁PPT。第二個引數不傳的話,如果該資料來源有列名,就會用該列名,否則使用spark的預設列名。如果傳第二個引數,則不管資料來源中有沒有列名,都以傳入的引數為列名。
(4)通過csv檔案建立Spark DataFrame
sc = SparkContext()
sqlContext = SQLContext(sc)
df_reader = sqlContext.read # sqlContext的read屬性可以獲取一個DataFrameReader物件
df = df_reader.schema("name String, age Int, gender String").csv(path="student_info.csv") # 路徑可以是hdfs路徑
df.show()
注意:上述程式中對csv檔案的格式是有要求的,要求表頭不帶列名——因為程式中的紅色部分已經宣告表頭了,csv檔案中再有表頭,會造成第一行全是null值
(5)通過txt檔案建立Spark DataFrame
sc = SparkContext()
sqlContext = SQLContext(sc)
df_reader = sqlContext.read # sqlContext的read屬性可以獲取一個DataFrameReader物件
df = df_reader.schema(“name String”).text(paths=“names.txt”) # paths可以是一個檔案,也可以是一個資料夾
df.show()
注意:txt檔案的格式是有要求的,要求每一行只能是一個欄位的資料,並且該欄位的型別只能為String型別。
(6)通過json檔案建立Spark DataFrame
sc = SparkContext()
sqlContext = SQLContext(sc)
df_reader = sqlContext.read # sqlContext的read屬性可以獲取一個DataFrameReader物件
df = df_reader.json(path="student_info.json")
df.show()
(7)通過parquet檔案建立Spark DataFrame
sc = SparkContext()
sqlContext = SQLContext(sc)
df_reader = sqlContext.read # sqlContext的read屬性可以獲取一個DataFrameReader物件
df = df_reader. parquet(paths=“user_info”) # 注意這是一個資料夾,裡面有多個parquet檔案
df.show()
(8)通過Hive資料表建立Spark DataFrame
sc = SparkContext()
sqlContext = SQLContext(sc)
df = sqlContext.table(“test_table")
df.show()
(9)通過mysql資料庫表建立Spark DataFrame
sc = SparkContext()
sqlContext = SQLContext(sc)
df_reader = sqlContext.read # sqlContext的read屬性可以獲取一個DataFrameReader物件
# url設定資料庫地址
# table設定要查詢的資料庫表
# properties 設定連線引數,是一個字典
df = df_reader.jdbc(url="jdbc:mysql://127.0.0.1:3306/student", table="student_info",
properties=dict(user="root", password="root", driver="com.mysql.jdbc.Driver"))
df.show(10)
3、DataFrame的常用操作(函式/方法)
4、Spark SQL操作示例
4、Spark SQL與DataFrame的區別與聯絡
DataFrame是一個分散式的資料集合,它按行組織,每行包含一組列,每列都有一個名稱和一個關聯的型別。換句話說,這個分散式資料集合具有由Schema定義的結構。你可以將它視為關係資料庫中的表,但在底層,它具有更豐富的優化。
Spark SQL 是由DataFrame派生出來的,使用Spark SQL 之前必須先建立DataFrame,再註冊臨時表,然後才能使用Spark SQL。
Spark DataFrame是Spark的核心基礎類, Spark SQL 是對DataFrame的高階封裝。
Spark SQL相比於Spark DataFrame更靈活易用。