Spark SQL and DataFrame Guide (1.4.1): DataFrames

Spark SQL is a Spark module for structured data processing. It provides a programming abstraction called DataFrames and can also act as a distributed SQL query engine.


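A DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with many more optimizations under the hood. DataFrames can be constructed from structured data files, Hive tables, external databases, or existing RDDs.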

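1. Starting point:

The entry point into all Spark SQL functionality is the SQLContext class (or one of its subclasses). To create a basic SQLContext, all you need is an existing SparkContext: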


# sc is an existing SparkContext.
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

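You can also use a HiveContext, which provides a superset of the functionality of the basic SQLContext. Examples include writing queries with the more complete HiveQL parser, using Hive UDFs, and reading data from Hive tables.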



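With a SQLContext, applications can create DataFrames from an existing RDD, from a Hive table, or from data sources. As an example, the following creates a DataFrame based on the content of a JSON file:

# sc is an existing SparkContext.
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

df = sqlContext.read.json("examples/src/main/resources/people.json")

# Displays the content of the DataFrame to stdout
df.show()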


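When Spark reads from HDFS, the relative path above resolves against the user's HDFS home directory (here /user/hdpuser). The example files must therefore be uploaded there first:

hadoop fs -mkdir -p examples/src/main/resources/
hadoop fs -put /appcom/spark/examples/src/main/resources/* /user/hdpuser/examples/src/main/resources/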
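read.json does not expect one JSON document. Instead, each line of people.json must hold a separate, self-contained JSON object (JSON Lines). The sketch below is plain Python and does not use Spark. It parses three records that are assumed to mirror the df.show() output in the next example, then infers a schema in the spirit of printSchema(). The columns are the union of all keys, a missing key becomes null, and the fields are listed in name order, which matches printSchema() listing age before name. The type-name table and the helpers read_json_lines, infer_schema and to_rows are illustrative assumptions. They are not Spark's implementation.

```python
import json

# Assumed content of people.json, matching the df.show() output in this guide.
PEOPLE_JSON = """{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}"""

# Illustrative mapping from Python value types to Spark SQL type names.
TYPE_NAMES = {bool: "boolean", int: "long", float: "double", str: "string"}


def read_json_lines(text):
    """Parse JSON Lines: every non-blank line is one self-contained JSON object."""
    return [json.loads(line) for line in text.splitlines() if line.strip()]


def infer_schema(records):
    """Union the keys of all records, map each to a type name, sort by name."""
    types = {}
    for record in records:
        for key, value in record.items():
            if value is not None:
                types.setdefault(key, TYPE_NAMES[type(value)])
    return sorted(types.items())


def to_rows(records, schema):
    """Project every record onto the schema; a missing field becomes None (null)."""
    return [tuple(record.get(name) for name, _ in schema) for record in records]


records = read_json_lines(PEOPLE_JSON)
schema = infer_schema(records)
print(schema)                    # [('age', 'long'), ('name', 'string')]
print(to_rows(records, schema))  # [(None, 'Michael'), (30, 'Andy'), (19, 'Justin')]
```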


DataFrames provide a domain-specific language for structured data manipulation. Some basic examples:

# sc is an existing SparkContext.
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

# Create the DataFrame
df = sqlContext.read.json("examples/src/main/resources/people.json")

# Show the content of the DataFrame
df.show()
## age  name
## null Michael
## 30   Andy
## 19   Justin

# Print the schema in a tree format
df.printSchema()
## root
## |-- age: long (nullable = true)
## |-- name: string (nullable = true)

# Select only the "name" column
df.select("name").show()
## name
## Michael
## Andy
## Justin

# Select everybody, but increment the age by 1
df.select(df['name'], df['age'] + 1).show()
## name    (age + 1)
## Michael null
## Andy    31
## Justin  20

# Select people older than 21
df.filter(df['age'] > 21).show()
## age name
## 30  Andy

# Count people by age
df.groupBy("age").count().show()
## age  count
## null 1
## 19   1
## 30   1
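The outputs above depend on how null is handled. Below is a plain-Python model of the same three records and of what select, filter and groupBy(...).count() return. No Spark is involved, and the helpers plus_one and older_than are hypothetical. Arithmetic on null yields null. A comparison involving null is not true, so the row does not pass the filter. Null also forms a group of its own.

```python
from collections import Counter

# The records of people.json; Michael has no age, i.e. null (None).
people = [
    {"name": "Michael", "age": None},
    {"name": "Andy", "age": 30},
    {"name": "Justin", "age": 19},
]


def plus_one(age):
    """Null-propagating arithmetic, like the (age + 1) column."""
    return None if age is None else age + 1


def older_than(rows, limit):
    """A comparison involving null is not true, so such rows are dropped."""
    return [r for r in rows if r["age"] is not None and r["age"] > limit]


selected = [(r["name"], plus_one(r["age"])) for r in people]
adults = [r["name"] for r in older_than(people, 21)]
counts = Counter(r["age"] for r in people)  # None is counted as its own group

print(selected)  # [('Michael', None), ('Andy', 31), ('Justin', 20)]
print(adults)    # ['Andy']
print(dict(counts))
```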


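The sql function on a SQLContext lets applications run SQL queries programmatically and returns the result as a DataFrame:

# sc is an existing SparkContext.
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

# "table" must already be registered, e.g. with DataFrame.registerTempTable.
df = sqlContext.sql("SELECT * FROM table")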



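Spark SQL supports two different methods for converting existing RDDs into DataFrames:

  • Use reflection to infer the schema of an RDD that contains specific types of objects. This reflection-based approach leads to more concise code and works well when you already know the schema.
  • Use a programmatic interface to construct a schema and then apply it to an existing RDD. This method is more verbose, but it lets you construct DataFrames when the columns and their types are not known until runtime.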

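Spark SQL can convert an RDD of Row objects to a DataFrame, inferring the data types. Rows are constructed by passing a list of key/value pairs as kwargs to the Row class. The keys define the column names of the table, and the types are inferred from the data.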



# sc is an existing SparkContext.
from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)

# Load a text file and convert each line to a Row.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))

# Infer the schema, and register the DataFrame as a table.
schemaPeople = sqlContext.createDataFrame(people)
schemaPeople.registerTempTable("people")

# SQL can be run over DataFrames that have been registered as a table.
teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

# The result of a SQL query is a DataFrame; .rdd exposes it as an RDD of Rows,
# so all the normal RDD operations are available.
teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name)
for teenName in teenNames.collect():
    print(teenName)
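The reflection-based inference above can be mimicked in plain Python to show what the schema and the teenager query produce. This sketch makes several assumptions. It assumes people.txt holds the lines "Michael, 29", "Andy, 30" and "Justin, 19", the sample shipped with Spark. It uses dicts in place of Row objects. The names to_record and infer_from_first, and the type table, are hypothetical. As with Row kwargs, the column names come from the keyword names and the column types from the values. Python int maps to long, matching the printSchema() output earlier. In Spark 1.x, Row also keeps its fields sorted by name, so the sketch sorts them too.

```python
# Assumed content of people.txt: one "name, age" pair per line.
LINES = ["Michael, 29", "Andy, 30", "Justin, 19"]

# Illustrative mapping from Python value types to Spark SQL type names.
TYPE_NAMES = {int: "long", str: "string"}


def to_record(line):
    """Split one line into a Row-like dict; int() ignores the leading space."""
    name, age = line.split(",")
    return {"name": name, "age": int(age)}


def infer_from_first(records):
    """Reflection-style inference: names from the keys, types from the first record."""
    first = records[0]
    # Spark 1.x Row objects keep their fields sorted by name; do the same here.
    return [(key, TYPE_NAMES[type(first[key])]) for key in sorted(first)]


people = [to_record(line) for line in LINES]
schema = infer_from_first(people)
teenagers = [p["name"] for p in people if 13 <= p["age"] <= 19]

print(schema)     # [('age', 'long'), ('name', 'string')]
print(teenagers)  # ['Justin']
```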


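Sometimes a dictionary of kwargs cannot be defined ahead of time. For example, the structure of the records may be encoded in a string. In that case a DataFrame can be created programmatically in three steps:

  1. Create an RDD of tuples or lists from the original RDD.
  2. Create the schema, represented by a StructType, matching the structure of the tuples or lists in the RDD created in step 1.
  3. Apply the schema to the RDD via the createDataFrame method provided by SQLContext.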

# Import SQLContext and data types
from pyspark.sql import SQLContext
from pyspark.sql.types import *

# sc is an existing SparkContext.
sqlContext = SQLContext(sc)

# Load a text file and convert each line to a tuple.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: (p[0], p[1].strip()))

# The schema is encoded in a string.
schemaString = "name age"

fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)

# Apply the schema to the RDD.
schemaPeople = sqlContext.createDataFrame(people, schema)

# Register the DataFrame as a table.
schemaPeople.registerTempTable("people")

# SQL can be run over DataFrames that have been registered as a table.
results = sqlContext.sql("SELECT name FROM people")

# The result of a SQL query is a DataFrame; .rdd exposes it as an RDD of Rows,
# so all the normal RDD operations are available.
names = results.rdd.map(lambda p: "Name: " + p.name)
for name in names.collect():
    print(name)
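The three programmatic steps can also be modelled without Spark. In this hypothetical sketch, Field stands in for StructField and apply_schema stands in for createDataFrame. The schema is built from a schema string exactly as in the code above. Applying it checks that every tuple has one value per field and that non-nullable fields are not None. Spark's own validation differs in detail. The sketch only illustrates how the schema and the tuples must line up.

```python
from collections import namedtuple

# Hypothetical stand-in for StructField(name, dataType, nullable).
Field = namedtuple("Field", ["name", "data_type", "nullable"])

# Step 1: tuples parsed from "name, age" lines (a plain list instead of an RDD).
lines = ["Michael, 29", "Andy, 30", "Justin, 19"]
people = [(p[0], p[1].strip()) for p in (line.split(",") for line in lines)]

# Step 2: a schema matching the tuples, built from a string as above.
schema_string = "name age"
schema = [Field(name, "string", True) for name in schema_string.split()]


def apply_schema(rows, fields):
    """Step 3: pair each tuple with the schema, checking arity and nullability."""
    result = []
    for row in rows:
        if len(row) != len(fields):
            raise ValueError("row %r does not match %d fields" % (row, len(fields)))
        for value, field in zip(row, fields):
            if value is None and not field.nullable:
                raise ValueError("field %s is not nullable" % field.name)
        result.append(dict(zip((f.name for f in fields), row)))
    return result


table = apply_schema(people, schema)
print([r["name"] for r in table])  # ['Michael', 'Andy', 'Justin']
print(table[0])                    # {'name': 'Michael', 'age': '29'}
```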

