1. 程式人生 > 其它 >spark dataframe和spark sql

spark dataframe和spark sql

三、掌握對spark dataframe和spark sql的認識和使用(包括建立、各種常用操作,具體到程式碼的編寫使用);

1、DataFrame介紹

在Spark中,Spark DataFrame和Spark SQL是SparkRDD高層次的封裝,Spark DataFrame以RDD為基礎,是一種與傳統資料庫中的二維表格相類似的分散式資料集。

DataFrame與RDD的主要區別:前者包含每一列的名稱和型別

2、建立Spark DataFrame

(1)通過本地資料結構(list或tuple)建立Spark DataFrame

from pyspark import SparkContext
from pyspark import SQLContext
# 例項化一個SparkContext物件sc(如果是pyspark命令列,預設有一個sc物件,無需例項化sc)
sc = SparkContext()
# 例項化一個SQLContext物件sqlContext ,需要將sc作為引數傳入(如果是pyspark命令列,預設有一個sqlContext物件,無需例項化sqlContext )
sqlContext = SQLContext(sc)
a_list = [("Jack", 20, "male"), ("Rose", 18, "female"), ("Tom", 19, "male")]
df = sqlContext. createDataFrame (a_list, [“name”, “age”, “gender”]) # 第一個引數為序列物件,第二個引數指定列名
df.show()  # show方法展示當前Spark DataFrame的資料

(2)通過RDD建立Spark DataFrame

from pyspark import SparkContext
from pyspark import SQLContext
sc = SparkContext()
sqlContext = SQLContext(sc)
a_list = [("Jack", 20, "male"), ("Rose", 18, "female"), ("Tom", 19, "male")]
rdd = sc.parallelize(a_list)
df = sqlContext.createDataFrame(rdd, ["name", "age", "gender"])  # 第一個引數為RDD物件,第二個引數指定列名
df.show()  # show方法展示當前Spark DataFrame的資料

(3)通過Pandas DataFrame建立Spark DataFrame

from pyspark import SparkContext
from pyspark import SQLContext
import pandas as pd
pd_df = pd.read_csv("student_info.csv")
sc = SparkContext()
sqlContext = SQLContext(sc)
df = sqlContext.createDataFrame(pd_df)  # pandas dataframe已經是行列結構的資料,自帶列名,不需要再設定列名,第二個引數若傳則有,不傳則預設
df.show()  # show方法展示當前Spark DataFrame的資料

注意: createDataFrame方法的引數,請參考上一頁PPT。第二個引數不傳的話,如果該資料來源有列名,就會用該列名,否則使用spark的預設列名。如果傳第二個引數,則不管資料來源中有沒有列名,都以傳入的引數為列名。

(4)通過csv檔案建立Spark DataFrame

sc = SparkContext()
sqlContext = SQLContext(sc)
df_reader = sqlContext.read  # sqlContext的read屬性可以獲取一個DataFrameReader物件
df = df_reader.schema("name String, age Int, gender String").csv(path="student_info.csv")  # 路徑可以是hdfs路徑
df.show()

注意:上述程式中對csv檔案的格式是有要求的,要求表頭不帶列名——因為程式中的紅色部分已經宣告表頭了,csv檔案中再有表頭,會造成第一行全是null值

(5)通過txt檔案建立Spark DataFrame

sc = SparkContext()
sqlContext = SQLContext(sc)
df_reader = sqlContext.read  # sqlContext的read屬性可以獲取一個DataFrameReader物件
df = df_reader.schema(“name String”).text(paths=“names.txt”) # paths可以是一個檔案,也可以是一個資料夾
df.show()

注意:txt檔案的格式是有要求的,要求每一行只能是一個欄位的資料,並且該欄位的型別只能為String型別。

(6)通過json檔案建立Spark DataFrame

sc = SparkContext()
sqlContext = SQLContext(sc)
df_reader = sqlContext.read  # sqlContext的read屬性可以獲取一個DataFrameReader物件
df = df_reader.json(path="student_info.json")
df.show()

(7)通過parquet檔案建立Spark DataFrame

sc = SparkContext()
sqlContext = SQLContext(sc)
df_reader = sqlContext.read  # sqlContext的read屬性可以獲取一個DataFrameReader物件
df = df_reader. parquet(paths=“user_info”)  #  注意這是一個資料夾,裡面有多個parquet檔案
df.show()

(8)通過Hive資料表建立Spark DataFrame

sc = SparkContext()
sqlContext = SQLContext(sc)
df = sqlContext.table(“test_table")
df.show()

(9)通過mysql資料庫表建立Spark DataFrame

sc = SparkContext()
sqlContext = SQLContext(sc)
df_reader = sqlContext.read  # sqlContext的read屬性可以獲取一個DataFrameReader物件
# url設定資料庫地址
# table設定要查詢的資料庫表
# properties 設定連線引數,是一個字典
df = df_reader.jdbc(url="jdbc:mysql://127.0.0.1:3306/student", table="student_info",
                    properties=dict(user="root", password="root", driver="com.mysql.jdbc.Driver"))
df.show(10)

3、DataFrame的常用操作(函式/方法)

4、Spark SQL操作示例

4、Spark SQL與DataFrame的區別與聯絡

DataFrame是一個分散式的資料集合,它按行組織,每行包含一組列,每列都有一個名稱和一個關聯的型別。換句話說,這個分散式資料集合具有由Schema定義的結構。你可以將它視為關係資料庫中的表,但在底層,它具有更豐富的優化。
Spark SQL 是由DataFrame派生出來的,使用Spark SQL 之前必須先建立DataFrame,再註冊臨時表,然後才能使用Spark SQL。
Spark DataFrame是Spark的核心基礎類, Spark SQL 是對DataFrame的高階封裝。
Spark SQL相比於Spark DataFrame更靈活易用。