GBase 8c SQL Performance Optimization with Sharding Keys

Cong Li - Jun 27 - - Dev Community

This example demonstrates how to reduce resource consumption and improve SQL performance by modifying the sharding keys.

Example SQL

create table test(col int, id int, name text)
distribute by hash (col);

create table test_1(col int, id int, name varchar(64))
distribute by hash(name);

insert into test select 1, generate_series(1, 100000), md5(random()::text);
insert into test select 64, generate_series(1, 100000), md5(random()::text);

insert into test_1 select generate_series(1, 100000), generate_series(1, 100000), md5(random()::text);
Enter fullscreen mode Exit fullscreen mode

Image description

Execution Plan

explain analyze select * from test a join test_1 b on a.col=b.id ; 
                                                     QUERY PLAN                                                     
--------------------------------------------------------------------------------------------------------------------
 Streaming(type: GATHER)  (cost=13.29..29.25 rows=10 width=194) (actual time=106.033..1529.207 rows=200000 loops=1)
   Spawn on: All datanodes
   ->  Hash Join  (cost=13.29..28.64 rows=20 width=194) (Actual time: never executed)
         Hash Cond: (a.col = b.id)
         ->  Streaming(type: BROADCAST)  (cost=0.00..15.18 rows=40 width=40) (Actual time: never executed)
               Spawn on: All datanodes
               ->  Seq Scan on test a  (cost=0.00..13.13 rows=20 width=40) (Actual time: never executed)
         ->  Hash  (cost=13.13..13.13 rows=21 width=154) (Actual time: never executed)
                Buckets: 0  Batches: 0  Memory Usage: 0kB
               ->  Seq Scan on test_1 b  (cost=0.00..13.13 rows=20 width=154) (Actual time: never executed)
 Total runtime: 1562.160 ms
Enter fullscreen mode Exit fullscreen mode

Since the sharding key for the test table is the col field and the sharding key for the test_1 table is name, the join condition is a.col = b.id. The execution plan involves broadcasting the test table to all datanodes, resulting in each datanode having a copy of the test table. Each datanode then performs the join based on the condition a.col = b.id. After all datanodes complete the join, the results are returned to the upper-level coordinator node (CN) through streaming (GATHER). This process involves data interaction between datanodes, causing additional network overhead, which can be optimized.

Optimization Point: To eliminate the overhead caused by data interactions and network communication between datanodes, set the sharding key of the test_1 table to the id field based on the join condition.

create table test(col int, id int, name text)
distribute by hash (col);

create table test_1(col int, id int, name varchar(64))
distribute by hash(id);

insert into test select 1, generate_series(1, 100000), md5(random()::text);
insert into test select 64, generate_series(1, 100000), md5(random()::text);

insert into test_1 select generate_series(1, 100000), generate_series(1, 100000), md5(random()::text);
Enter fullscreen mode Exit fullscreen mode

Image description

Optimized Execution Plan

postgres=# explain analyze select * from test a join test_1 b on a.col=b.id; 
                                                      QUERY PLAN                                                        
-------------------------------------------------------------------------------------------------------------------------
Data Node Scan  (cost=0.00..0.00 rows=1000 width=194) (actual time=36.912..894.167 rows=200000 loops=1)
  Node/s: All datanodes
  Remote query: SELECT a.col, a.id, a.name, b.col, b.id, b.name FROM public.test a JOIN public.test_1 b ON a.col = b.id
  ->  Hash Join  (cost=805.59..238675.59 rows=20775000 width=195)
        Hash Cond: (a.col = b.id)
        ->  Seq Scan on test a  (cost=0.00..3870.00 rows=200000 width=41)
        ->  Hash  (cost=675.75..675.75 rows=20775 width=154)
              ->  Seq Scan on test_1 b  (cost=0.00..675.75 rows=20775 width=154)
Total runtime: 955.638 ms
(9 rows)
Enter fullscreen mode Exit fullscreen mode

The execution time has improved from 1562.160 ms to 955.638 ms. The execution plan shows that the overhead caused by data interactions and network communication between datanodes has been eliminated. Each datanode performs the join query only within its own node, and the results are then returned to the upper-level coordinator node.

Conclusion

This example demonstrates how to utilize the join condition and adjust the sharding keys of the tables to eliminate the overhead caused by data interactions and network communication between datanodes. This optimization results in a more efficient execution plan.

. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
Terabox Video Player