1. 程式人生 > 其它 >7、Spark SQL

7、Spark SQL

1.請分析SparkSQL出現的原因,並簡述SparkSQL的起源與發展。

spark產生:為了替代Mapreduce,解決Mapreduce計算短板

    隨著Spark的發展,Shark對於Hive的太多依賴(如採用Hive的語法解析器、查詢優化器等等),制約了Spark的One Stack Rule Them All的既定方針,制約了Spark各個元件的相互整合,所以提出了SparkSQL專案。SparkSQL拋棄原有Shark的程式碼,汲取了Shark的一些優點,如記憶體列儲存(In-Memory Columnar Storage)、Hive相容性等,重新開發了SparkSQL程式碼;由於擺脫了對Hive的依賴性,SparkSQL無論在資料相容、效能優化、元件擴充套件方面都得到了極大的方便。

2. 簡述RDD 和DataFrame的聯絡與區別?

      在Spark中,DataFrame是一種以RDD為基礎的分散式資料集,因此DataFrame可以完成RDD的絕大多數功能,在開發使用時,也可以呼叫方法將RDD和DataFrame進行相互轉換。DataFrame的結構類似於傳統資料庫的二維表格,並且可以從很多資料來源中建立,例如結構化檔案、外部資料庫、Hive表等資料來源。

     總的來說,DataFrame除了提供比RDD更豐富的運算元以外,更重要的特點是提升Spark框架執行效率、減少資料讀取時間以及優化執行計劃。有了DataFrame這個更高層次的抽象後,處理資料就更加簡單了,甚至可以直接用SQL來處理資料,這對於開發者來說,易用性有了很大的提升。不僅如此,通過DataFrame API或SQL處理資料,Spark 優化器(Catalyst)會自動優化,即使我們寫的程式或SQL不高效,程式也可以高效的執行。

3.DataFrame的建立

spark.read.text(url)

spark.read.json(url)

spark.read.format("text").load("people.txt")

spark.read.format("json").load("people.json")

 4. PySpark-DataFrame各種常用操作

列印資料 df.show()預設列印前20條資料

列印概要 df.printSchema()

查詢總行數 df.count()

df.head(3) #list型別,list中每個元素是Row類

輸出全部行 df.collect() #list型別,list中每個元素是Row類

查詢概況 df.describe().show()

取列 df[‘name’] df.name

基於spark.sql的操作:

建立臨時表虛擬表 df.registerTempTable('people')

spark.sql執行SQL語句 spark.sql('select name from people').show()

5. Pyspark中DataFrame與pandas中DataFrame

分別從檔案建立DataFrame

比較兩者的異同

Pyspark的DataFrame 是基於 RDD 的一種資料型別,具有比 RDD 節省空間和更高運算效率的優點。pandas的DataFrame 是一種表格型資料結構,按照列結構儲存,它含有一組有序的列,每列可以是不同的值,但每一列只能有一種資料型別。

pandas中DataFrame 是可變的,pyspark中RDDs 是不可變的,因此 DataFrame 也是不可變的

pandas中所以是自動建立的,pyspark中沒有 index 索引,若需要需要額外建立該列

pandas的是Series 結構,屬於 Pandas DataFrame 結構,pyspark的是Row 結構,屬於 Spark DataFrame 結構

pandas中DataFrame轉換為Pyspark中DataFrame

Pyspark中DataFrame轉換為pandas中DataFrame

6.從RDD轉換得到DataFrame

6.1 利用反射機制推斷RDD模式

建立RDD sc.textFile(url).map(),讀檔案,分割資料項

每個RDD元素轉換成 Row

由Row-RDD轉換到DataFrame

6.2 使用程式設計方式定義RDD模式

#下面生成“表頭” 

#下面生成“表中的記錄” 

#下面把“表頭”和“表中的記錄”拼裝在一起

7. DataFrame的儲存

df.write.text(dir)

df.write.json(dri)

df.write.format("text").save(dir)

df.write.format("json").save(dir)