
pyspark series -- statistics basics


1. Basic statistics

In data analysis, basic statistics already cover roughly 95% of real needs. What counts as basic statistics? Means, variances, standard deviations, sampling, chi-square tests, correlation coefficients, covariances, hypothesis tests, and so on. If your requirements go beyond that range, you are probably doing quite specialized work, or sitting in a very strong team; even then you do not need to worry about Spark falling short, because someone will have built what you need on top of it.

The basic statistical functions for Spark DataFrames live in pyspark.sql.functions; similarly, the DataFrame itself exposes some statistical methods (for example describe() and the df.stat interface).
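As a quick preview of both styles (a minimal sketch with a made-up toy dataframe and app name; the sections below walk through each topic in detail):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName('stats_basics').getOrCreate()

# a tiny test dataframe (made up for illustration)
df = spark.createDataFrame([(1, 2.0), (2, 3.5), (3, 5.0)], ['id', 'x'])

# 1) column functions from pyspark.sql.functions
df.select(F.mean('x').alias('mean'), F.stddev('x').alias('stddev')).show()

# 2) methods on the dataframe itself
df.describe('x').show()
print(df.stat.corr('id', 'x'))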

2. Random numbers

# generate random columns with the same number of rows as the dataframe
from pyspark.sql.functions import rand, randn  # uniform and normal distributions

# color_df is the test dataframe built in the sampling section below
color_df.select(rand(seed=10).alias("uniform"),
                randn(seed=27).alias("normal"))\
    .show()

# or generate a dataframe with a given number of rows directly
df = spark.range(0, 10).withColumn('rand1', rand(seed=10)) \
    .withColumn('rand2', rand(seed=27))
df.show()

3. Rounding

from pyspark.sql.functions import round

df = spark.createDataFrame([(2.5,)], ['a'])
df.select(round('a', 0).alias('r')).show()

4. Sampling

from pyspark.sql import SparkSession
import pandas as pd

spark = SparkSession \
    .builder \
    .appName('my_first_app_name') \
    .getOrCreate()

# build test data
colors = ['white', 'green', 'yellow', 'red', 'brown', 'pink']
color_df = pd.DataFrame(colors, columns=['color'])
color_df['length'] = color_df['color'].apply(len)
color_df = spark.createDataFrame(color_df)  # convert to a Spark dataframe

# sampling
sample1 = color_df.sample(
    withReplacement=False,  # sample without replacement
    fraction=0.6,
    seed=1000)
sample1.show()

5. Descriptive statistics

# the dataframe itself also has basic statistics methods, consistent with pandas
import numpy as np
import pandas as pd

# 1. build test data
df=pd.DataFrame(np.random.rand(5,5),columns=['a','b','c','d','e']).\
    applymap(lambda x: int(x*10))
df.iloc[2,2]=np.nan

spark_df=spark.createDataFrame(df)
spark_df.show()

# 2. descriptive statistics
spark_df.describe().show()

# 3. statistics for a single column
spark_df.describe('a').show()

6. Minimum and maximum
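This section and the next select a 'uniform' column from color_df. A minimal way to attach one (an assumption, reusing rand from the random-number section on the Spark version of color_df built above) is:

from pyspark.sql.functions import rand

color_df = color_df.withColumn('uniform', rand(seed=10))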

from pyspark.sql.functions import min, max
color_df.select(min('uniform'), max('uniform')).show()

7. Mean and variance

Mean, variance, and standard deviation came up earlier; here is a quick recap.

from pyspark.sql.functions import mean, stddev  # again from pyspark.sql.functions

color_df.select(mean('uniform').alias('mean'),
                stddev('uniform').alias('stddev'))\
    .show()

8. Covariance and correlation

# covariance between the two random columns
df.stat.cov('rand1', 'rand2')

# sample covariance via the aggregate function
from pyspark.sql.functions import covar_samp
df.agg(covar_samp("rand1", "rand2").alias('new_col')).collect()

# correlation coefficient
df.stat.corr('rand1', 'rand2')

9. Cross tabulation (contingency table)

# cross tabulation
# Create a DataFrame with two columns (name, item)
names = ["Alice", "Bob", "Mike"]
items = ["milk", "bread", "butter", "apples", "oranges"]
df = spark.createDataFrame([(names[i % 3], items[i % 5]) for i in range(100)], ["name", "item"])
df.show(5)

df.stat.crosstab("name", "item").show()
# +---------+------+-----+------+----+-------+
# |name_item|apples|bread|butter|milk|oranges|
# +---------+------+-----+------+----+-------+
# |      Bob|     6|    7|     7|   6|      7|
# |     Mike|     7|    6|     7|   7|      6|
# |    Alice|     7|    7|     6|   7|      7|
# +---------+------+-----+------+----+-------+

10. Frequent items

# find the most frequently occurring elements (frequency distribution)
df = spark.createDataFrame([(1, 2, 3) if i % 2 == 0 else (i, 2 * i, i % 4) for i in range(100)],
                           ["a", "b", "c"])
df.show(10)

# the code below finds, for each column, the items that occur in more than 40% of the rows
df.stat.freqItems(["a", "b", "c"], 0.4).show()
# +-----------+-----------+-----------+
# |a_freqItems|b_freqItems|c_freqItems|
# +-----------+-----------+-----------+
# |    [23, 1]|    [2, 46]|     [1, 3]|
# +-----------+-----------+-----------+
# "23" and "1" are frequent values of column "a" (note: freqItems may return false positives)

11. Other math functions

Browsing the pyspark.sql.functions module, you will find many more commonly used and handy functions.

11.1. Math functions

Function    Purpose
log         logarithm (natural log by default, or with an explicit base)
log2        base-2 logarithm
factorial   factorial
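A quick sketch of these three functions (assuming the spark session from above; the column aliases are made up for illustration):

from pyspark.sql.functions import col, log, log2, factorial

df = spark.range(1, 6)  # a column 'id' with values 1..5
df.select('id',
          log('id').alias('ln'),            # natural logarithm
          log(2.0, 'id').alias('log2_v1'),  # logarithm with an explicit base
          log2('id').alias('log2_v2'),      # same result via log2
          factorial(col('id').cast('int')).alias('id_factorial')).show()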

12. Distinct element count

from pyspark.sql import functions as func

df = spark.createDataFrame([(1, 2, 3) if i % 2 == 0 else (i, 2 * i, i % 4) for i in range(10)],
                           ["a", "b", "c"])
# note the use of the agg function
df.agg(func.countDistinct('a')).show()

13. Aggregate function: grouping

The official docstring (quoted below) is terse. In short, grouping(col) is used together with cube or rollup: it returns 1 on the subtotal rows where col has been aggregated away (and therefore shows up as null), and 0 on ordinary grouped rows.

Aggregate function: indicates whether a specified column in a GROUP BY list is aggregated
or not, returns 1 for aggregated or 0 for not aggregated in the result set.
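The example below needs a small name/age DataFrame. A minimal one that reproduces the output shown (an assumption, borrowed from the Spark docs example) is:

df = spark.createDataFrame([("Alice", 2), ("Bob", 5)], ["name", "age"])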
from pyspark.sql import functions as func

df.cube("name").agg(func.grouping("name"), func.sum("age")).orderBy("name").show()

# +-----+--------------+--------+
# | name|grouping(name)|sum(age)|
# +-----+--------------+--------+
# | null|             1|       7|
# |Alice|             0|       2|
# |  Bob|             0|       5|
# +-----+--------------+--------+

14. Aggregate function: grouping_id

Likewise terse. grouping_id() packs the grouping() bit of every grouping column into a single integer (see the formula below), so each combination of aggregated and non-aggregated columns gets its own id.

Aggregate function: returns the level of grouping, equals to

(grouping(c1) << (n-1)) + (grouping(c2) << (n-2)) + ... + grouping(cn)

Note: the list of columns should match the grouping columns exactly, or be empty (meaning all the grouping columns).
df.cube("name").agg(grouping_id(), sum("age")).orderBy("name").show()
# +-----+-------------+--------+
# | name|grouping_id()|sum(age)|
# +-----+-------------+--------+
# | null|            1|       7|
# |Alice|            0|       2|
# |  Bob|            0|       5|
# +-----+-------------+--------+

15. Ranking within groups

from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql import Window
from pyspark.sql.functions import row_number

rdd = spark.sparkContext.parallelize(
    [(1, 'Alice', 18), (2, 'Andy', 19), (3, 'Bob', 17), (1, 'Justin', 21), (1, 'Cindy', 20)])
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])

df = spark.createDataFrame(rdd, schema)
# rank by age within each id group; the ordering across groups is not controlled
df.withColumn("rn", row_number().over(Window.partitionBy("id").orderBy("age"))).show()
# +---+------+---+---+
# | id|  name|age| rn|
# +---+------+---+---+
# |  1| Alice| 18|  1|
# |  1| Cindy| 20|  2|
# |  1|Justin| 21|  3|
# |  3|   Bob| 17|  1|
# |  2|  Andy| 19|  1|
# +---+------+---+---+

# additionally sort the whole result by age, so the ordering across groups is controlled too
df.withColumn("rn", row_number().over(Window.partitionBy("id").orderBy("age"))).orderBy("age").show()
# +---+------+---+---+
# | id|  name|age| rn|
# +---+------+---+---+
# |  3|   Bob| 17|  1|
# |  1| Alice| 18|  1|
# |  2|  Andy| 19|  1|
# |  1| Cindy| 20|  2|
# |  1|Justin| 21|  3|
# +---+------+---+---+
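A common use of this pattern (a sketch, building on the df above) is keeping only the first row per group, for example the youngest person in each id:

from pyspark.sql import Window
from pyspark.sql.functions import col, row_number

youngest = df.withColumn("rn", row_number().over(Window.partitionBy("id").orderBy("age"))) \
    .where(col("rn") == 1) \
    .drop("rn")
youngest.show()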