I have two tables with a hundred thousand rows, each having an id on which I can join them, a random value, and a flag that is true for 10% of rows.
drop table if exists t1, t2 cascade;
create table t1 ( id bigint primary key , value float, flag boolean);
create table t2 ( id bigint primary key , value float, flag boolean);
create index t1_value on t1(value asc, flag, id);
insert into t1
select n, random(), n%10=1 from generate_series(1,100000) n;
insert into t2
select n, random(), n%10=2 from generate_series(1,100000) n;
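Before looking at execution plans, it can help to confirm that the data matches the description. The query below is a small sanity check, not part of the original test case: it counts the rows, the flagged rows, and the rows with value above 0.9 in each table. With the inserts above, each table should report 100000 rows and 10000 flagged rows, while the last column varies from run to run because it depends on random().

```sql
-- Sanity check (illustrative): row count, flagged rows, and rows above the 0.9 threshold
select 't1' as table_name,
       count(*)                              as total_rows,
       count(*) filter (where flag)          as flagged_rows,
       count(*) filter (where value > 0.9)   as value_above_threshold
from t1
union all
select 't2',
       count(*),
       count(*) filter (where flag),
       count(*) filter (where value > 0.9)
from t2;
```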
Nested Loop
I executed a query that selects 10% of rows from table t1, joins them with table t2, and further filters the result to keep only the rows that have the flag set in at least one of the two tables:
yugabyte=# explain ( costs off, analyze, buffers, dist)
select * from t1 join t2 using(id)
where t1.value>0.9
and ( t1.flag or t2.flag)
;
QUERY PLAN
-------------------------------------------------------------------------------------------
Nested Loop (actual time=3.681..4713.204 rows=1946 loops=1)
-> Index Only Scan using t1_value on t1 (actual time=1.657..18.561 rows=10072 loops=1)
Index Cond: (value > '0.9'::double precision)
Heap Fetches: 0
Storage Index Read Requests: 10
Storage Index Read Execution Time: 1.546 ms
-> Index Scan using t2_pkey on t2 (actual time=0.451..0.451 rows=0 loops=10072)
Index Cond: (id = t1.id)
Filter: (t1.flag OR flag)
Rows Removed by Filter: 1
Storage Table Read Requests: 1
Storage Table Read Execution Time: 0.408 ms
Planning Time: 11.876 ms
Execution Time: 4714.614 ms
Storage Read Requests: 10082
Storage Read Execution Time: 4115.495 ms
Time: 4745.950 ms (00:04.746)
This has read only 10% of the rows from the outer table, thanks to the index on the outer table, and no more than that from the inner table, thanks to the Nested Loop. However, the response time is high, nearly five seconds, because this Nested Loop accesses the inner table once for each row from the outer table: loops=10072. This results in about ten thousand read requests, which takes too long in a Distributed SQL database.
I want my query to be scalable when the tables grow, and a Nested Loop is a good solution to avoid a full scan on the inner table.
Batched Nested Loop
YugabyteDB optimizes this with Batched Nested Loops. I can enable it by setting a batch size, but here the plan and the response time stay the same:
yugabyte=# explain ( costs off, analyze, buffers, dist)
/*+ Set(yb_bnl_batch_size 1024) */
select * from t1 join t2 using(id)
where t1.value>0.9
and ( t1.flag or t2.flag)
;
QUERY PLAN
-------------------------------------------------------------------------------------------
Nested Loop (actual time=3.387..4725.424 rows=1946 loops=1)
-> Index Only Scan using t1_value on t1 (actual time=2.132..19.240 rows=10072 loops=1)
Index Cond: (value > '0.9'::double precision)
Heap Fetches: 0
Storage Index Read Requests: 10
Storage Index Read Execution Time: 1.721 ms
-> Index Scan using t2_pkey on t2 (actual time=0.452..0.452 rows=0 loops=10072)
Index Cond: (id = t1.id)
Filter: (t1.flag OR flag)
Rows Removed by Filter: 1
Storage Table Read Requests: 1
Storage Table Read Execution Time: 0.410 ms
Planning Time: 0.209 ms
Execution Time: 4726.803 ms
Storage Read Requests: 10082
Storage Read Execution Time: 4130.211 ms
Time: 4740.531 ms (00:04.741)
Batched Nested Loop was not possible with this query: the plan still shows Nested Loop rather than YB Batched Nested Loop Join.
When this happens, look at the inner join condition. In this case, there's a filter (t1.flag OR t2.flag) that has been pushed down to the inner scan, but it cannot be applied in batch because it requires more columns from the outer table than the ones used for the join condition.
You may then have to rewrite the query so that this filter is not pushed down, or is split in two so that each part can be pushed down.
Materialized WITH clause (CTE)
A materialized WITH clause (also called Common Table Expression) can prevent this push down while keeping the filter expression:
yugabyte=# explain ( costs off, analyze, buffers, dist)
/*+ Set(yb_bnl_batch_size 1024) */
with bnl as (
select
id, t1.value t1_value, t2.value t2_value
,t1.flag t1_flag, t2.flag t2_flag
from t1 join t2 using(id)
where t1.value>0.9
) select * from bnl
where ( t1_flag or t2_flag)
;
QUERY PLAN
--------------------------------------------------------------------------------------------------
CTE Scan on bnl (actual time=6.585..69.199 rows=1946 loops=1)
Filter: (t1_flag OR t2_flag)
Rows Removed by Filter: 8126
CTE bnl
-> YB Batched Nested Loop Join (actual time=6.580..64.171 rows=10072 loops=1)
Join Filter: (t1.id = t2.id)
-> Index Only Scan using t1_value on t1 (actual time=1.484..5.268 rows=10072 loops=1)
Index Cond: (value > '0.9'::double precision)
Heap Fetches: 0
Storage Index Read Requests: 10
Storage Index Read Execution Time: 1.370 ms
-> Index Scan using t2_pkey on t2 (actual time=4.058..4.422 rows=1007 loops=10)
Index Cond: (id = ANY (ARRAY[t1.id, $1, $2, ..., $1023]))
Storage Table Read Requests: 1
Storage Table Read Execution Time: 3.219 ms
Planning Time: 0.745 ms
Execution Time: 70.210 ms
Storage Read Requests: 20
Storage Read Execution Time: 33.558 ms
Time: 84.881 ms
Here, only the join condition was pushed down, and batched, to retrieve the 10% of rows in ten loops (rows=1007 loops=10), with a total of ten Read Requests for each table. The filter on the flags was then applied to this result to reduce it to rows=1946.
OR expansion to UNION ALL
The previous query rewrite optimized the join, but about 80% of the joined rows are still fetched from the storage and then discarded by a single process applying the Filter, which is visible in Rows Removed by Filter (8126 out of 10072). I can rewrite the query to push down each filter into a separate subquery, and concatenate the results with a UNION ALL instead of an OR:
yugabyte=# explain ( costs off, analyze, buffers, dist)
/*+ Set(yb_bnl_batch_size 1024) */
select * from t1 join t2 using(id)
where t1.value>0.9 and t1.flag -----------
union all -- |
select * from t1 join t2 using(id) -- v
where t1.value>0.9 and t2.flag and (not t1.flag or t1.flag is null)
;
QUERY PLAN
----------------------------------------------------------------------------------------------------
Append (actual time=12.861..62.439 rows=1946 loops=1)
-> YB Batched Nested Loop Join (actual time=12.859..13.872 rows=972 loops=1)
Join Filter: (t1.id = t2.id)
-> Index Only Scan using t1_value on t1 (actual time=7.961..8.277 rows=972 loops=1)
Index Cond: (value > '0.9'::double precision)
Remote Filter: flag
Heap Fetches: 0
Storage Index Read Requests: 1
Storage Index Read Execution Time: 7.805 ms
-> Index Scan using t2_pkey on t2 (actual time=4.041..4.386 rows=972 loops=1)
Index Cond: (id = ANY (ARRAY[t1.id, $1, $2, ..., $1023]))
Storage Table Read Requests: 1
Storage Table Read Execution Time: 3.092 ms
-> YB Batched Nested Loop Join (actual time=6.770..48.138 rows=974 loops=1)
Join Filter: (t1_1.id = t2_1.id)
-> Index Only Scan using t1_value on t1 t1_1 (actual time=1.679..5.014 rows=9100 loops=1)
Index Cond: (value > '0.9'::double precision)
Remote Filter: ((NOT flag) OR (flag IS NULL))
Heap Fetches: 0
Storage Index Read Requests: 9
Storage Index Read Execution Time: 1.535 ms
-> Index Scan using t2_pkey on t2 t2_1 (actual time=3.934..3.977 rows=108 loops=9)
Index Cond: (id = ANY (ARRAY[t1_1.id, $1025, $1026, ..., $2047]))
Remote Filter: flag
Storage Table Read Requests: 1
Storage Table Read Execution Time: 3.123 ms
Planning Time: 1.301 ms
Execution Time: 63.488 ms
Storage Read Requests: 20
Storage Read Execution Time: 40.539 ms
Time: 77.983 ms
Now the additional filter on the flags was pushed down to each index scan. The first branch of the UNION ALL applies one of them (t1.flag is true) and the second branch the other (t2.flag is true). To avoid duplicate rows, the second branch adds the negation of the first one (t1.flag is not true) and, if this flag can be null, one of the branches must add those rows (t1.flag is null) to the result.
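The null handling in the second branch can be checked in isolation. The sketch below evaluates the predicate used in the query, (not flag or flag is null), next to the standard SQL shorthand flag is not true, over the three possible boolean values. Both columns agree on every row: false for true, and true for false and for null. Whether the shorthand is pushed down as a Remote Filter in the same way as the expanded form is an assumption I have not verified here, so compare the execution plans before swapping one for the other.

```sql
-- Compare the expanded predicate with IS NOT TRUE on true, false, and null
select flag,
       (not flag or flag is null) as expanded_form,
       (flag is not true)         as shorthand_form
from (values (true), (false), (null::boolean)) as v(flag);
```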
In addition to the join key, the presence of the flag in the index can further enhance the performance of the query. Since the index was created on (value ASC, flag ASC, id ASC), it can do an Index Only Scan, or at least filter all rows before going to the table. The other branch scans the primary key, which is equivalent to an Index Only Scan in YugabyteDB, even if it is not directly visible in the execution plan.
Which one to choose?
The decision to use either the WITH or UNION solution depends on several factors such as the selectivity of the predicate, the availability of indexes, and the complexity of the filtering expression. It is worth noting that the cost estimation of each solution is not a trivial task, which makes it difficult for a database optimizer to perform this transformation automatically.
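Whichever rewrite is chosen, it must return the same rows as the original query. As a minimal sketch of such a check (the CTE names and column aliases are mine, chosen for illustration), the query below compares the original OR form with the UNION ALL form using EXCEPT ALL in both directions. Both counters should be zero if the rewrite is equivalent on the current data.

```sql
-- Equivalence check (illustrative): original OR query vs. UNION ALL rewrite
with original as (
  select id, t1.value as t1_value, t2.value as t2_value,
         t1.flag as t1_flag, t2.flag as t2_flag
  from t1 join t2 using(id)
  where t1.value > 0.9 and (t1.flag or t2.flag)
), rewritten as (
  select id, t1.value as t1_value, t2.value as t2_value,
         t1.flag as t1_flag, t2.flag as t2_flag
  from t1 join t2 using(id)
  where t1.value > 0.9 and t1.flag
  union all
  select id, t1.value, t2.value, t1.flag, t2.flag
  from t1 join t2 using(id)
  where t1.value > 0.9 and t2.flag and (not t1.flag or t1.flag is null)
)
select
  -- rows returned by the original query but not by the rewrite
  (select count(*) from (select * from original
                         except all
                         select * from rewritten) m) as missing_rows,
  -- rows returned by the rewrite but not by the original query
  (select count(*) from (select * from rewritten
                         except all
                         select * from original) e) as extra_rows;
```

EXCEPT ALL rather than EXCEPT keeps duplicates significant, so a rewrite that returned a row twice (for example, if the negation in the second branch were forgotten) would show up in extra_rows.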