1. 程式人生 > >spark range join 優化

spark range join 優化

文章目錄

背景

一張ip表,一張ip地理資訊表,地理資訊表每條資料包含了ip地址的起點和終點以及一些地理資訊, 需要用 ip 去關聯 gep_ip 中匹配相應的資訊 。

例如:
資料條數為 50 M 的表 ip_record,資料格式大致如下:

ip_int info
123456789 xx
987654321 xx

資料條數為 7 M 的表 geoip ,資料格式大致如下:

ipstart ipend country province city
0 10000
10001 25000

native join

ip_record 和 geoip 關聯,找出ip對應的geo資訊,寫出的 sql 應該是這樣的:

SELECT A.*,
       B.*
FROM   ip_record A
       JOIN geoip B
        ON A.ip_int >= B.ipstart
         AND A.ip_int <= B.ipend

會觸發一個 cartesian product ,然後通過 filter 篩出你需要的資料。

broadcast join ?

SELECT 
    /*+ broadcast(B) */
    A.*,
    B.*
FROM   
    ip_record A
    JOIN geoip B
    ON A.ip_int >= B.ipstart
    AND A.ip_int <= B.ipend

會觸發 BroadcastNestedLoopJoin ,每條 record 都會產生大量的迴圈。

上述兩種方法都會有 50M * 7M 次的迴圈。

解決方法

  1. 將 geoip 表的 ipstart 轉化為一個列表,進行廣播。
  2. 遍歷 record 表,在廣播列表中使用二分查詢到相應 ipstart。

pySpark (2.X) 程式碼實現:

from bisect import bisect_right
from pyspark.sql.types import LongType

#選取 ipstart 欄位,排序廣播
geo_start_bd = sc.broadcast(geo_ip
  .select("ipstart")
  .orderBy("ipstart") 
  .rdd
  .flatMap(lambda x: x)
  .collect())

#二分查詢,找到對應start
def find_le(x):
    i = bisect_right(geo_start_bd.value, x)
    if i:
        return geo_start_bd.value[i-1]
    return None

spark.udf.register("find_le",find_le)

spark.sql("""
    select 
        a.ip_int,b.country,b.province,b.city,b.isp
    from 
        (select *,find_le(ip_int) as ipstart from ip_record) a
    left join geo_ip b
    on a.ipstart = b.ipstart
""")

執行計劃 變成了 sortMergeJoin 。

結論

時間複雜度:O(N * M) -> O(N * LOG(M)) 。N 為 record 數量,M 為 geo_ip 表數量。

測試環境:

  • spark 2.2
  • executor(3c 12g) * 15
  • 所有record的資料分割槽數為 45

在這個場景中計算耗時:185 hour (預估,如果能計算出來) -> 2 min,效能提升了10000X

geo_ip 不變:計算時間隨 record 數量變化表:

record cartesianProduct broadcastNestLoopJoin (廣播 geo_ip) after optimized
10^4 6.2 min 3.5 min 27s
10^5 66 min 30 min 33s
10^6 - - 27s
10^7 - - 27s
10^8 - - 51s

參考

SPARK-8682