Spark SQL and DataFrame Guide(1.4.1)——之DataFrames
Spark SQL是處理結構化數據的Spark模塊。它提供了DataFrames這樣的編程抽象。同一時候也能夠作為分布式SQL查詢引擎使用。
DataFrames
DataFrame是一個帶有列名的分布式數據集合。等同於一張關系型數據庫中的表或者R/Python中的data frame,只是在底層做了非常多優化;我們能夠使用結構化數據文件、Hive tables,外部數據庫或者RDDS來構造DataFrames。
1. 開始入口:
入口須要從SQLContext類或者它的子類開始,當然須要使用SparkContext創建SQLContext;這裏我們使用pyspark(已經自帶了SQLContext即sc):
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
還能夠使用HiveContext,它能夠提供比SQLContext很多其它的功能。比如能夠使用更完整的HiveQL解析器寫查詢,使用Hive UDFs。從Hive表中讀取數據等。
使用HiveContext並不須要安裝hive,Spark默認將HiveContext單獨打包避免對hive過多的依賴
2.創建DataFrames
使用JSON文件創建:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df = sqlContext.read.json("examples/src/main/resources/people.json" )
# Displays the content of the DataFrame to stdout
df.show()
註意:
這裏你可能須要將文件存入HDFS(這裏的文件在Spark安裝文件夾中,1.4版本號)
hadoop fs -mkdir examples/src/main/resources/
hadoop fs -put /appcom/spark/examples/src/main/resources/* /user/hdpuser/examples/src/main/resources/
3.DataFrame操作
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
# Create the DataFrame
df = sqlContext.read.json("examples/src/main/resources/people.json")
# Show the content of the DataFrame
df.show()
## age name
## null Michael
## 30 Andy
## 19 Justin
# Print the schema in a tree format
df.printSchema()
## root
## |-- age: long (nullable = true)
## |-- name: string (nullable = true)
# Select only the "name" column
df.select("name").show()
## name
## Michael
## Andy
## Justin
# Select everybody, but increment the age by 1
df.select(df[‘name‘], df[‘age‘] + 1).show()
## name (age + 1)
## Michael null
## Andy 31
## Justin 20
# Select people older than 21
df.filter(df[‘age‘] > 21).show()
## age name
## 30 Andy
# Count people by age
df.groupBy("age").count().show()
## age count
## null 1
## 19 1
## 30 1
4.使用編程執行SQL查詢
SQLContext能夠使用編程執行SQL查詢並返回DataFrame。
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df = sqlContext.sql("SELECT * FROM table")
5.和RDD交互
將RDD轉換成DataFrames有兩種方法:
- 利用反射來判斷包括特定類型對象的RDD的schema。這樣的方法會簡化代碼而且在你已經知道schema的時候非常適用。
- 使用編程接口。構造一個schema並將其應用在已知的RDD上。
一、利用反射判斷Schema
Spark SQL能夠將含Row對象的RDD轉換成DataFrame。並判斷數據類型。通過將一個鍵值對(key/value)列表作為kwargs傳給Row類來構造Rows。
key定義了表的列名,類型通過看第一列數據來判斷。
(所以這裏RDD的第一列數據不能有缺失)未來版本號中將會通過看很多其它數據來判斷數據類型。像如今對JSON文件的處理一樣。
# sc is an existing SparkContext.
from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)
# Load a text file and convert each line to a Row.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))
# Infer the schema, and register the DataFrame as a table.
schemaPeople = sqlContext.createDataFrame(people)
schemaPeople.registerTempTable("people")
# SQL can be run over DataFrames that have been registered as a table.
teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
# The results of SQL queries are RDDs and support all the normal RDD operations.
teenNames = teenagers.map(lambda p: "Name: " + p.name)
for teenName in teenNames.collect():
print teenName
二、編程指定Schema
通過編程指定Schema須要3步:
- 從原來的RDD創建一個元祖或列表的RDD。
- 用StructType 創建一個和步驟一中創建的RDD中元祖或列表的結構相匹配的Schema。
- 通過SQLContext提供的createDataFrame方法將schema 應用到RDD上。
# Import SQLContext and data types
from pyspark.sql import SQLContext
from pyspark.sql.types import *
# sc is an existing SparkContext.
sqlContext = SQLContext(sc)
# Load a text file and convert each line to a tuple.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: (p[0], p[1].strip()))
# The schema is encoded in a string.
schemaString = "name age"
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)
# Apply the schema to the RDD.
schemaPeople = sqlContext.createDataFrame(people, schema)
# Register the DataFrame as a table.
schemaPeople.registerTempTable("people")
# SQL can be run over DataFrames that have been registered as a table.
results = sqlContext.sql("SELECT name FROM people")
# The results of SQL queries are RDDs and support all the normal RDD operations.
names = results.map(lambda p: "Name: " + p.name)
for name in names.collect():
print name
Spark SQL and DataFrame Guide(1.4.1)——之DataFrames