PostgreSQL的並行查詢
並行順序掃描(Parallel sequential scan)
在PostgreSQL 9.6中,增加了對並行順序掃描的支援。順序掃描是在表上進行的掃描,在該表中一個接一個的塊順序地被評估。就其本質而言,順序掃描允許並行化。這樣,整個表將在多個workers執行緒之間順序掃描。
並行順序掃描快並不是因為可以並行地讀,而是將資料分散到了多個cpu。
abce=# explain analyze select work_hour from hh_adds_static_day where create_time <= date '20201010'-interval '10' day; QUERY PLAN -------------------------------------------------------------------------------------------------------------------------------- Seq Scan on hh_adds_static_day (cost=0.00..261864.36 rows=4241981 width=4) (actual time=0.012..1407.214 rows=4228109 loops=1) Filter: (create_time <= '2020-09-30 00:00:00'::timestamp without time zone) Rows Removed by Filter: 735600 Planning Time: 0.108 ms Execution Time: 1585.835 ms (5 rows) abce=#
順序掃描產生了大量的行,但是沒有使用聚合函式。因此,查詢使用的是單個cpu核心。
增加一個sum函式後,很明顯使用了兩個工作執行緒,從而使得查詢加速:
abce=# explain analyze select sum(work_hour) from hh_adds_static_day where create_time <= date '20201010'-interval '10' day; QUERY PLAN ---------------------------------------------------------------------------------------------------------------------------------------------------------- Finalize Aggregate (cost=231089.60..231089.61 rows=1 width=4) (actual time=749.998..751.529 rows=1 loops=1) -> Gather (cost=231089.38..231089.59 rows=2 width=4) (actual time=749.867..751.515 rows=3 loops=1) Workers Planned: 2 Workers Launched: 2 -> Partial Aggregate (cost=230089.38..230089.39 rows=1 width=4) (actual time=746.463..746.464 rows=1 loops=3) -> Parallel Seq Scan on hh_adds_static_day (cost=0.00..225670.65 rows=1767492 width=4) (actual time=0.032..489.501 rows=1409370 loops=3) Filter: (create_time <= '2020-09-30 00:00:00'::timestamp without time zone) Rows Removed by Filter: 245200 Planning Time: 0.112 ms Execution Time: 751.611 ms (10 rows) abce=#
並行聚合(Parallel Aggregation)
在資料庫中,計算聚合是非常昂貴的操作。如果以單個程序進行執行,則這將花費相當長的時間。在PostgreSQL 9.6中,通過簡單地將它們分成多個塊(分而治之策略)來增加了平行計算的能力。多個worker執行緒執行聚合的部分,然後leader再根據它們的結果計算最終結果。
從技術上講,將Partial Aggregate節點新增到計劃樹中,並且每個Partial Aggregate節點包含一個worker執行緒的輸出。然後將這些輸出傳送到Finalize Aggregate節點,該節點合併來自多個(所有)Partial Aggregate節點的聚合。如此有效的並行部分計劃在根部包括一個Finalize Aggregate節點,以及一個將Partial Aggregate節點作為子節點的Gather節點。
''Parallel Seq Scan''節點生成用於部分聚合(''Partial Aggregate'')的行。
''Partial Aggregate''節點使用SUM()函式減少這些行。最後,由''Gather''節點收集每個worker的總和計數。
''Finalize Aggregate''節點計算最後的總和。如果你使用的自己的聚合函式,別忘了將其標記為''parallel safe''的。
worker數量
可以動態調整worker的數量:
abce=# show max_parallel_workers_per_gather; max_parallel_workers_per_gather --------------------------------- 2 (1 row) abce=# alter system set max_parallel_workers_per_gather=4; ALTER SYSTEM abce=# select * from pg_reload_conf(); pg_reload_conf ---------------- t (1 row) abce=# explain analyze select sum(work_hour) from hh_adds_static_day where create_time <= date '20201010'-interval '10' day; QUERY PLAN --------------------------------------------------------------------------------------------------------------------------------------------------------- Finalize Aggregate (cost=218981.25..218981.26 rows=1 width=4) (actual time=473.424..475.156 rows=1 loops=1) -> Gather (cost=218980.83..218981.24 rows=4 width=4) (actual time=473.314..475.144 rows=5 loops=1) Workers Planned: 4 Workers Launched: 4 -> Partial Aggregate (cost=217980.83..217980.84 rows=1 width=4) (actual time=468.769..468.770 rows=1 loops=5) -> Parallel Seq Scan on hh_adds_static_day (cost=0.00..215329.59 rows=1060495 width=4) (actual time=0.036..306.854 rows=845622 loops=5) Filter: (create_time <= '2020-09-30 00:00:00'::timestamp without time zone) Rows Removed by Filter: 147120 Planning Time: 0.150 ms Execution Time: 475.218 ms (10 rows) abce=#
我們將數量從2變成了4。
使用多少個worker程序
首先,max_parallel_workers_per_gather引數定義了worker的最小數量。
其次,查詢執行器從池中獲取worker的數量受限於max_parallel_workers的值。
最後,最頂層的限制是max_worker_processes的值,該引數定義了後臺worker程序的總數量。
如果分配worker程序失敗,就會切換成單程序執行。
查詢計劃器會根據表或索引的大小,考慮減少worker程序的數量。受引數min_parallel_table_scan_size、min_parallel_index_scan_size影響。
引數的預設設定:
abce=# show min_parallel_table_scan_size; min_parallel_table_scan_size ------------------------------ 8MB (1 row) abce=# show min_parallel_index_scan_size ; min_parallel_index_scan_size ------------------------------ 512kB (1 row) abce=#
影響關係:
set min_parallel_table_scan_size='8MB' 8MB table => 1 worker 24MB table => 2 workers 72MB table => 3 workers x => log(x / min_parallel_table_scan_size) / log(3) + 1 worker
每當表的大小是min_parallel(index|table)scan_size的三倍,postgresql就會增加一個worker程序。worker程序的數量不是基於cost的。
在實踐中,這些規則並不總是被遵守的,可以對特定的表執行alter table ... set (parallel_workers=N)進行設定。
為什麼並行執行沒有被使用?
除了並行執行的一些限制,postgresql也檢查成本(cost):
·parallel_setup_cost避免了對小的查詢採用並行執行。它對用於記憶體設定、程序啟動和初始通訊的時間進行建模
·parallel_tuple_cost:leader程序和worker程序之間的通訊會花費很長的時間。時間與worker傳送的元組數量成比例。該引數對通訊成本進行建模。
巢狀迴圈連線(Nested loop joins)
從9.6版本開始,postgresql支援對“Nested loop”執行並行操作:
explain (costs off) select c_custkey, count(o_orderkey) from customer left outer join orders on c_custkey = o_custkey and o_comment not like '%special%deposits%' group by c_custkey; QUERY PLAN -------------------------------------------------------------------------------------- Finalize GroupAggregate Group Key: customer.c_custkey -> Gather Merge Workers Planned: 4 -> Partial GroupAggregate Group Key: customer.c_custkey -> Nested Loop Left Join -> Parallel Index Only Scan using customer_pkey on customer -> Index Scan using idx_orders_custkey on orders Index Cond: (customer.c_custkey = o_custkey) Filter: ((o_comment)::text !~~ '%special%deposits%'::text)
雜湊連線(Hash Join)
在PostgreSQL 11之前,每個worker都構建自己的雜湊表。因此,4個以上的workers程序無法提高效能。新的實現使用一個共享雜湊表。每個worker都可以利用WORK_MEM來構建雜湊表。
select l_shipmode, sum(case when o_orderpriority = '1-URGENT' or o_orderpriority = '2-HIGH' then 1 else 0 end) as high_line_count, sum(case when o_orderpriority <> '1-URGENT' and o_orderpriority <> '2-HIGH' then 1 else 0 end) as low_line_count from orders, lineitem where o_orderkey = l_orderkey and l_shipmode in ('MAIL', 'AIR') and l_commitdate < l_receiptdate and l_shipdate < l_commitdate and l_receiptdate >= date '1996-01-01' and l_receiptdate < date '1996-01-01' + interval '1' year group by l_shipmode order by l_shipmode LIMIT 1;
QUERY PLAN ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- Limit (cost=1964755.66..1964961.44 rows=1 width=27) (actual time=7579.592..7922.997 rows=1 loops=1) -> Finalize GroupAggregate (cost=1964755.66..1966196.11 rows=7 width=27) (actual time=7579.590..7579.591 rows=1 loops=1) Group Key: lineitem.l_shipmode -> Gather Merge (cost=1964755.66..1966195.83 rows=28 width=27) (actual time=7559.593..7922.319 rows=6 loops=1) Workers Planned: 4 Workers Launched: 4 -> Partial GroupAggregate (cost=1963755.61..1965192.44 rows=7 width=27) (actual time=7548.103..7564.592 rows=2 loops=5) Group Key: lineitem.l_shipmode -> Sort (cost=1963755.61..1963935.20 rows=71838 width=27) (actual time=7530.280..7539.688 rows=62519 loops=5) Sort Key: lineitem.l_shipmode Sort Method: external merge Disk: 2304kB Worker 0: Sort Method: external merge Disk: 2064kB Worker 1: Sort Method: external merge Disk: 2384kB Worker 2: Sort Method: external merge Disk: 2264kB Worker 3: Sort Method: external merge Disk: 2336kB -> Parallel Hash Join (cost=382571.01..1957960.99 rows=71838 width=27) (actual time=7036.917..7499.692 rows=62519 loops=5) Hash Cond: (lineitem.l_orderkey = orders.o_orderkey) -> Parallel Seq Scan on lineitem (cost=0.00..1552386.40 rows=71838 width=19) (actual time=0.583..4901.063 rows=62519 loops=5) Filter: ((l_shipmode = ANY ('{MAIL,AIR}'::bpchar[])) AND (l_commitdate < l_receiptdate) AND (l_shipdate < l_commitdate) AND (l_receiptdate >= '1996-01-01'::date) AND (l_receiptdate < '1997-01-01 00:00:00'::timestamp without time zone)) Rows Removed by Filter: 11934691 -> Parallel Hash (cost=313722.45..313722.45 rows=3750045 width=20) (actual time=2011.518..2011.518 rows=3000000 loops=5) Buckets: 65536 Batches: 256 Memory Usage: 3840kB -> Parallel Seq Scan on orders (cost=0.00..313722.45 rows=3750045 width=20) (actual time=0.029..995.948 rows=3000000 loops=5) Planning Time: 0.977 ms Execution Time: 7923.770 ms
這裡每個worker幫助構建一個共享的hash表。
合併連線(Merge Join)
鑑於合併連線的自身特性,使其不支援並行查詢。如果合併連線是查詢執行的最後一個階段-你仍可以看到並行執行。
-- Query 2 from TPC-H explain (costs off) select s_acctbal, s_name, n_name, p_partkey, p_mfgr, s_address, s_phone, s_comment from part, supplier, partsupp, nation, region where p_partkey = ps_partkey and s_suppkey = ps_suppkey and p_size = 36 and p_type like '%BRASS' and s_nationkey = n_nationkey and n_regionkey = r_regionkey and r_name = 'AMERICA' and ps_supplycost = ( select min(ps_supplycost) from partsupp, supplier, nation, region where p_partkey = ps_partkey and s_suppkey = ps_suppkey and s_nationkey = n_nationkey and n_regionkey = r_regionkey and r_name = 'AMERICA' ) order by s_acctbal desc, n_name, s_name, p_partkey LIMIT 100; QUERY PLAN ---------------------------------------------------------------------------------------------------------- Limit -> Sort Sort Key: supplier.s_acctbal DESC, nation.n_name, supplier.s_name, part.p_partkey -> Merge Join Merge Cond: (part.p_partkey = partsupp.ps_partkey) Join Filter: (partsupp.ps_supplycost = (SubPlan 1)) -> Gather Merge Workers Planned: 4 -> Parallel Index Scan using <strong>part_pkey</strong> on part Filter: (((p_type)::text ~~ '%BRASS'::text) AND (p_size = 36)) -> Materialize -> Sort Sort Key: partsupp.ps_partkey -> Nested Loop -> Nested Loop Join Filter: (nation.n_regionkey = region.r_regionkey) -> Seq Scan on region Filter: (r_name = 'AMERICA'::bpchar) -> Hash Join Hash Cond: (supplier.s_nationkey = nation.n_nationkey) -> Seq Scan on supplier -> Hash -> Seq Scan on nation -> Index Scan using idx_partsupp_suppkey on partsupp Index Cond: (ps_suppkey = supplier.s_suppkey) SubPlan 1 -> Aggregate -> Nested Loop Join Filter: (nation_1.n_regionkey = region_1.r_regionkey) -> Seq Scan on region region_1 Filter: (r_name = 'AMERICA'::bpchar) -> Nested Loop -> Nested Loop -> Index Scan using idx_partsupp_partkey on partsupp partsupp_1 Index Cond: (part.p_partkey = ps_partkey) -> Index Scan using supplier_pkey on supplier supplier_1 Index Cond: (s_suppkey = partsupp_1.ps_suppkey) -> Index Scan using nation_pkey on nation nation_1 Index Cond: (n_nationkey = supplier_1.s_nationkey)
“Merge Join”節點在“Gather Merge”上方。 因此,合併不使用並行執行。但是“Parallel Index Scan”節點仍然有助於part_pkey。
分割槽智慧連線(Partition-wise join)
PostgreSQL 11預設禁用分割槽智慧連線(partition-wise join)功能。分割槽智慧聯接的計劃成本很高。分割槽相似的表的聯接可以逐分割槽進行。這允許postgres使用較小的雜湊表。每個分割槽聯接操作都可以並行執行。
tpch=# set enable_partitionwise_join=t; tpch=# explain (costs off) select * from prt1 t1, prt2 t2 where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000; QUERY PLAN --------------------------------------------------- Append -> Hash Join Hash Cond: (t2.b = t1.a) -> Seq Scan on prt2_p1 t2 Filter: ((b >= 0) AND (b <= 10000)) -> Hash -> Seq Scan on prt1_p1 t1 Filter: (b = 0) -> Hash Join Hash Cond: (t2_1.b = t1_1.a) -> Seq Scan on prt2_p2 t2_1 Filter: ((b >= 0) AND (b <= 10000)) -> Hash -> Seq Scan on prt1_p2 t1_1 Filter: (b = 0) tpch=# set parallel_setup_cost = 1; tpch=# set parallel_tuple_cost = 0.01; tpch=# explain (costs off) select * from prt1 t1, prt2 t2 where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000; QUERY PLAN ----------------------------------------------------------- Gather Workers Planned: 4 -> Parallel Append -> Parallel Hash Join Hash Cond: (t2_1.b = t1_1.a) -> Parallel Seq Scan on prt2_p2 t2_1 Filter: ((b >= 0) AND (b <= 10000)) -> Parallel Hash -> Parallel Seq Scan on prt1_p2 t1_1 Filter: (b = 0) -> Parallel Hash Join Hash Cond: (t2.b = t1.a) -> Parallel Seq Scan on prt2_p1 t2 Filter: ((b >= 0) AND (b <= 10000)) -> Parallel Hash -> Parallel Seq Scan on prt1_p1 t1 Filter: (b = 0)
最重要的是,僅在分割槽足夠大的情況下,分割槽智慧聯接才能使用並行執行。
通常可以在UNION ALL查詢中看到這一點。缺點是並行性差,因為每個work最終都是為單個查詢工作。
即使啟用了四個worker,也只有兩個被啟動。
tpch=# explain (costs off) select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day union all select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '2000-12-01' - interval '105' day; QUERY PLAN ------------------------------------------------------------------------------------------------ Gather Workers Planned: 2 -> Parallel Append -> Aggregate -> Seq Scan on lineitem Filter: (l_shipdate <= '2000-08-18 00:00:00'::timestamp without time zone) -> Aggregate -> Seq Scan on lineitem lineitem_1 Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
重要的變數
·work_mem
·max_parallel_workers_per_gather:執行器為每個並行執行的計劃節點分配的worker數量
·max_worker_processes
·max_parallel_workers
在9.6版本中,並行查詢執行被引入;
在10版本中,預設開啟並行執行;
在負載重的oltp系統上,建議關閉並行執行。