Spark RDDs vs DataFrames vs SparkSQL

  1. 單條記錄的隨機查詢

  2. aggregation聚合並且sorting後輸出


  1. Using RDDs

  2. Using DataFrames

  3. Using SparkSQL


  • 在HDFS中3個檔案中儲存的9百萬不同記錄

  • 每條記錄11個欄位
  • 總大小 1.4 GB


  • HDP 2.4

  • Hadoop version 2.7

  • Spark 1.6

  • HDP Sandbox


  • 原始的RDD 比 DataFrames 和 SparkSQL效能要好 

  • DataFrames 和 SparkSQL 效能差不多

  • 使用DataFrames 和 SparkSQL 比 RDD 操作更直觀

  • Jobs都是獨立執行,沒有其他job的干擾


  1. Random lookup against 1 order ID from 9 Million unique order IDs

  2. GROUP all the different products with their total COUNTS and SORT DESCENDING by count


RDD Random Lookup

#!/usr/bin/env python
"""Benchmark: look up a single order ID using only the RDD API.

Reads the pipe-delimited order files, keeps the records whose second field
(order_id) equals 96922894, prints them and reports the elapsed wall time.
"""
from time import time
from pyspark import SparkConf, SparkContext

# Explicit executor count/cores/memory with dynamic allocation and the
# external shuffle service switched off.
conf = (SparkConf()
  .set("spark.executor.instances", "10")
  .set("spark.executor.cores", 2)
  .set("spark.dynamicAllocation.enabled", "false")
  .set("spark.shuffle.service.enabled", "false")
  .set("spark.executor.memory", "500MB"))
sc = SparkContext(conf = conf)

# The timer starts before the file is opened, so read time is included.
t0 = time()
path = "/data/customer_orders*"
lines = sc.textFile(path)

## filter where the order_id, the second field, is equal to 96922894
# Single-argument print(...) behaves the same on Python 2 and Python 3.
print(lines.map(lambda line: line.split('|')).filter(lambda line: int(line[1]) == 96922894).collect())
tt = str(time() - t0)
print("RDD lookup performed in " + tt + " seconds")

DataFrame Random Lookup

#!/usr/bin/env python
"""Benchmark: look up a single order ID through the DataFrame API."""
from time import time
from pyspark.sql import *
from pyspark import SparkConf, SparkContext

# Executor settings, applied one call at a time (SparkConf.set mutates the
# same object, so this is equivalent to a chained expression).
conf = SparkConf()
conf.set("spark.executor.instances", "10")
conf.set("spark.executor.cores", 2)
conf.set("spark.dynamicAllocation.enabled", "false")
conf.set("spark.shuffle.service.enabled", "false")
conf.set("spark.executor.memory", "500MB")

sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

# Timing covers reading the files, building the DataFrame and the lookup.
started = time()
lines = sc.textFile("/data/customer_orders*")

# Split each pipe-delimited line, then name its 11 fields; the three ID
# columns are cast to int, everything else stays a string.
fields = lines.map(lambda raw: raw.split("|"))
rows = fields.map(lambda f: Row(
    cust_id=int(f[0]),
    order_id=int(f[1]),
    email_hash=f[2],
    ssn_hash=f[3],
    product_id=int(f[4]),
    product_desc=f[5],
    country=f[6],
    state=f[7],
    shipping_carrier=f[8],
    shipping_type=f[9],
    shipping_class=f[10],
))
orders_df = sqlContext.createDataFrame(rows)

# Keep the rows whose order_id (the second field) equals 96922894 and show them.
matching = orders_df.where(orders_df["order_id"] == 96922894)
matching.show()

elapsed = str(time() - started)
print("DataFrame performed in " + elapsed + " seconds")

SparkSQL Random Lookup

#!/usr/bin/env python
"""Benchmark: look up a single order ID with a SparkSQL query.

Builds a DataFrame from the pipe-delimited order files, registers it as the
temporary table ``orders`` and times a SELECT for one order_id.
"""
from time import time
from pyspark.sql import *
from pyspark import SparkConf, SparkContext

# Explicit executor count/cores/memory with dynamic allocation and the
# external shuffle service switched off.
conf = (SparkConf()
  .set("spark.executor.instances", "10")
  .set("spark.executor.cores", 2)
  .set("spark.dynamicAllocation.enabled", "false")
  .set("spark.shuffle.service.enabled", "false")
  .set("spark.executor.memory", "500MB"))
sc = SparkContext(conf = conf)
sqlContext = SQLContext(sc)

# The timer starts before the file is opened, so read time is included.
t0 = time()
path = "/data/customer_orders*"
lines = sc.textFile(path)

## create data frame
# Each line is split on '|' into 11 fields; the three ID columns are cast to int.
orders_df = sqlContext.createDataFrame(
    lines.map(lambda l: l.split("|"))
    .map(lambda p: Row(cust_id=int(p[0]), order_id=int(p[1]), email_hash=p[2], ssn_hash=p[3],
                       product_id=int(p[4]), product_desc=p[5], country=p[6], state=p[7],
                       shipping_carrier=p[8], shipping_type=p[9], shipping_class=p[10])))

## register data frame as a temporary table
# The SQL below refers to the table name "orders"; it only resolves once the
# DataFrame has been registered under that name.
orders_df.registerTempTable("orders")

