spark range join 優化
阿新 • • 發佈:2018-12-13
文章目錄
背景
一張ip表,一張ip地理資訊表,地理資訊表每條資料包含了ip地址的起點和終點以及一些地理資訊, 需要用 ip 去關聯 gep_ip 中匹配相應的資訊 。
例如:
資料條數為 50 M 的表 ip_record,資料格式大致如下:
ip_int | info |
---|---|
123456789 | xx |
987654321 | xx |
資料條數為 7 M 的表 geoip ,資料格式大致如下:
ipstart | ipend | country | province | city | … |
---|---|---|---|---|---|
0 | 10000 | … | … | … | … |
10001 | 25000 | … | … | … | … |
native join
ip_record 和 geoip 關聯,找出ip對應的geo資訊,寫出的 sql 應該是這樣的:
SELECT A.*,
B.*
FROM ip_record A
JOIN geoip B
ON A.ip_int >= B.ipstart
AND A.ip_int <= B.ipend
會觸發一個 cartesian product ,然後通過 filter 篩出你需要的資料。
broadcast join ?
SELECT /*+ broadcast(B) */ A.*, B.* FROM ip_record A JOIN geoip B ON A.ip_int >= B.ipstart AND A.ip_int <= B.ipend
會觸發 BroadcastNestedLoopJoin
,每條 record 都會產生大量的迴圈。
上述兩種方法都會有 50M * 7M 次的迴圈。
解決方法
- 將 geoip 表的 ipstart 轉化為一個列表,進行廣播。
- 遍歷 record 表,在廣播列表中使用二分查詢到相應 ipstart。
pySpark (2.X) 程式碼實現:
from bisect import bisect_right
from pyspark.sql.types import LongType
#選取 ipstart 欄位,排序廣播
geo_start_bd = sc.broadcast(geo_ip
.select("ipstart")
.orderBy("ipstart")
.rdd
.flatMap(lambda x: x)
.collect())
#二分查詢,找到對應start
def find_le(x):
i = bisect_right(geo_start_bd.value, x)
if i:
return geo_start_bd.value[i-1]
return None
spark.udf.register("find_le",find_le)
spark.sql("""
select
a.ip_int,b.country,b.province,b.city,b.isp
from
(select *,find_le(ip_int) as ipstart from ip_record) a
left join geo_ip b
on a.ipstart = b.ipstart
""")
執行計劃 變成了 sortMergeJoin 。
結論
時間複雜度:O(N * M) -> O(N * LOG(M))
。N 為 record 數量,M 為 geo_ip 表數量。
測試環境:
- spark 2.2
- executor(3c 12g) * 15
- 所有record的資料分割槽數為 45
在這個場景中計算耗時:185 hour (預估,如果能計算出來) -> 2 min,效能提升了10000X
geo_ip 不變:計算時間隨 record 數量變化表:
record | cartesianProduct | broadcastNestLoopJoin (廣播 geo_ip) | after optimized |
---|---|---|---|
10^4 | 6.2 min | 3.5 min | 27s |
10^5 | 66 min | 30 min | 33s |
10^6 | - | - | 27s |
10^7 | - | - | 27s |
10^8 | - | - | 51s |