Spark SQL and DataFrames
阿新 • • 發佈:2019-01-08
1.SparkSession
SparkSQL的操作都建立在SparkSession上,建立一個SparkSession叫spark,後面程式碼都基於此,不再提示
from pyspark.sql import SparkSession
spark = SparkSession\
.builder\
.appName("PythonSQL")\
.config("spark.some.config.option", "some-value")\
.getOrCreate()
2.建立SparkDataFrames
df = spark.read .json("examples/src/main/resources/people.json")
df.show()
3.DataFrame的基本操作
讀取檔案生成DataFrame df = spark.read.json("examples/src/main/resources/people.json")
檢視內容 df.show()
樹結構打印表結構df.printSchema()
選擇一列df.select("name").show()
選擇兩列,其中一列+1df.select(df['name'],df['age']+1).show()
篩選df.filter(df['age']>21).show()
分組聚合
df.groupBy("age").count().show()
4.由RDD轉換
方式1:Row推斷模式
from pyspark.sql import Row
sc = spark.sparkContext
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l:l.split(','))
people = parts.map(lambda p:Row(name=p[0],age=int(p[1])))
schemaPeople = spark.createSchema(people)
schemaPeople.createOrReplaceTempView('people' )
名為spark
的SparkSession註冊了名為people
的table,可通過spark.sql()
執行對註冊的表的sql語句。
table由RDD轉化來,由Row()
建立列,用createDataFrame()
註冊table,用createOrReplaceTempView()
建立表名。
teenagers = spark.sql('SELECT name FROM people WHERE age >= 13 AND age <= 19')
teenNames = teenagers.map(lambda p:'name:' + p.name)
for teenName in teenNames.collect():
print(teenName)
已經註冊了表的SparkSession執行的sql語句可用RDD的操作。
方式2:StructType指定模式
from pyspark.sql.types import *
sc = spark.sparkContext
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l:l.split(','))
people = parts.map(lambda p:(p[0],p[1].strip()))
schema = StructType().add('name','string',True).add('age','int',True)
schemaPeople = spark.createDataFrame(people, schema)
schemaPeople.createOrReplaceTempView("people")
RDD轉換為DataFrame沒有用Row時未識別模式,通過StructType()
用新增add()方法加入StructField('列名','資料型別',是否允許null)
建立表結構(即模式schema),註冊table時候StructType()物件作為第二個引數以createDataFrame
加入SparkSession。