## filter where the order_id, the second field, is equal to 96922894
# Single-argument print(...) behaves the same on Python 2 and Python 3.
print(sqlContext.sql("SELECT * FROM orders where order_id = 96922894").collect())
tt = str(time() - t0)
print("SparkSQL performed in " + tt + " seconds")

RDD with GroupBy, Count, and Sort Descending

#!/usr/bin/env python
"""Benchmark: count orders per product and sort descending, RDD API only.

Maps every record to (product_desc, 1), sums the ones per product, swaps each
pair to (count, product_desc) and sorts by that count in descending order.
"""
from time import time
from pyspark import SparkConf, SparkContext

# Explicit executor count/cores/memory with dynamic allocation and the
# external shuffle service switched off.
conf = (SparkConf()
  .set("spark.executor.instances", "10")
  .set("spark.executor.cores", 2)
  .set("spark.dynamicAllocation.enabled", "false")
  .set("spark.shuffle.service.enabled", "false")
  .set("spark.executor.memory", "500MB"))
sc = SparkContext(conf = conf)

# The timer starts before the file is opened, so read time is included.
t0 = time()
path = "/data/customer_orders*"
lines = sc.textFile(path)

# Field 5 is the product description. After reduceByKey the pair is flipped so
# the count becomes the key, which is what sortByKey then orders on.
# Parentheses replace the backslash continuations; the original chain ended in
# a dangling backslash that ran into the for-loop and omitted the sort.
counts = (lines.map(lambda line: line.split('|'))
    .map(lambda x: (x[5], 1))
    .reduceByKey(lambda a, b: a + b)
    .map(lambda x: (x[1], x[0]))
    .sortByKey(ascending=False))

# Each collected element is (count, product); print as "product<TAB>count".
# Single-argument print(...) behaves the same on Python 2 and Python 3.
for x in counts.collect():
    print(x[1] + '\t' + str(x[0]))
tt = str(time() - t0)
print("RDD GroupBy performed in " + tt + " seconds")

DataFrame with GroupBy, Count, and Sort Descending

#!/usr/bin/env python
"""Benchmark: count orders per product, sorted by count descending, DataFrame API."""
from time import time
from pyspark.sql import *
from pyspark import SparkConf, SparkContext

# Executor settings, applied one call at a time (SparkConf.set mutates the
# same object, so this is equivalent to a chained expression).
conf = SparkConf()
conf.set("spark.executor.instances", "10")
conf.set("spark.executor.cores", 2)
conf.set("spark.dynamicAllocation.enabled", "false")
conf.set("spark.shuffle.service.enabled", "false")
conf.set("spark.executor.memory", "500MB")

sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

# Timing covers reading the files, building the DataFrame and the aggregation.
started = time()
lines = sc.textFile("/data/customer_orders*")

# Split each pipe-delimited line, then name its 11 fields; the three ID
# columns are cast to int, everything else stays a string.
fields = lines.map(lambda raw: raw.split("|"))
rows = fields.map(lambda f: Row(
    cust_id=int(f[0]),
    order_id=int(f[1]),
    email_hash=f[2],
    ssn_hash=f[3],
    product_id=int(f[4]),
    product_desc=f[5],
    country=f[6],
    state=f[7],
    shipping_carrier=f[8],
    shipping_type=f[9],
    shipping_class=f[10],
))
orders_df = sqlContext.createDataFrame(rows)

# Group on the product description, count rows per group, largest count first.
per_product = orders_df.groupBy(orders_df["product_desc"]).count()
results = per_product.sort("count", ascending=False)

for row in results.collect():
    print(row)

elapsed = str(time() - started)
print("DataFrame performed in " + elapsed + " seconds")

SparkSQL with GroupBy, Count, and Sort Descending

#!/usr/bin/env python
"""Benchmark: count orders per product, sorted by count descending, via SparkSQL.

Builds a one-column DataFrame (``product``) from the pipe-delimited order
files, registers it as the temporary table ``orders`` and times a
GROUP BY / ORDER BY query over it.
"""
from time import time
from pyspark.sql import *
from pyspark import SparkConf, SparkContext

# Explicit executor count/cores/memory with dynamic allocation and the
# external shuffle service switched off.
conf = (SparkConf()
  .set("spark.executor.instances", "10")
  .set("spark.executor.cores", 2)
  .set("spark.dynamicAllocation.enabled", "false")
  .set("spark.shuffle.service.enabled", "false")
  .set("spark.executor.memory", "500MB"))
sc = SparkContext(conf = conf)
sqlContext = SQLContext(sc)

# The timer starts before the file is opened, so read time is included.
t0 = time()
path = "/data/customer_orders*"
lines = sc.textFile(path)

## create data frame
# Only field 5 of each record is kept, exposed as the column "product".
orders_df = sqlContext.createDataFrame(
    lines.map(lambda l: l.split("|"))
    .map(lambda r: Row(product=r[5])))

## register data frame as a temporary table
# The SQL below refers to the table name "orders"; it only resolves once the
# DataFrame has been registered under that name.
orders_df.registerTempTable("orders")

results = sqlContext.sql("SELECT product, count(*) AS total_count FROM orders GROUP BY product ORDER BY total_count DESC")

# Single-argument print(...) behaves the same on Python 2 and Python 3.
for x in results.collect():
    print(x)
tt = str(time() - t0)
print("SparkSQL performed in " + tt + " seconds")



