In this article, we experiment with triggers as a tool for keeping aggregated data consistent when using Active Record and your favorite SQL database. Instead of using sophisticated tools such as ElasticSearch for filtering and searching, we will demonstrate a simple approach that achieves the same result with some out-of-the-box database features. As a bonus, learn how to avoid nasty race conditions!
You can find all the examples in the gist.
Sometimes you need to sort and filter records in the database by some aggregated values. For instance, you might be building a paginated list of users in an admin panel, and you want to implement filtering by the number of orders and the total amount that users have spent on them. Tools like ElasticSearch are good at filtering by aggregates, but setting up a massive search engine and all the required infrastructure to process a couple of columns sounds like overkill. Let's find a more straightforward way!
Trigger finger
Imagine the following data model:
ActiveRecord::Schema.define do
create_table "orders", force: :cascade do |t|
t.bigint "user_id", null: false
t.decimal "amount"
t.datetime "created_at", precision: 6, null: false
t.datetime "updated_at", precision: 6, null: false
t.index ["user_id"], name: "index_orders_on_user_id"
end
create_table "users", force: :cascade do |t|
t.datetime "created_at", precision: 6, null: false
t.datetime "updated_at", precision: 6, null: false
end
add_foreign_key "orders", "users"
end
class User < ActiveRecord::Base
has_many :orders
end
class Order < ActiveRecord::Base
belongs_to :user
end
Let's see how we can filter and paginate users by their order total. We can easily achieve our goal with a plain SQL statement, but we will immediately run into performance issues. To demonstrate, let's fill the database with 10,000 users and 100,000 orders and run EXPLAIN ANALYZE (you can find a single-file implementation of this example in this gist):
User.insert_all(10_000.times.map { { created_at: Time.now, updated_at: Time.now } })
Order.insert_all(
100_000.times.map do
{
user_id: rand(1...1000),
amount: rand(1000) / 10.0,
created_at: Time.now,
updated_at: Time.now
}
end
)
ActiveRecord::Base.connection.execute <<~SQL
EXPLAIN ANALYZE SELECT users.id, SUM(orders.amount), COUNT(orders.id)
FROM users JOIN orders ON orders.user_id = users.id
GROUP BY users.id
HAVING SUM(orders.amount) > 100 AND COUNT(orders.id) > 1
ORDER BY SUM(orders.amount)
LIMIT 50
SQL
This is the result you might see:
Limit (cost=3206.16..3206.29 rows=50 width=48) (actual time=59.737..59.746 rows=50 loops=1)
-> Sort (cost=3206.16..3208.95 rows=1116 width=48) (actual time=59.736..59.739 rows=50 loops=1)
Sort Key: (sum(orders.amount))
Sort Method: top-N heapsort Memory: 31kB
-> HashAggregate (cost=2968.13..3169.09 rows=1116 width=48) (actual time=59.103..59.452 rows=1000 loops=1)
Group Key: users.id
Filter: ((sum(orders.amount) > '100'::numeric) AND (count(orders.id) > 1))
-> Hash Join (cost=290.08..2050.73 rows=73392 width=48) (actual time=2.793..37.022 rows=100000 loops=1)
Hash Cond: (orders.user_id = users.id)
-> Seq Scan on orders (cost=0.00..1567.92 rows=73392 width=48) (actual time=0.011..11.650 rows=100000 loops=1)
-> Hash (cost=164.48..164.48 rows=10048 width=8) (actual time=2.760..2.760 rows=10000 loops=1)
Buckets: 16384 Batches: 1 Memory Usage: 519kB
-> Seq Scan on users (cost=0.00..164.48 rows=10048 width=8) (actual time=0.006..1.220 rows=10000 loops=1)
Planning Time: 0.237 ms
Execution Time: 64.151 ms
With a bigger database, it will take even more time! We need a solution that scales. Let's denormalize our database and store orders_amount and orders_count in a separate user_stats table:
class CreateUserStats < ActiveRecord::Migration[6.0]
def change
create_table :user_stats do |t|
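# users.id is a bigint, so use references: it adds the user_id column,
# the foreign key, and the unique index that the UPSERT relies on
t.references :user, null: false, foreign_key: true, index: { unique: true }
t.decimal :orders_amount
t.integer :orders_count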
end
end
end
Now we should decide how to keep orders_count and orders_amount in sync. Active Record callbacks are not the right place for this, because we want the stats updated even when data is changed with plain SQL (e.g., in a migration). There is a built-in counter_cache option for the belongs_to association, but it cannot help us with orders_amount. Triggers to the rescue!
A trigger is a function that is automatically invoked when INSERT, UPDATE, or DELETE is performed on the table.
To work with triggers from our Rails app, we can use gems like hair_trigger or fx, or even write them by hand. In this example, we use hair_trigger, which can generate migrations for trigger updates using only the latest version of the SQL procedure.
Heads up! There is a known hair_trigger issue with Rails 6 and Zeitwerk. If you run into it, feel free to use my fork for now. Don't forget to switch back when the fix is out!
Let's add our trigger to the Order model. We want to perform an UPSERT: if there is no row with the matching user_id in the user_stats table, we add a new row; otherwise, we update the existing one (make sure there is a unique constraint on the user_id column):
class Order < ActiveRecord::Base
belongs_to :user
trigger.after(:insert) do
<<~SQL
INSERT INTO user_stats (user_id, orders_amount, orders_count)
SELECT
NEW.user_id as user_id,
SUM(orders.amount) as orders_amount,
COUNT(orders.id) as orders_count
FROM orders WHERE orders.user_id = NEW.user_id
ON CONFLICT (user_id) DO UPDATE
SET
orders_amount = EXCLUDED.orders_amount,
orders_count = EXCLUDED.orders_count;
SQL
end
end
Now we should generate the migration with rake db:generate_trigger_migration, run migrations with rails db:migrate, and run the application.
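If you would rather write the trigger by hand (one of the options mentioned earlier), a migration could look roughly like the sketch below. The function and trigger name `orders_after_insert_row_tr` and the migration class name are made up for illustration; hair_trigger chooses its own names, and the migration it generates may differ in detail.

```ruby
# Hand-written equivalent of the trigger above (all names are hypothetical).
CREATE_ORDERS_TRIGGER_SQL = <<~SQL
  CREATE OR REPLACE FUNCTION orders_after_insert_row_tr() RETURNS trigger AS $$
  BEGIN
    INSERT INTO user_stats (user_id, orders_amount, orders_count)
    SELECT
      NEW.user_id AS user_id,
      SUM(orders.amount) AS orders_amount,
      COUNT(orders.id) AS orders_count
    FROM orders WHERE orders.user_id = NEW.user_id
    ON CONFLICT (user_id) DO UPDATE
    SET
      orders_amount = EXCLUDED.orders_amount,
      orders_count = EXCLUDED.orders_count;
    RETURN NULL; -- the return value of an AFTER ROW trigger is ignored
  END;
  $$ LANGUAGE plpgsql;

  CREATE TRIGGER orders_after_insert_row_tr
  AFTER INSERT ON orders
  FOR EACH ROW EXECUTE PROCEDURE orders_after_insert_row_tr();
SQL

DROP_ORDERS_TRIGGER_SQL = <<~SQL
  DROP TRIGGER IF EXISTS orders_after_insert_row_tr ON orders;
  DROP FUNCTION IF EXISTS orders_after_insert_row_tr();
SQL

# Define the migration only when Active Record is loaded (i.e. inside the app).
if defined?(ActiveRecord::Migration)
  class CreateOrdersAfterInsertTrigger < ActiveRecord::Migration[6.0]
    def up
      execute CREATE_ORDERS_TRIGGER_SQL
    end

    def down
      execute DROP_ORDERS_TRIGGER_SQL
    end
  end
end
```

The trade-off is that every later change to the trigger body needs a new hand-written migration, which is the bookkeeping hair_trigger takes care of.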
Off to the races
It might seem to be working, but what if we try to insert multiple orders in parallel? (You can run the following code as a rake task or check my implementation here.)
user = User.create
threads = []
4.times do
threads << Thread.new(user.id) do |user_id|
user = User.find(user_id)
user.orders.create(amount: rand(1000) / 10.0)
end
end
threads.each(&:join)
inconsistent_stats = UserStat.joins(user: :orders)
.where(user_id: user.id)
.having("user_stats.orders_amount <> SUM(orders.amount)")
.group("user_stats.id")
if inconsistent_stats.any?
calculated_amount = UserStat.find_by(user: user).orders_amount
real_amount = Order.where(user: user).sum(:amount).to_f
puts
puts "Race condition detected:"
puts "calculated amount: #{calculated_amount}"
puts "real amount: #{real_amount}."
else
puts
puts "Data is consistent."
end
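There is a good chance that you will see a race condition, but why? The trigger runs inside the transaction that inserts the order, and the default isolation level is READ COMMITTED. At this level, a statement sees only the rows committed before it started (plus the changes made by its own transaction), so two concurrent triggers each calculate the totals without the other's uncommitted order, and the last one to write overwrites user_stats with a stale value.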
The SQL standard defines four transaction isolation levels: READ UNCOMMITTED, READ COMMITTED, REPEATABLE READ, and SERIALIZABLE. PostgreSQL accepts all four, but treats READ UNCOMMITTED as READ COMMITTED.
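To make the lost update concrete, here is a small deterministic model of two transactions that insert an order for the same user at the same time. It is plain Ruby, not PostgreSQL: `FakeTransaction`, `OrderRow`, and `simulate_lost_update` are hypothetical names, and the model only mimics the visibility rule "committed rows plus my own uncommitted rows".

```ruby
# In-memory model of READ COMMITTED visibility (an illustration, not a database).
OrderRow = Struct.new(:user_id, :amount)

class FakeTransaction
  def initialize(committed_orders)
    @committed_orders = committed_orders
    @pending = []
  end

  # INSERT INTO orders: the row stays invisible to other transactions until commit.
  def insert_order(user_id, amount)
    @pending << OrderRow.new(user_id, amount)
  end

  # What the trigger's SELECT sees: committed rows plus this transaction's own rows.
  def recalculate_stats(user_id)
    visible = (@committed_orders + @pending).select { |order| order.user_id == user_id }
    { orders_amount: visible.sum(&:amount), orders_count: visible.size }
  end

  def commit
    @committed_orders.concat(@pending)
    @pending = []
  end
end

def simulate_lost_update
  committed_orders = []
  user_stats = {}
  t1 = FakeTransaction.new(committed_orders)
  t2 = FakeTransaction.new(committed_orders)

  # Both transactions insert an order before either one commits.
  t1.insert_order(1, 10)
  t2.insert_order(1, 25)

  # Each trigger computes the totals from its own view of the orders table.
  stats_from_t1 = t1.recalculate_stats(1)
  stats_from_t2 = t2.recalculate_stats(1)

  # The upserts are applied one after another; the second overwrites the first.
  user_stats[1] = stats_from_t1
  t1.commit
  user_stats[1] = stats_from_t2
  t2.commit

  { cached: user_stats[1], real_amount: committed_orders.sum(&:amount), real_count: committed_orders.size }
end

result = simulate_lost_update
puts "cached: #{result[:cached].inspect}"
puts "real amount: #{result[:real_amount]}, real count: #{result[:real_count]}"
```

The cached row ends up describing one order worth 25 while the table holds two orders worth 35 in total, which is the kind of mismatch the script above detects.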
The obvious solution is to use the stricter SERIALIZABLE isolation level, but, unfortunately, an isolation level cannot be changed inside a running transaction. Creating a new explicit transaction every time we work with orders does not sound right either, so let's try another approach for making sure our triggers are always executed in sequence: advisory locks.
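For comparison, here is a sketch of what the explicit-transaction route could look like, assuming Rails 5.1 or newer. `with_retries` and `create_order_serializable` are hypothetical helpers; `transaction(isolation: :serializable)` and `ActiveRecord::SerializationFailure` are Active Record's own. Serializable transactions can fail under contention and are expected to be retried, which is why the helper exists at all.

```ruby
# Hypothetical helper: re-runs the block when it fails with one of the given errors.
def with_retries(retry_on:, attempts: 3)
  tries = 0
  begin
    tries += 1
    yield tries
  rescue *retry_on
    retry if tries < attempts
    raise
  end
end

# Defined only when Active Record is loaded (i.e. inside the app).
if defined?(ActiveRecord::Base)
  def create_order_serializable(user, amount)
    with_retries(retry_on: [ActiveRecord::SerializationFailure]) do
      # Raises ActiveRecord::TransactionIsolationError if a transaction is already open.
      ActiveRecord::Base.transaction(isolation: :serializable) do
        user.orders.create!(amount: amount)
      end
    end
  end
end
```

Every code path that writes orders would have to go through such a wrapper, and it cannot be nested inside an existing transaction, which is what makes this option unattractive here.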
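The only change we need is to add a lock call, PERFORM pg_advisory_xact_lock(NEW.user_id), at the beginning of our procedure code. The lock is held until the end of the transaction, so triggers for the same user run one after another: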
class Order < ActiveRecord::Base
belongs_to :user
trigger.after(:insert) do
<<~SQL
PERFORM pg_advisory_xact_lock(NEW.user_id);
INSERT INTO user_stats (user_id, orders_amount, orders_count)
SELECT
NEW.user_id as user_id,
SUM(orders.amount) as orders_amount,
COUNT(orders.id) as orders_count
FROM orders WHERE orders.user_id = NEW.user_id
ON CONFLICT (user_id) DO UPDATE
SET
orders_amount = EXCLUDED.orders_amount,
orders_count = EXCLUDED.orders_count;
SQL
end
end
The updated version of the code is here. If you run it, you'll see that the race condition is gone and the app can handle parallel requests. Now let's add an index on the orders_amount column of the user_stats table, change the query, and compare the performance. It's way faster: execution time drops from about 64 ms to about 11 ms.
EXPLAIN ANALYZE SELECT user_id, orders_amount, orders_count
FROM user_stats
WHERE orders_amount > 100 AND orders_count > 1
ORDER BY orders_amount
LIMIT 50
Limit (cost=0.29..22.99 rows=50 width=40) (actual time=0.059..11.241 rows=50 loops=1)
-> Index Scan using index_user_stats_on_orders_amount on user_stats (cost=0.29..3438.69 rows=7573 width=40) (actual time=0.058..11.2 rows=50 loops=1)
Index Cond: (orders_amount > '100'::numeric)
Filter: (orders_count > 1)
Planning Time: 0.105 ms
Execution Time: 11.272 ms
Lock-free alternative
There is a way (suggested by Sergey Ponomarev) to achieve the same result without locks and make it work faster: use deltas (you can find the full implementation here):
class Order < ActiveRecord::Base
belongs_to :user
trigger.after(:insert) do
<<~SQL
INSERT INTO user_stats (user_id, orders_amount, orders_count)
SELECT
NEW.user_id as user_id,
NEW.amount as orders_amount,
1 as orders_count
ON CONFLICT (user_id) DO UPDATE
SET
orders_amount = user_stats.orders_amount + EXCLUDED.orders_amount,
orders_count = user_stats.orders_count + EXCLUDED.orders_count;
SQL
end
end
The trick here is that the trigger no longer reads the orders table at all: each insert only adds its own amount to the current user_stats row, and the database applies concurrent updates to the same row one at a time, so no increment is lost. As a bonus, you'll get better performance when inserting new records. This approach comes in handy for simple cases like the one described in this article, but with more complex logic you might want to resort to locks (imagine that orders have statuses, that we need to cache the number of orders in each status, and that orders can be updated).
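One way to see why deltas are safe is that additions commute: whatever order the inserts are applied in, the totals come out the same. The sketch below models this in plain Ruby with a hash standing in for the user_stats table; `apply_insert_delta` and `totals_after` are hypothetical names, and `Rational` is used only to keep the arithmetic exact.

```ruby
# In-memory stand-in for user_stats: user_id => running totals.
def apply_insert_delta(user_stats, user_id, amount)
  row = (user_stats[user_id] ||= { orders_amount: Rational(0), orders_count: 0 })
  # Mirrors "orders_amount = user_stats.orders_amount + EXCLUDED.orders_amount".
  row[:orders_amount] += amount
  # Mirrors "orders_count = user_stats.orders_count + EXCLUDED.orders_count".
  row[:orders_count] += 1
  row
end

# Applies one delta per order amount (given as decimal strings) for a single user.
def totals_after(amounts)
  stats = {}
  amounts.each { |amount| apply_insert_delta(stats, 1, Rational(amount)) }
  stats[1]
end

amounts = %w[10.5 20.0 3.2 99.9]
in_order = totals_after(amounts)
shuffled = totals_after(amounts.shuffle)

puts "same totals in any order: #{in_order == shuffled}"
puts "orders_amount: #{in_order[:orders_amount].to_f}, orders_count: #{in_order[:orders_count]}"
```

This only demonstrates order independence. In the database, the guarantee that two increments never interleave on the same row comes from the row lock taken by the UPDATE part of the UPSERT.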
Loop instead of UPSERT
In the previous examples, we used UPSERT (INSERT ... ON CONFLICT), which was introduced in PostgreSQL 9.5, but what if we are on an older version? Let's review how the trigger works again: it tries to insert a new row into the user_stats table and, if a conflict happens, it updates the existing row. In a real-world application there will be conflicts most of the time (to be precise, the insert succeeds only once for each user). We can use this fact and rewrite our trigger in the following way (the example of a loop inside the trigger is here):
class Order < ActiveRecord::Base
belongs_to :user
trigger.after(:insert) do
<<~SQL
<<insert_update>>
LOOP
UPDATE user_stats
SET orders_count = orders_count + 1,
orders_amount = orders_amount + NEW.amount
WHERE user_id = NEW.user_id;
EXIT insert_update WHEN FOUND;
BEGIN
INSERT INTO user_stats (
user_id, orders_amount, orders_count
) VALUES (
NEW.user_id, NEW.amount, 1
);
EXIT insert_update;
EXCEPTION
WHEN UNIQUE_VIOLATION THEN
-- do nothing
END;
END LOOP insert_update;
SQL
end
end
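In this case, we have inverted our logic: the trigger first tries to update the existing row and, if there is none, inserts a new one. If a concurrent transaction inserts the row first, the unique violation is caught and the loop retries the update.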
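The control flow of that loop is easier to follow outside PL/pgSQL. Below is a plain Ruby model of it, with a hash standing in for the user_stats table; `FakeUserStats`, `UniqueViolation`, and `record_order` are hypothetical names used only for this illustration.

```ruby
class UniqueViolation < StandardError; end

# In-memory stand-in for the user_stats table with a unique user_id.
class FakeUserStats
  def initialize
    @rows = {}
  end

  # UPDATE ... WHERE user_id = ?; returns the number of affected rows.
  def update(user_id, amount)
    row = @rows[user_id]
    return 0 unless row

    row[:orders_amount] += amount
    row[:orders_count] += 1
    1
  end

  # INSERT; raises when the unique index on user_id would be violated.
  def insert(user_id, amount)
    raise UniqueViolation if @rows.key?(user_id)

    @rows[user_id] = { orders_amount: amount, orders_count: 1 }
  end

  def [](user_id)
    @rows[user_id]
  end
end

def record_order(stats, user_id, amount)
  loop do
    break if stats.update(user_id, amount) > 0 # EXIT insert_update WHEN FOUND

    begin
      stats.insert(user_id, amount)
      break # EXIT insert_update
    rescue UniqueViolation
      # Another writer inserted the row first: go around again and UPDATE it.
    end
  end
  stats[user_id]
end

stats = FakeUserStats.new
record_order(stats, 1, 10) # no row yet: UPDATE misses, INSERT succeeds
record_order(stats, 1, 5)  # row exists: UPDATE succeeds on the first try
puts stats[1].inspect
```

The first call takes the insert branch and every later call for the same user takes the update branch, which matches the observation that the insert happens only once per user.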
Working with aggregated data is hard: when you have a lot of counter (and other kinds of) caches, it makes sense to use a special tool for that. However, for simple cases, we can stay with good old database triggers: when configured properly, they are quite performant!