
Spark: Two Ways to Compute a Grouped Top N

Computing Top N per group in Spark

In big-data processing, grouping records and then taking the Top N within each group is a very common operation.

The example below shows how to perform a grouped Top N computation in Spark.

1. Grouped Top N with the RDD API

from pyspark import SparkContext
sc = SparkContext()

Prepare the data and convert it into an RDD:

data_list = [
(0, "cat26", 130.9), (0, "cat13", 122.1), (0, "cat95", 119.6), (0, "cat105", 11.3),
(1, "cat67", 128.5), (1, "cat4", 126.8), (1, "cat13", 112.6), (1, "cat23", 15.3),
(2, "cat56", 139.6), (2, "cat40", 129.7), (2, "cat187", 127.9), (2, "cat68", 19.8),
(3, "cat8", 135.6)
]
data = sc.parallelize(data_list)
data.collect()
[(0, 'cat26', 130.9),
(0, 'cat13', 122.1),
(0, 'cat95', 119.6),
(0, 'cat105', 11.3),
(1, 'cat67', 128.5),
(1, 'cat4', 126.8),
(1, 'cat13', 112.6),
(1, 'cat23', 15.3),
(2, 'cat56', 139.6),
(2, 'cat40', 129.7),
(2, 'cat187', 127.9),
(2, 'cat68', 19.8),
(3, 'cat8', 135.6)]

Group the data with groupBy. As the output below shows, each grouped element is a pair of (key, iterable of rows).

d1 = data.groupBy(lambda x:x[0])
temp = d1.collect()
print(list(temp[0][1]))
print(temp)
[(0, 'cat26', 130.9), (0, 'cat13', 122.1), (0, 'cat95', 119.6), (0, 'cat105', 11.3)]
[(0, <pyspark.resultiterable.ResultIterable object at 0x0000000007D2C710>), (1, <pyspark.resultiterable.ResultIterable object at 0x0000000007D2C780>), (2, <pyspark.resultiterable.ResultIterable object at 0x0000000007D2C898>), (3, <pyspark.resultiterable.ResultIterable object at 0x0000000007D2C9B0>)]
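
The grouped values are ResultIterable objects rather than plain lists, which is why collect() prints object references above. To just inspect the groups, a small sketch (not part of the original flow) is to materialize each group first:

# Convert each group's ResultIterable into a plain Python list so it prints readably.
d1.mapValues(list).collect()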

Use mapValues to sort each group and slice off the first N rows; here we keep 3 rows per group.

Note that sorted() is ascending by default, so this version keeps the three smallest TotalValue rows in each group; pass reverse=True to keep the largest instead.

d2 = d1.mapValues(lambda x: sorted(x, key=lambda y:y[2])[:3])
d2.collect()
[(0, [(0, 'cat105', 11.3), (0, 'cat95', 119.6), (0, 'cat13', 122.1)]),
(1, [(1, 'cat23', 15.3), (1, 'cat13', 112.6), (1, 'cat4', 126.8)]),
(2, [(2, 'cat68', 19.8), (2, 'cat187', 127.9), (2, 'cat40', 129.7)]),
(3, [(3, 'cat8', 135.6)])]
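
If what you actually need are the three largest values per group (as in the DataFrame example later), the only change is the sort direction; a minimal sketch:

# reverse=True sorts each group in descending order, so the slice keeps the 3 largest rows.
d2_desc = d1.mapValues(lambda x: sorted(x, key=lambda y: y[2], reverse=True)[:3])
d2_desc.collect()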

Use flatMap to flatten the per-group lists back into a single flat RDD of rows.

d3 = d2.flatMap(lambda x:[i for i in x[1]])
d3.collect()
[(0, 'cat105', 11.3),
(0, 'cat95', 119.6),
(0, 'cat13', 122.1),
(1, 'cat23', 15.3),
(1, 'cat13', 112.6),
(1, 'cat4', 126.8),
(2, 'cat68', 19.8),
(2, 'cat187', 127.9),
(2, 'cat40', 129.7),
(3, 'cat8', 135.6)]
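
Because each value is already a list, the list comprehension is not strictly needed; flatMap flattens whatever iterable the function returns, so an equivalent, slightly shorter form is:

# kv[1] is the per-group list of rows; flatMap flattens it element by element.
d3 = d2.flatMap(lambda kv: kv[1])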

Complete code

from pyspark import SparkContext
# sc = SparkContext()
topN = 3
data_list = [
(0, "cat26", 130.9), (0, "cat13", 122.1), (0, "cat95", 119.6), (0, "cat105", 11.3),
(1, "cat67", 128.5), (1, "cat4", 126.8), (1, "cat13", 112.6), (1, "cat23", 15.3),
(2, "cat56", 139.6), (2, "cat40", 129.7), (2, "cat187", 127.9), (2, "cat68", 19.8),
(3, "cat8", 135.6)
]
data = sc.parallelize(data_list)
d1 = data.groupBy(lambda x:x[0])                                  # group rows by the first field
d2 = d1.mapValues(lambda x: sorted(x, key=lambda y:y[2])[:topN])  # sort each group, keep topN rows
d3 = d2.flatMap(lambda x:[i for i in x[1]])                       # flatten back into a flat RDD
d3.collect()
[(0, 'cat105', 11.3),
(0, 'cat95', 119.6),
(0, 'cat13', 122.1),
(1, 'cat23', 15.3),
(1, 'cat13', 112.6),
(1, 'cat4', 126.8),
(2, 'cat68', 19.8),
(2, 'cat187', 127.9),
(2, 'cat40', 129.7),
(3, 'cat8', 135.6)]
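
One caveat: groupBy shuffles and materializes every complete group before sorting. For large groups, a hedged alternative sketch (not from the original post) keeps only topN rows per key with aggregateByKey and heapq; nsmallest is used here to match the ascending sort above:

import heapq

pairs = data.map(lambda x: (x[0], x))  # key each row by its group id
top_small = pairs.aggregateByKey(
    [],  # start each key with an empty list
    lambda acc, row: heapq.nsmallest(topN, acc + [row], key=lambda y: y[2]),  # fold a row in, keep topN
    lambda a, b: heapq.nsmallest(topN, a + b, key=lambda y: y[2]),            # merge partials, keep topN
).flatMap(lambda kv: kv[1])
top_small.collect()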

2. Grouped Top N with the DataFrame API

For data in a DataFrame, the simplest way to take the Top N per group is to use a window function.

from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql import Window

spark = SparkSession.builder.getOrCreate()

data_list = [
(0, "cat26", 130.9), (0, "cat13", 122.1), (0, "cat95", 119.6), (0, "cat105", 11.3),
(1, "cat67", 128.5), (1, "cat4", 126.8), (1, "cat13", 112.6), (1, "cat23", 15.3),
(2, "cat56", 139.6), (2, "cat40", 129.7), (2, "cat187", 127.9), (2, "cat68", 19.8),
(3, "cat8", 135.6)
]
Create a DataFrame from the data and name the columns:
df = spark.createDataFrame(data_list, ["Hour", "Category", "TotalValue"])
df.show()
+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
| 0| cat26| 130.9|
| 0| cat13| 122.1|
| 0| cat95| 119.6|
| 0| cat105| 11.3|
| 1| cat67| 128.5|
| 1| cat4| 126.8|
| 1| cat13| 112.6|
| 1| cat23| 15.3|
| 2| cat56| 139.6|
| 2| cat40| 129.7|
| 2| cat187| 127.9|
| 2| cat68| 19.8|
| 3| cat8| 135.6|
+----+--------+----------+
  1. Create a window with Window.partitionBy; the partition key is the grouping key (Hour).

  2. The orderBy argument is the sort key; desc() sorts TotalValue in descending order.

  3. withColumn(colName, col) adds a column to the DataFrame, filled here with the row numbers that row_number() generates over the window.

  4. where keeps the rows whose rn value is at most 3, i.e. the Top 3 per group.

w = Window.partitionBy(df.Hour).orderBy(df.TotalValue.desc())
top3 = df.withColumn('rn', func.row_number().over(w)).where('rn <=3')
top3.show()
+----+--------+----------+---+
|Hour|Category|TotalValue| rn|
+----+--------+----------+---+
| 0| cat26| 130.9| 1|
| 0| cat13| 122.1| 2|
| 0| cat95| 119.6| 3|
| 1| cat67| 128.5| 1|
| 1| cat4| 126.8| 2|
| 1| cat13| 112.6| 3|
| 3| cat8| 135.6| 1|
| 2| cat56| 139.6| 1|
| 2| cat40| 129.7| 2|
| 2| cat187| 127.9| 3|
+----+--------+----------+---+
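
Note that the groups come back in partition order, which is why Hour 3 appears before Hour 2 above; the group order is not guaranteed. For a sorted display you can add an explicit orderBy, for example:

top3.orderBy('Hour', 'rn').show()  # sort by group, then by rank within each group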
### Code summary

from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql import Window

spark = SparkSession.builder.getOrCreate()

data_list = [
(0, "cat26", 130.9), (0, "cat13", 122.1), (0, "cat95", 119.6), (0, "cat105", 11.3),
(1, "cat67", 128.5), (1, "cat4", 126.8), (1, "cat13", 112.6), (1, "cat23", 15.3),
(2, "cat56", 139.6), (2, "cat40", 129.7), (2, "cat187", 127.9), (2, "cat68", 19.8),
(3, "cat8", 135.6)
]
df = spark.createDataFrame(data_list, ["Hour", "Category", "TotalValue"])
w = Window.partitionBy(df.Hour).orderBy(df.TotalValue.desc())
top3 = df.withColumn('rn', func.row_number().over(w)).where('rn <= 3')
top3.show()
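
One design note: row_number() breaks ties arbitrarily, so if two categories in the same hour had equal TotalValue, only one would make the cut. If tied rows should all be kept, rank() (or dense_rank()) can be swapped in; a minimal sketch under that assumption:

# rank() gives tied TotalValue rows the same rank, so ties within the Top 3 are all kept.
top3_with_ties = df.withColumn('rk', func.rank().over(w)).where('rk <= 3')
top3_with_ties.show()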