spark sql優化:小表大表關聯優化 & union替換or & broadcast join
阿新 • • 發佈:2019-01-27
----原語句(執行18min)
SELECT
bb.ip
FROM
(
SELECT
ip ,
sum(click) click_num,
round(sum(click) / sum(imp), 4) user_click_rate
FROM
schema.srctable1
WHERE
date = '20171020'
AND ip IS NOT NULL
AND imp > 0
GROUP BY
ip
)
bb
LEFT OUTER JOIN
(
SELECT
round(sum(click) / sum(imp), 4) avg_click_rate
FROM
schema.srctable1
WHERE
date = '20171020'
)
aa
LEFT OUTER JOIN schema.dstable cc
on
cc.ip = bb.ip
WHERE
cc.ip is null
AND
(
bb.user_click_rate > aa.avg_click_rate * 3
AND click_num > 500
)
OR
(
click_num > 1000
)
分析:
1、aa表存放的就是一個指標資料,1條記錄,列為小表2、bb表存放的是按ip聚合的明細資料,記錄很多,列為大表
3、cc表用來過濾ip,數量也很小,列為過濾表,作用很小。
檢視執行計劃,發現bb與aa進行left outer join時,引發了shuffle過程,造成大量的磁碟及網路IO,影響效能。
解決策略
優化方案1:調整大小表位置,將小表放在左邊後,提升至29s (該方案一直不太明白為啥會提升,執行計劃裡顯示的也就是大小表位置調換下而已,跟之前的沒其他區別)優化方案2: 將 or 改成 union,提升至35s(各種調整,一直懷疑跟or有關係,後面調整成union其他不變,果真效率不一樣;但方案1只是調整了下大小表順序,並未調整其他,其效率同樣提升很大;不太明白sparksql內部到底走了什麼優化機制,後面繼續研究);
優化方案3: 採用cache+broadcast方式,提升至20s(該方案將小表快取至記憶體,進行map側關聯)
方案具體實施
----方案2:or 改成 union(執行35s)
I select
aa.ip
from
(
SELECT
bb.ip ip
FROM
(
SELECT
ip ,
sum(click) click_num,
round(sum(click) / sum(imp), 4)
user_click_rate
FROM
schema.srctable1
WHERE
date = '20171020'
AND ip IS NOT NULL
AND imp > 0
GROUP BY
ip
)
bb
LEFT OUTER JOIN
(
SELECT
round(sum(click) / sum(imp), 4)
avg_click_rate
FROM
schema.srctable1
WHERE
date = '20171020'
)
aa
WHERE
(
bb.user_click_rate > aa.avg_click_rate * 3
AND click_num > 20
)
union
SELECT
bb.ip ip
FROM
(
SELECT
ip ,
sum(click) click_num,
round(sum(click) / sum(imp), 4)
user_click_rate
FROM
schema.srctable1
WHERE
date = '20171020'
AND ip IS NOT NULL
AND imp > 0
GROUP BY
ip
)
bb
LEFT OUTER JOIN
(
SELECT
round(sum(click) / sum(imp), 4)
avg_click_rate
FROM
schema.srctable1
WHERE
date = '20171020'
)
aa
WHERE
click_num > 40
)
aa
LEFT OUTER JOIN schema.dstable cc
on
aa.ip = cc.ip
where
cc.ip is null
-----cache+broadcast方式(20s)
原理:使用broadcast將會把小表分發到每臺執行節點上,因此,關聯操作都在本地完成,基本就取消了shuffle的過程,執行效率大幅度提高。
cache table cta
as
SELECT
round(sum(click) / sum(imp), 4) avg_click_rate
FROM
schema.srctable1
WHERE
date = '20171020';
INSERT into TABLE schema.dstable
SELECT
bb.ip
FROM
(
SELECT
ip ,
sum(click) click_num,
round(sum(click) / sum(imp), 4) user_click_rate
FROM
schema.srctable1
WHERE
date = '20171020'
AND ip IS NOT NULL
AND imp > 0
GROUP BY
ip
)
bb
LEFT OUTER JOIN cta aa
LEFT OUTER JOIN schema.dstable cc
on
cc.ip = bb.ip
WHERE
cc.ip is null
AND
(
bb.user_click_rate > aa.avg_click_rate * 3
AND click_num > 500
)
OR
(
click_num > 1000
)
注意:
cache 表不一定會被廣播到Executor,執行map side join,還受另外一個引數:spark.sql.autoBroadcastJoinThreshold影響,該引數判斷是否將該表廣播;
spark.sql.autoBroadcastJoinThreshold引數預設值是10M,所以只有cache的表小於10M的才被廣播到Executor上去執行map side join。