pyspark cookbook 常用操作
阿新 • 發佈: 2018-12-21
Access SparkSession
from pyspark.sql import SparkSession
# get the active SparkSession, or create a new one if none exists yet
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
# valid levels include ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN
sc.setLogLevel('INFO')
Access custom configurations
# Pass application-specific settings through a properties file:
#
#   # in my-properties.conf
#   spark.myapp.db_host 192.168.100.10
$ spark-submit \
    --master spark://localhost:7077 \
    --properties-file my-properties.conf \
    your_spark_app.py
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
# keys passed via --properties-file are exposed through the SparkConf
print(sc.getConf().get('spark.myapp.db_host'))
Create an RDD
# parallelize() distributes a local Python collection into an RDD;
# an atomic-type schema string ('int') yields one column named "value"
array = [1, 5, 7, 2, 3, ]
rdd = sc.parallelize(array)
df = rdd.toDF('int')
# +-----+
# |value|
# +-----+
# |    1|
# |    5|
# |    7|
# |    2|
# |    3|
# +-----+
Read data from MySQL
$ pyspark --packages "mysql:mysql-connector-java:5.1.41"
# requires the MySQL JDBC driver on the classpath (see the pyspark
# --packages command above)
url = 'jdbc:mysql://127.0.0.1:3306/albedo'
properties = {'user': 'root', 'password': '123'}
spark.read.jdbc(url, 'app_repostarring', properties=properties)
Write data to MySQL
-- target table for the DataFrame written in the next snippet
CREATE TABLE `recommender_songrecommend` (
    `id` int(11) NOT NULL AUTO_INCREMENT,
    `user_id` int(11) NOT NULL,
    `song_id` int(11) NOT NULL,
    `score` double NOT NULL,
    PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1;
# sample (user_id, song_id, score) rows to append into MySQL
matrix = [
    (1, 2, 0.1),
    (3, 4, 0.23456),
    (5, 6, 7.89),
    (7, 8, -10.111213),
]
df = spark.createDataFrame(matrix, ['user_id', 'song_id', 'score'])
url = 'jdbc:mysql://192.168.11.200:3306/DATABASE?user=USER&password=PASSWORD&verifyServerCertificate=false&useSSL=false&rewriteBatchedStatements=true'
properties = {'driver': 'com.mysql.jdbc.Driver'}
# The DataFrame columns already match the target table, so select them
# by name. (The original selectExpr('user AS user_id', ...) referenced
# columns "user" and "item" that do not exist in this DataFrame.)
df \
    .select('user_id', 'song_id', 'score') \
    .write.jdbc(url, table='recommender_songrecommend', mode='append', properties=properties)
Read data from SQLite
$ pyspark --packages "org.xerial:sqlite-jdbc:3.16.1"
from pyspark.sql.functions import lit
from pyspark.sql.types import IntegerType, StructField, StructType, TimestampType

props = {'driver': 'org.sqlite.JDBC', 'date_string_format': 'yyyy-MM-dd HH:mm:ss'}
df = spark.read.jdbc("jdbc:sqlite:db.sqlite3", "app_repostarring", properties=props)
# NOTE(review): min_stargazers_count must be defined elsewhere — confirm
df = df.where(df.stargazers_count >= min_stargazers_count)
df = df.select('from_user_id', 'repo_id', 'created_at')
# toDF needs one name per column (the original passed only two names
# for three columns, which raises an error)
df = df.toDF('user', 'item', 'item_created_at')
df = df.withColumn('rating', lit(1))
# reorder columns to match the explicit schema below, since
# createDataFrame(rdd, schema) matches fields by position
df = df.select('user', 'item', 'rating', 'item_created_at')
schema = StructType([
    StructField('user', IntegerType(), nullable=False),
    StructField('item', IntegerType(), nullable=False),
    StructField('rating', IntegerType(), nullable=False),
    StructField('item_created_at', TimestampType(), nullable=False),
])
df = spark.createDataFrame(df.rdd, schema)
Read data from parquet
from pyspark.sql.utils import AnalysisException

# Load a cached parquet file; on a cache miss, build the DataFrame from
# the original source and persist it for next time.
raw_df_filepath = 'raw_df.parquet'
try:
    raw_df = spark.read.format('parquet').load(raw_df_filepath)
except AnalysisException as exc:
    if 'Path does not exist' in exc.desc:
        raw_df = load_raw_data()  # defined elsewhere in the project
        raw_df.write.format('parquet').save(raw_df_filepath)
    else:
        # bare raise preserves the original traceback
        raise
Create a DataFrame
# (user, item, rating) triples; column types are inferred from the data
matrix = [
(1, 1, 1),
(1, 2, 1),
(2, 1, 0),
(3, 1, 1),
(3, 3, 1),
(3, 4, 1),
(4, 1, 0),
(4, 2, 0),
(5, 9, 1),
(5, 5, 0),
]
df = spark.createDataFrame(matrix, ['user', 'item', 'rating'])
Create a DataFrame with explicit schema
# import only the types actually used instead of a wildcard import
from pyspark.sql.types import DoubleType, StringType, StructField, StructType

matrix = [
    ('Alice', 0.5, 5.0),
    ('Bob', 0.2, 92.0),
    ('Tom', 0.0, 122.0),
    ('Zack', 0.1, 1.0),
]
# an explicit schema skips type inference and lets you declare nullability
schema = StructType([
    StructField('name', StringType(), nullable=False),
    StructField('prediction', DoubleType(), nullable=False),
    StructField('rating', DoubleType(), nullable=False),
])
df = spark.createDataFrame(matrix, schema)
df.printSchema()
# an existing DataFrame's schema can be reused for another dataset
new_df = spark.createDataFrame(someRDD, df.schema)
Create a nested schema
from pyspark.sql.types import ArrayType
from pyspark.sql.types import FloatType
from pyspark.sql.types import IntegerType
from pyspark.sql.types import StructField
from pyspark.sql.types import StructType
# element type of the "recommendations" array column
recommendation_schema = StructType([
StructField('item', IntegerType(), nullable=False),
StructField('rating', FloatType(), nullable=False),
])
user_recommendations_schema = StructType([
StructField('user', IntegerType(), nullable=False),
StructField('recommendations', ArrayType(recommendation_schema), nullable=False),
])
matrix = [
(1, [[100, 1.0], [200, 2.0]]),
(2, [[300, 3.0]]),
]
df = spark.createDataFrame(matrix, user_recommendations_schema)
df.printSchema()
# root
# |-- user: integer (nullable = false)
# |-- recommendations: array (nullable = false)
# | |-- element: struct (containsNull = true)
# | | |-- item: integer (nullable = false)
# | | |-- rating: float (nullable = false)
df.show()
# +----+--------------------+
# |user|     recommendations|
# +----+--------------------+
# |   1|[[100,1.0], [200,...|
# |   2|         [[300,3.0]]|
# +----+--------------------+
Change schema of a DataFrame
from pyspark.sql.types import DoubleType, IntegerType, StructField, StructType
# re-apply an explicit schema by rebuilding the DataFrame from its RDD;
# fields are matched by position, so column order must line up
schema = StructType([
StructField('user', IntegerType(), nullable=False),
StructField('item', IntegerType(), nullable=False),
StructField('rating', DoubleType(), nullable=False),
])
df = spark.createDataFrame(df.rdd, schema)
Get the number of partitions
# number of partitions backing the DataFrame's underlying RDD
df.rdd.getNumPartitions()
Split a DataFrame into chunks (partitions)
def write_to_db(partial_rows):
    """Bulk-insert one partition's rows into MySQL.

    Returns a one-element list so that mapPartitions yields one result
    per partition, which can then be collect()-ed on the driver.
    """
    # NOTE(review): `year`, `week` and `your_mysql_insert_func` must be
    # defined in the enclosing scope; they are captured in the task closure.
    # (The original iterated an undefined name `chunk_rows` instead of
    # the `partial_rows` parameter.)
    values_tuples = [(row.user, row.item, row.score, year, week) for row in partial_rows]
    result = your_mysql_insert_func(values_tuples)
    return [result, ]

# repartition() controls the chunk size: each of the 400 partitions
# triggers exactly one bulk insert
save_result = df \
    .repartition(400) \
    .rdd \
    .mapPartitions(write_to_db) \
    .collect()
Show a DataFrame
# print the schema in a tree format
df.printSchema()
# print top 20 rows (long cell values are truncated by default)
df.show()
# print top 100 rows
df.show(100)
# calculate descriptive statistics (count, mean, stddev, min, max)
df.describe().show()
# ... or for selected columns only
df.describe(['like_count', ]).show()
Create a column with a literal value
from pyspark.sql.functions import lit
# lit() wraps a Python constant so it can be used as a column expression
df = df.withColumn('like', lit(1))
Return a fraction of a DataFrame
# stratified sampling without replacement: one sampling fraction per
# distinct value of the stratification column ('user' here)
fraction = {
'u1': 0.5,
'u2': 0.5,
'u3': 0.5,
'u4': 0.5,
'u5': 0.5,
}
df.sampleBy('user', fraction).show()
# +----+----+----------+
# |user|item|prediction|
# +----+----+----------+
# | u1| i1| 1|
# | u3| i4| 4|
# | u4| i1| 1|
# | u4| i2| 3|
# | u5| i5| 5|
# +----+----+----------+
Show distinct values of a column
# equivalent to: SELECT DISTINCT user FROM df
df.select('user').distinct().show()
Rename columns
df.printSchema()
# root
# |-- from_user_id: integer (nullable = true)
# |-- repo_id: integer (nullable = true)
# toDF() renames all columns at once (one name per column, in order)
df = df.toDF('user', 'item')
# or rename columns one by one
df = df.withColumnRenamed('from_user_id', 'user').withColumnRenamed('repo_id', 'item')
Convert a column to double type
# cast() accepts a type-name string such as 'double', 'int', 'string'
df = df.withColumn('prediction', df['prediction'].cast('double'))
Update a column based on conditions
from pyspark.sql.functions import when
# binarize predictions at a 0.5 threshold
df = df.withColumn('label', when(df['prediction'] > 0.5, 1).otherwise(0))
Drop rows with null values from a DataFrame
# dropna() removes rows that have null in any of the listed columns;
# it filters rows, it does not remove columns. (The original subset
# list contained an undefined name `S`.)
predictions = predictions.dropna(subset=['prediction'])
DataFrame subtract another DataFrame
matrix1 = [
(1, 2, 123),
(3, 4, 1),
(5, 6, 45),
(7, 8, 3424),
]
df1 = spark.createDataFrame(matrix1, ['user','item', 'play_count'])
matrix2 = [
(3, 4),
(5, 6),
(9, 1),
(7, 8),
(1, 6),
(1, 1),
]
df2 = spark.createDataFrame(matrix2, ['user','item'])
# subtract() is a set difference on whole rows, so both sides must have
# the same columns
df2.subtract(df1.select('user', 'item')).show()
# +----+----+
# |user|item|
# +----+----+
# | 1| 1|
# | 1| 6|
# | 9| 1|
# +----+----+
# or do the difference at the RDD level and rebuild the DataFrame
testing_rdd = df.rdd.subtract(training.rdd)
testing = spark.createDataFrame(testing_rdd, df.schema)
Convert a DataFrame column into a Python list
# list
popluar_items = [row['item'] for row in popular_df.select('item').collect()]
# set
all_items = {row.id for row in als_model.itemFactors.select('id').collect()}
Concatenate (merge) two DataFrames
# union() concatenates by column position, not by column name
full_df = df1.union(df2)
Convert a DataFrame to a Python dict
# assumes each row has exactly two fields, treated as (key, value) —
# TODO confirm df really has a two-column layout
d = df.rdd.collectAsMap()
d['some_key']
Compute (approximate or exact) median of a numerical column
# third argument is the relative error: 0.0 computes the exact quantile
# (expensive); larger values trade accuracy for speed
approximate_median = df.approxQuantile('count', [0.5, ], 0.25)
exact_median = df.approxQuantile('count', [0.5, ], 0.0)
maximum = df.approxQuantile('count', [1.0, ], 0.1)
minimum = df.approxQuantile('count', [0.0, ], 0.1)
Find frequent items for columns
# find items appearing in at least `support` fraction of rows
df = rating_df.freqItems(['item', ], support=0.01)
# +--------------------+
# | item_freqItems|
# +--------------------+
# |[194512, 371798, ...|
# +--------------------+
# get the value of a DataFrame column from the first collected row
popular_items = df.collect()[0].item_freqItems
Broadcast a value
# ship a read-only value to every executor once, instead of per task
bc_candidates = sc.broadcast(set([1, 2, 4, 5, 8]))
print(bc_candidates.value)
# {8, 1, 2, 4, 5}
# a small DataFrame can be broadcast as its collected rows and rebuilt
bc_df = sc.broadcast(df.collect())
df = spark.createDataFrame(bc_df.value)
Broadcast a DataFrame in join
import pyspark.sql.functions as F
# hint the optimizer to broadcast the small side of the join to all executors
large_df.join(F.broadcast(small_df), 'some_key')
Cache a DataFrame
# persist the DataFrame for reuse (lazy; materialized on the first action)
df.cache()
Show query execution plan
# extended=True also prints the logical plans, not just the physical plan
df.explain(extended=True)
Use SQL to query a DataFrame
props = {'driver': 'org.sqlite.JDBC', 'date_string_format': 'yyyy-MM-dd HH:mm:ss'}
df = spark.read.jdbc("jdbc:sqlite:db.sqlite3", "app_repostarring", properties=props)
# register the DataFrame as a temp view so it can be queried with SQL
df.createOrReplaceTempView('repo_stars')
query = 'SELECT DISTINCT repo_id AS item FROM repo_stars WHERE stargazers_count > 1000'
df2 = spark.sql(query)
df2.show()
query = """
SELECT
from_user_id AS user,
count(repo_id) AS count
FROM repo_stars
GROUP BY from_user_id
ORDER BY count DESC
"""
df = spark.sql(query)
# interpolate Python values into the query text with str.format
params = {'top_n': top_n}
query = """
SELECT
repo_id AS item,
MAX(stargazers_count) AS stars
FROM repo_stars
GROUP BY repo_id
ORDER BY stars DESC
LIMIT {top_n}
""".format(**params)
popular_df = spark.sql(query)
WHERE ... IN ...
from pyspark.sql.functions import col

# isin() is the DataFrame equivalent of SQL's WHERE ... IN (...)
item_ids = [1, 2, 5, 8]
# collectAsMap() is an RDD method (not a DataFrame method) and requires
# (key, value) pairs, so select two columns and go through .rdd —
# the original called it on a single-column DataFrame, which fails.
raw_df \
    .where(col('repo_id').isin(item_ids)) \
    .select('repo_id', 'repo_url') \
    .rdd \
    .collectAsMap()
ORDER BY multiple columns
import pyspark.sql.functions as F
# mix ascending (plain column name) and descending (F.col(...).desc()) keys
rating_df = raw_df \
.selectExpr('from_user_id AS user', 'repo_id AS item', '1 AS rating', 'starred_at') \
.orderBy('user', F.col('starred_at').desc())
Aggregate
import pyspark.sql.functions as F

# Use the pyspark aggregate functions, not the Python builtins:
# the builtin min('user') would just return the smallest character of
# the string 'user', not a column aggregate.
df.agg(F.min('user'), F.max('user'), F.min('item'), F.max('item')).show()
# alias the aggregate so the result column has a predictable name
max_value = user_star_count_df.agg(F.max('stars').alias('max_stars')).collect()[0]['max_stars']
SELECT COUNT(DISTINCT xxx) ...
import pyspark.sql.functions as F
matrix = [
(1, 1, 1),
(1, 2, 1),
(2, 1, 0),
(3, 1, 1),
(3, 3, 1),
(3, 4, 1),
(4, 1, 0),
(4, 2, 0),
(5, 9, 1),
(5, 5, 0),
]
df = spark.createDataFrame(matrix, ['user', 'item', 'rating'])
# equivalent to: SELECT COUNT(DISTINCT user) FROM df
df.agg(F.countDistinct('user')).show()
# +--------------------+
# |count(DISTINCT user)|
# +--------------------+
# | 5|
# +--------------------+
SELECT MAX(xxx) ... GROUP BY
# HAVING equivalent: aggregate first, then filter on the aggregate
df.groupBy('user').count().filter('count >= 4').show()
# top-N repos by max stargazers_count
popular_df = raw_df \
.groupBy('repo_id') \
.agg(F.max('stargazers_count').alias('stars')) \
.orderBy('stars', ascending=False) \
.limit(top_n)
SELECT COUNT() ... GROUP BY
# equivalent to: SELECT user, COUNT(*) FROM prediction_df GROUP BY user
prediction_df.groupBy('user').count().show()
# +------+-----+
# | user|count|
# +------+-----+
# |649661| 15|
# |464340| 15|
# |468780| 15|
# |489233| 11|
# |455143| 14|
# +------+-----+
# stars per item, most starred first
stargazers_count_df = rating_df \
.groupBy('item') \
.agg(F.count('user').alias('stargazers_count')) \
.orderBy('stargazers_count', ascending=False)
# +--------+----------------+
# | item|stargazers_count|
# +--------+----------------+
# | 3544424| 137|
# | 2126244| 120|
# | 7691631| 115|
# | 1362490| 112|
# | 943149| 112|
# +--------+----------------+
# stars given per user, most active first
starred_count_df = rating_df \
.groupBy('user') \
.agg(F.count('item').alias('starred_count')) \
.orderBy('starred_count', ascending=False)
# +-------+-------------+
# | user|starred_count|
# +-------+-------------+
# | 48936| 7201|
# |4560482| 5898|
# |3382565| 3978|
# | 10652| 3586|
# | 31996| 3459|
# +-------+-------------+
You may want to use approx_count_distinct instead of an exact distinct count on large datasets.
GROUP_CONCAT a column
from pyspark.sql.functions import expr
# collect_list() is the Spark equivalent of MySQL's GROUP_CONCAT:
# it gathers each group's values into an array column
per_user_predictions_df = output_df \
.orderBy(['user', 'prediction'], ascending=False) \
.groupBy('user') \
.agg(expr('collect_list(item) as items'))
# +--------+--------------------+
# | user| items|
# +--------+--------------------+
# | 2142|[36560369, 197450...|
# | 47217|[40693501, 643554...|
# +--------+--------------------+
GROUP_CONCAT multiple columns
from pyspark.sql.functions import col, collect_list, struct
matrix = [
(1, 1, 0.1),
(1, 2, 5.1),
(1, 6, 0.0),
(2, 6, 9.3),
(3, 1, 0.54),
(3, 5, 0.83),
(4, 1, 0.65),
(4, 4, 1.023),
]
df = spark.createDataFrame(matrix, ['user', 'item', 'prediction'])
# wrap the columns in struct() so collect_list keeps them together as
# one element per row
df \
.groupBy("user") \
.agg(collect_list(struct(col('item'), col('prediction'))).alias("recommendations")) \
.show(truncate=False)
# +----+---------------------------+
# |user|recommendations |
# +----+---------------------------+
# |1 |[[1,0.1], [2,5.1], [6,0.0]]|
# |3 |[[1,0.54], [5,0.83]] |
# |2 |[[6,9.3]] |
# |4 |[[1,0.65], [4,1.023]] |
# +----+---------------------------+
SELECT ... RANK() OVER (PARTITION BY ... ORDER BY)
from pyspark.sql import Window
from pyspark.sql.functions import col
from pyspark.sql.functions import expr
import pyspark.sql.functions as F
# rank rows within each user, newest star first
window_spec = Window.partitionBy('from_user_id').orderBy(col('starred_at').desc())
# keep each user's 10 most recent items and collect them into a list
per_user_actual_items_df = raw_df \
.select('from_user_id', 'repo_id', 'starred_at', F.rank().over(window_spec).alias('rank')) \
.where('rank <= 10') \
.groupBy('from_user_id') \
.agg(expr('collect_list(repo_id) as items')) \
.withColumnRenamed('from_user_id', 'user')
# +--------+--------------------+
# | user| items|
# +--------+--------------------+
# | 2142|[29122050, 634846...|
# | 59990|[9820191, 8729416...|
# +--------+--------------------+
# same pattern, ranked by prediction score instead of time
window_spec = Window.partitionBy('user').orderBy(col('prediction').desc())
per_user_predicted_items_df = output_df \
.select('user', 'item', 'prediction', F.rank().over(window_spec).alias('rank')) \
.where('rank <= 10') \
.groupBy('user') \
.agg(expr('collect_list(item) as items'))
# +--------+--------------------+
# | user| items|
# +--------+--------------------+
# | 2142|[36560369, 197450...|
# | 47217|[40693501, 643554...|
# +--------+--------------------+
Left anti join / Left excluding join
# keep only rating_df rows whose 'item' has NO match in to_remove_items
clean_df = rating_df.join(to_remove_items, 'item', 'left_anti')
Outer join
m1 = [
(1, 1, 1),
(1, 2, 1),
(1, 4, 1),
(2, 2, 1),
(2, 3, 1),
(3, 5, 1),
]
m1df = spark.createDataFrame(m1, ['user', 'item', 'rating'])
# alias() lets the join condition reference each side unambiguously
m1df = m1df.where('user = 1').alias('m1df')
m2 = [
(1, 100),
(2, 200),
(3, 300),
(4, 400),
(5, 500),
(6, 600),
]
m2df = spark.createDataFrame(m2, ['item', 'count'])
m2df = m2df.alias('m2df')
# right outer join + IS NULL on the left side finds items user 1 has
# NOT rated ("left excluding" pattern, from the right table's view)
m1df.join(m2df, m1df.item == m2df.item, 'rightouter') \
.where('m1df.user IS NULL') \
.orderBy('m2df.count', ascending=False) \
.selectExpr('1 AS user', 'm2df.item', '0 AS rating') \
.show()
# +----+----+------+
# |user|item|rating|
# +----+----+------+
# | 1| 6| 0|
# | 1| 5| 0|
# | 1| 3| 0|
# +----+----+------+
Cross join
# crossJoin() yields the Cartesian product of the two DataFrames
df = m1df.select('user').distinct().crossJoin(m2df.select('item'))
df.show()
# NOTE(review): f1 and f2 must already be registered as temp views,
# e.g. df.createOrReplaceTempView('f1') — confirm upstream
query = """
SELECT f1.user, f2.item, 0 AS rating
FROM f1
CROSS JOIN f2
"""
df = spark.sql(query)
df.show()
# pair every ALS user with every ALS item (the original mixed the
# spellings `als_model` and `alsModel`; the latter was undefined)
all_user_item_pair_df = als_model.userFactors.selectExpr('id AS user') \
    .crossJoin(als_model.itemFactors.selectExpr('id AS item'))
# +----+----+
# |user|item|
# +----+----+
# |xxxx|oooo|
# +----+----+