This example demonstrates how to reduce resource consumption and improve SQL performance by modifying the sharding keys.
Example SQL
create table test(col int, id int, name text)
distribute by hash(col);
create table test_1(col int, id int, name varchar(64))
distribute by hash(name);
insert into test select 1, generate_series(1, 100000), md5(random()::text);
insert into test select 64, generate_series(1, 100000), md5(random()::text);
insert into test_1 select generate_series(1, 100000), generate_series(1, 100000), md5(random()::text);
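Before tuning, it helps to confirm each table's distribution key. In gsql/psql, \d+ shows the distribution column; alternatively, the pgxc_class system catalog can be queried directly. The query below is a sketch based on the openGauss/Postgres-XC catalog layout (column names and types vary across versions, so verify against your release):

```sql
-- Show the distribution type of both tables.
-- pclocatortype 'H' indicates hash distribution; pcattnum identifies
-- the distribution column(s) (catalog layout differs across versions).
select c.relname, x.pclocatortype, x.pcattnum
from pgxc_class x
join pg_class c on c.oid = x.pcrelid
where c.relname in ('test', 'test_1');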
Execution Plan
explain analyze select * from test a join test_1 b on a.col=b.id ;
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------
Streaming(type: GATHER) (cost=13.29..29.25 rows=10 width=194) (actual time=106.033..1529.207 rows=200000 loops=1)
Spawn on: All datanodes
-> Hash Join (cost=13.29..28.64 rows=20 width=194) (Actual time: never executed)
Hash Cond: (a.col = b.id)
-> Streaming(type: BROADCAST) (cost=0.00..15.18 rows=40 width=40) (Actual time: never executed)
Spawn on: All datanodes
-> Seq Scan on test a (cost=0.00..13.13 rows=20 width=40) (Actual time: never executed)
-> Hash (cost=13.13..13.13 rows=21 width=154) (Actual time: never executed)
Buckets: 0 Batches: 0 Memory Usage: 0kB
-> Seq Scan on test_1 b (cost=0.00..13.13 rows=20 width=154) (Actual time: never executed)
Total runtime: 1562.160 ms
Since the sharding key of the test table is the col field and the sharding key of the test_1 table is the name field, while the join condition is a.col = b.id, the planner broadcasts the test table to all datanodes, so that each datanode holds a full copy of it. Each datanode then performs the join on a.col = b.id, and after all datanodes complete the join, the results are returned to the upper-level coordinator node (CN) through streaming (GATHER). This process involves data interaction between datanodes, causing additional network overhead, which can be optimized.
Optimization Point: To eliminate the overhead caused by data interaction and network communication between datanodes, change the sharding key of the test_1 table to the id field so that it matches the join condition.
create table test(col int, id int, name text)
distribute by hash (col);
create table test_1(col int, id int, name varchar(64))
distribute by hash(id);
insert into test select 1, generate_series(1, 100000), md5(random()::text);
insert into test select 64, generate_series(1, 100000), md5(random()::text);
insert into test_1 select generate_series(1, 100000), generate_series(1, 100000), md5(random()::text);
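Dropping and recreating the table is one way to change the sharding key. Depending on the GaussDB/openGauss version, an in-place redistribution may also be available; the statement below is a sketch and assumes ALTER TABLE ... DISTRIBUTE BY is supported in your release. Because it physically moves the data, run it in a maintenance window for large tables:

```sql
-- Hypothetical in-place change of the sharding key; the supported
-- syntax varies by version -- verify before using in production.
alter table test_1 distribute by hash(id);
```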
Optimized Execution Plan
postgres=# explain analyze select * from test a join test_1 b on a.col=b.id;
QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------
Data Node Scan (cost=0.00..0.00 rows=1000 width=194) (actual time=36.912..894.167 rows=200000 loops=1)
Node/s: All datanodes
Remote query: SELECT a.col, a.id, a.name, b.col, b.id, b.name FROM public.test a JOIN public.test_1 b ON a.col = b.id
-> Hash Join (cost=805.59..238675.59 rows=20775000 width=195)
Hash Cond: (a.col = b.id)
-> Seq Scan on test a (cost=0.00..3870.00 rows=200000 width=41)
-> Hash (cost=675.75..675.75 rows=20775 width=154)
-> Seq Scan on test_1 b (cost=0.00..675.75 rows=20775 width=154)
Total runtime: 955.638 ms
(9 rows)
The execution time has improved from 1562.160 ms to 955.638 ms. The execution plan shows that the overhead caused by data interactions and network communication between datanodes has been eliminated. Each datanode performs the join query only within its own node, and the results are then returned to the upper-level coordinator node.
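When choosing a sharding key, also check for data skew: in this example the col column of the test table holds only two distinct values (1 and 64), so hash distribution on col places its rows on at most two datanodes. A quick check is sketched below, assuming the table_distribution()/table_skewness() helpers available in some GaussDB versions (function names and signatures are assumptions; adapt to your release):

```sql
-- Row counts per datanode, and a skew summary (helper function names
-- are assumptions based on GaussDB(DWS); verify in your version).
select * from table_distribution('public', 'test');
select * from table_skewness('test');
```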
Conclusion
This example demonstrates how to utilize the join condition and adjust the sharding keys of the tables to eliminate the overhead caused by data interactions and network communication between datanodes. This optimization results in a more efficient execution plan.