How to Efficiently Monitor PostgreSQL with Supabase: Real-Time Data, Event Triggers, and Minimizing Overhead
Hello Developers,
Have you ever wondered how to continuously monitor your database to perform actions within your application? Whether it's displaying real-time data to users or automatically triggering events based on data changes (like notifications), you might be curious about the best approach to achieve this. Is polling the database constantly for updates the best solution? This method can be a major headache and lead to unnecessary overhead on both your application and database (I've seen companies struggle with this and end up reworking their systems after a while!).
The key is to minimize I/O: every poll costs a network round trip and a query even when nothing has changed, and that cost grows with the number of clients and the polling frequency.
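To get a feel for that overhead, here is a back-of-the-envelope sketch. The helper name, the client count, and the polling interval are made-up values for illustration, not measurements from a real system.

```javascript
// Hypothetical helper: how many queries does naive polling issue per day?
function pollingQueriesPerDay(clients, intervalSeconds) {
  const secondsPerDay = 24 * 60 * 60; // 86,400 seconds
  return clients * Math.floor(secondsPerDay / intervalSeconds);
}

// Assumed scenario: 500 clients, each polling every 5 seconds
console.log(pollingQueriesPerDay(500, 5)); // 8640000
```

Most of those queries would return nothing new, which is exactly the work a push-based approach avoids.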
There are simple solutions available that require minimal setup and provide almost everything you need out-of-the-box. One excellent example is Firebase Realtime Database, which abstracts many complexities by automatically syncing data changes to your client. It's highly recommended, especially for mobile applications. However, if you're already using a different database and migrating to Firebase isn't feasible, or if you prefer to stick with PostgreSQL, there are alternative solutions available.
Today, I'll demonstrate how to monitor your PostgreSQL database using a fascinating open-source platform called Supabase, which is often considered an open-source alternative to Firebase. Supabase offers a variety of useful modules, including Storage, Realtime, Authentication/Authorization, Functions, Auto-generated APIs, and more. For this example, we'll use the Realtime module.
Before diving into the code, it's important to understand the underlying concepts and mechanics of Supabase Realtime. It is built with Elixir using the Phoenix framework and uses PostgreSQL logical replication to listen for changes in your database and broadcast them to connected clients.
What is Logical Replication and WAL?
In essence, logical replication is a method for replicating data, including full datasets or changes, to another location. It typically involves a publish-subscribe model where the publisher gathers changes from one or more tables (publication) and the subscriber listens for and collects these changes.
In the context of databases, logical replication refers to replicating data between two or more databases. It captures row-level changes (INSERT, UPDATE, DELETE) at the database level and sends them to the subscribers.
This is built on Write-Ahead Logging (WAL): every change is first appended to a durable log and only then applied to the data files. WAL is what lets PostgreSQL guarantee atomicity and durability after a crash, and with logical decoding the same log can be read as a stream of change events.
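The "log first, apply second, stream to subscribers" idea can be sketched with a toy in-memory model. This is only an illustration of the concept, not how PostgreSQL implements WAL or logical decoding; the class name and the entry shape (`lsn`, `old_record`, `record`) are assumptions chosen to resemble the events we will see later.

```javascript
// Toy model of a change log with subscribers (illustration only).
class ToyChangeLog {
  constructor() {
    this.entries = [];      // the append-only log
    this.table = new Map(); // stands in for the data files
    this.subscribers = [];
  }

  subscribe(callback) {
    this.subscribers.push(callback);
  }

  // Append the change to the log first, then apply it, then notify subscribers.
  write(type, id, record) {
    const oldRecord = this.table.get(id) ?? null;
    const entry = { lsn: this.entries.length + 1, type, old_record: oldRecord, record };
    this.entries.push(entry);
    if (type === 'DELETE') {
      this.table.delete(id);
    } else {
      this.table.set(id, record);
    }
    this.subscribers.forEach((notify) => notify(entry));
    return entry;
  }
}

const log = new ToyChangeLog();
const received = [];
log.subscribe((entry) => received.push(entry));

log.write('INSERT', 1, { id: 1, name: 'Reboque', status_id: 1 });
log.write('UPDATE', 1, { id: 1, name: 'Reboque', status_id: 2 });

console.log(received[1].old_record.status_id, '->', received[1].record.status_id); // 1 -> 2
```

The subscriber never queries the table; it only reacts to entries as they are written, which is the property we want from the real system.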
Implementation of Supabase With Postgres
Now, let’s focus on implementing a small project to test Supabase and monitor our database in real-time. We’ll create an application that lists all services according to their status (pending, in progress, and completed) without repeatedly querying the database for updates. Our goal is to make the application real-time, updating instantly with any changes. This approach can significantly reduce the load on your database in larger projects.
In essence, we will use Supabase Realtime to listen for all modifications in our PostgreSQL database and transmit these updates to our NodeJS application. This application will handle the modified data and push updates to clients via WebSocket.
I will outline the main components of the project and provide an explanation of each. At the end, I will share a link to the repository where you can view the complete project code.
Project Structure: An Overview
This will be our project structure. Below are details of the main components:
- /docker-compose.yml: This file contains the Docker Compose configuration, defining two containers. One container runs Supabase Realtime, responsible for monitoring changes in the database. The other container hosts our PostgreSQL database.
version: '3'
services:
  realtime:
    image: supabase/realtime:v1.0.0
    ports:
      - "4000:4000"
    environment:
      # The Compose service name resolves on the shared network,
      # regardless of how the container itself ends up being named
      DB_HOST: db
      DB_NAME: postgres
      DB_USER: postgres
      DB_PASSWORD: postgres
      DB_PORT: 5432
      PORT: 4000
      # JWT_SECRET: SOMETHING_SUPER_SECRET
      SECURE_CHANNELS: 'false'
    networks:
      - realtime-network
    depends_on:
      - db
  db:
    build: database/.
    restart: always
    environment:
      POSTGRES_PASSWORD: postgres
    ports:
      - "5432:5432"
    networks:
      - realtime-network
networks:
  realtime-network:
    driver: bridge
- /database: This directory contains the Docker configurations for our database as well as the settings required for real-time monitoring.
Here's the structure of our database:
- /database/Dockerfile: This Dockerfile is invoked by the docker-compose.yml and uses a PostgreSQL image. It runs a script located at /database/scripts/00-init.sql to set up the database tables and apply the configuration that lets Supabase monitor changes in the database.
- /database/scripts/00-init.sql: This SQL script contains the code to create the database tables and configure the database so that Supabase can effectively listen for changes.
Script for creating tables and inserting data:
CREATE TABLE public.status (
  id bigint GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
  name VARCHAR(255)
);

INSERT INTO
  public.status (name)
VALUES
  ('PENDING'),
  ('IN_PROGRESS'),
  ('CONCLUDED');

CREATE TABLE public.service (
  id bigint GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
  name VARCHAR(255) NOT NULL,
  status_id bigint REFERENCES public.status NOT NULL
);

INSERT INTO
  public.service (name, status_id)
VALUES
  ('Reboque', 1),
  ('Bateria', 3),
  ('Pane Seca', 1),
  ('Troca de Luz', 2);
Configuring PostgreSQL for Supabase Realtime: Setting Up WAL for Change Capture
Supabase uses Write-Ahead Logging (WAL) to capture modifications in our database. Therefore, we need to configure certain settings in PostgreSQL to enable Supabase Realtime to work effectively.
-- 'logical' adds the information necessary to support logical decoding.
-- Each level includes the information logged at all lower levels.
ALTER SYSTEM SET wal_level='logical';

-- Specifies the maximum number of concurrent connections from standby servers or streaming base backup clients
-- (i.e., the maximum number of simultaneously running WAL sender processes).
-- 10 is already the default value.
-- RECOMMENDATION: if you are replicating, set this to the maximum number of standby servers you might possibly have.
-- There is a performance impact when set above zero, but no additional penalty for setting it higher.
ALTER SYSTEM SET max_wal_senders='10';

-- Specifies the maximum number of replication slots (see streaming-replication-slots) that the server can support.
-- 10 is already the default value.
ALTER SYSTEM SET max_replication_slots='10';

-- Create the replication publication
CREATE PUBLICATION supabase_realtime FOR ALL TABLES;

-- Include the previous values of a row in UPDATE and DELETE events
ALTER TABLE public.service REPLICA IDENTITY FULL;
ALTER TABLE public.status REPLICA IDENTITY FULL;
All these settings are required, and some are specific to this project: the REPLICA IDENTITY FULL statements, for instance, name our two tables so that UPDATE and DELETE events carry the previous row values. Keep in mind that wal_level, max_wal_senders, and max_replication_slots only take effect after a server restart. In the final project, these settings and the table creation script run automatically when the Docker containers start.
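If Realtime does not seem to receive events, it can help to confirm that the settings actually took effect. A minimal sketch, assuming you have already read the current values (for example with `SHOW wal_level;` in psql) into a plain object. The function name and the input shape are hypothetical, not part of Supabase or pg.

```javascript
// Hypothetical helper: report settings that would prevent logical replication.
// `settings` holds the values returned by SHOW wal_level, SHOW max_wal_senders
// and SHOW max_replication_slots (PostgreSQL returns them as strings).
function findReplicationProblems(settings) {
  const problems = [];
  if (settings.wal_level !== 'logical') {
    problems.push(`wal_level is "${settings.wal_level}", expected "logical"`);
  }
  if (Number(settings.max_wal_senders) < 1) {
    problems.push('max_wal_senders must be at least 1');
  }
  if (Number(settings.max_replication_slots) < 1) {
    problems.push('max_replication_slots must be at least 1');
  }
  return problems;
}

const problems = findReplicationProblems({
  wal_level: 'replica', // what you would see if the server was not restarted
  max_wal_senders: '10',
  max_replication_slots: '10',
});
console.log(problems);
```

An empty array means the three settings look fine; anything else points at the value to fix before debugging the application code.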
- /server: This directory contains the code for our project, which is responsible for receiving updates identified by Supabase Realtime and sending them to the client via WebSocket.
Inside this project, everything is organized according to its specific responsibilities.
- /server/services/: This directory will include all the services used in our project.
- Database: Responsible for executing queries on our database. This service is used primarily at project startup to fetch all initial data. Afterward, the application relies on change events instead of querying again.
const pg = require('pg');

class Database {
  pgPool;
  pgClient;

  constructor() {
    // Connection settings match the db container published by docker-compose
    this.pgPool = new pg.Pool({
      host: 'localhost',
      user: 'postgres',
      password: 'postgres',
      database: 'postgres',
      port: 5432,
    });
  }

  // Check out a client from the pool
  async connect() {
    try {
      this.pgClient = await this.pgPool.connect();
    } catch (error) {
      console.log('Connection error', error);
    }
  }

  // Run a parameterized query on the checked-out client
  async query({ text, params }) {
    const res = await this.pgClient.query(text, params);
    return res;
  }

  // Return the client to the pool
  disconnect() {
    this.pgClient.release();
  }

  // Close every connection in the pool
  async disconnectPool() {
    await this.pgPool.end();
  }
}

module.exports = Database;
- DatabaseListener: Responsible for listening to changes identified by Supabase in our database. We will use the @supabase/realtime-js library to facilitate this.
const { RealtimeClient } = require('@supabase/realtime-js'); // "@supabase/realtime-js": "^1.3.3",

class DatabaseListener {
  // Connection to Supabase Realtime, which runs in Docker on port 4000
  socket = new RealtimeClient(process.env.REALTIME_URL || 'ws://localhost:4000/socket');

  constructor() {
    // When the DatabaseListener starts, connect and log the connection state
    this.socket.connect();
    this.socket.onOpen(() => console.log('Socket opened.'));
    this.socket.onClose(() => console.log('Socket closed.'));
    this.socket.onError((e) => console.log('Socket error', e.message));
  }

  // Generic method to listen to database events on a channel
  _on(eventName, callback, channel = '*') {
    const databaseChanges = this.socket.channel(`realtime:${channel}`);
    databaseChanges.on(eventName, (e) => {
      console.log('EVENT', e);
      callback(e);
    });
    databaseChanges.subscribe();
  }

  // Listen to every modification (insert, update, delete) on a channel
  onAll(callback, channel = '*') {
    this._on('*', callback, channel);
  }

  // Listen to update events on a channel
  onUpdate(callback, channel = '*') {
    this._on('UPDATE', callback, channel);
  }

  // Listen to insert events on a channel
  onInsert(callback, channel = '*') {
    this._on('INSERT', callback, channel);
  }

  // Listen to delete events on a channel
  onDelete(callback, channel = '*') {
    this._on('DELETE', callback, channel);
  }
}

module.exports = DatabaseListener;
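The `channel` argument narrows what we listen to: everything (`*`), a schema, a table, or rows matching a column value, as in the `public:service:status_id=eq.1` filter used later in the controller. As a small sketch, a helper like the one below could build those strings. `buildChannel` and its option names are hypothetical and not part of @supabase/realtime-js.

```javascript
// Hypothetical helper: build the channel string passed to DatabaseListener.
// The listener itself prepends "realtime:" before subscribing.
function buildChannel({ schema, table, column, value } = {}) {
  if (!schema) return '*';                               // all changes
  if (!table) return schema;                             // one schema
  if (column === undefined) return `${schema}:${table}`; // one table
  return `${schema}:${table}:${column}=eq.${value}`;     // rows matching a value
}

console.log(buildChannel());                                     // *
console.log(buildChannel({ schema: 'public' }));                 // public
console.log(buildChannel({ schema: 'public', table: 'service' })); // public:service
console.log(buildChannel({ schema: 'public', table: 'service', column: 'status_id', value: 1 }));
// public:service:status_id=eq.1
```

Narrower channels mean fewer events reach the Node.js process, which keeps the callbacks simple.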
- Socket: Responsible for managing the WebSocket connection with our client (frontend in our case) to transmit database modifications.
const socket = require('socket.io'); // "socket.io": "^4.4.0"

class Socket {
  io = null;

  constructor(server) {
    // Start the socket server on top of the HTTP server
    this.io = socket(server);
  }

  // Return the underlying socket.io server
  getIo() {
    return this.io;
  }

  // Run the callback for every client that connects
  onConnection(callback) {
    this.io.on('connection', (clientSocket) => {
      // Log inside the handler: the client id only exists once a client has connected
      console.log(`Connected socket ${clientSocket.id}`);
      callback(clientSocket);
    });
  }

  // Emit an event with some data to all connected clients
  sendToAllSubscribers(eventName, data) {
    this.io.emit(eventName, data);
  }
}

module.exports = Socket;
- /server/models/: This directory contains all the models for interacting with our database tables. Each model is organized by table, and since we are only interacting with the 'Services' table, we will have just this model.
- Service: This file will establish a connection with our database to fetch all services and their statuses. It will be called only at the start of the application to retrieve all initial data. After that, the application relies solely on listening for changes, thus avoiding repeated database queries.
const Database = require('../../services/Database');

class Service {
  // This class cannot be instantiated; only its static methods are used
  constructor() {
    throw new Error('It is not instantiable');
  }

  // Fetch all services from the database. Called once, when the server starts
  static async findAll() {
    let servicesFound = [];
    const database = new Database();
    try {
      await database.connect();
      const result = await database.query({
        text: 'SELECT * FROM public.service',
        params: []
      });
      servicesFound = result?.rows || [];
    } catch (error) {
      console.log(error);
    } finally {
      // Release the client (if one was obtained) and close the pool,
      // since no further queries are made after startup
      if (database.pgClient) database.disconnect();
      await database.disconnectPool();
    }
    return servicesFound;
  }
}

module.exports = Service;
- /server/controllers: This directory will contain the business logic required to make the project work. It will use the models and services from the project. In our case, there will be only one controller.
- ServiceController: This controller will fetch all 'Services' from our database and send them to the client via WebSocket. It will also listen for any changes in the database and, if any are detected, will send these updates to the clients via WebSocket.
const DatabaseListener = require('../../services/DatabaseListener');
const Service = require('../../models/Service');

// Maps each status_id to the list that holds the services with that status
const helperToServiceMapStatus = {
  1: 'servicesPendding',
  2: 'servicesInProgress',
  3: 'servicesConcluded'
};

class ServiceController {
  // Instance of our database listener
  static databaseChanges = new DatabaseListener();

  // Services grouped by status. They are kept in memory and sent to our clients via socket.
  // Warning: every service is held in memory, so watch the size of these lists.
  // In production it may be better to keep them in a store such as Redis.
  static servicesDto = {
    servicesPendding: [],
    servicesInProgress: [],
    servicesConcluded: []
  };

  // This class cannot be instantiated; only its static methods are used
  constructor() {
    throw new Error('It is not instantiable');
  }

  // Fetch all services from the database, group them by status and send them to our clients via socket
  static async fetchServicesFromDatabaseAndSend(socket) {
    const servicesFound = await Service.findAll();
    servicesFound.forEach((service) => {
      const serviceKey = helperToServiceMapStatus[service.status_id];
      ServiceController.servicesDto[serviceKey].push(service);
    });
    socket.onConnection((_socket) => {
      console.log('Socket connection', _socket.id);
      socket.sendToAllSubscribers('receivedMessage', ServiceController.servicesDto);
    });
  }

  // Listen to every status update on the 'service' table and send the new state to our clients via socket
  static listenAndSendServicesStatus(socket) {
    // One subscription per status: updates on public.service filtered by status_id 1, 2 and 3
    Object.keys(helperToServiceMapStatus).forEach((statusId) => {
      ServiceController.databaseChanges.onUpdate((e) => {
        ServiceController._managementState(e.old_record, e.record);
        socket.sendToAllSubscribers('receivedMessage', ServiceController.servicesDto);
      }, `public:service:status_id=eq.${statusId}`);
    });
  }

  // Move a service from the list of its old status to the list of its new status
  static _managementState(oldRecord, newRecord) {
    const oldService = helperToServiceMapStatus[oldRecord.status_id];
    const newService = helperToServiceMapStatus[newRecord.status_id];
    // Loose comparison on purpose: ids may arrive as strings or as numbers
    ServiceController.servicesDto[oldService] = ServiceController.servicesDto[oldService].filter(_service => (_service.id != oldRecord.id));
    ServiceController.servicesDto[newService].push(newRecord);
  }
}

module.exports = ServiceController;
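To make the state handling in `_managementState` easier to follow, here is the same idea as a standalone function that returns a new state instead of mutating a static field. This is a sketch for illustration, not the project's code: `moveService` and `statusToBucket` are hypothetical names, and the records are sample data shaped like the `old_record` and `record` fields of an update event.

```javascript
// Same mapping as in the controller: status_id -> list name
const statusToBucket = {
  1: 'servicesPendding',
  2: 'servicesInProgress',
  3: 'servicesConcluded',
};

// Return a new state with the service moved from its old list to its new one.
function moveService(state, oldRecord, newRecord) {
  const from = statusToBucket[oldRecord.status_id];
  const to = statusToBucket[newRecord.status_id];
  const next = { ...state };
  // Compare ids as strings, since they may arrive as strings or numbers
  next[from] = next[from].filter((s) => String(s.id) !== String(oldRecord.id));
  next[to] = [...next[to], newRecord];
  return next;
}

const before = {
  servicesPendding: [{ id: 1, name: 'Reboque', status_id: 1 }],
  servicesInProgress: [],
  servicesConcluded: [],
};

const after = moveService(
  before,
  { id: 1, name: 'Reboque', status_id: 1 },
  { id: 1, name: 'Reboque', status_id: 2 }
);

console.log(after.servicesPendding.length, after.servicesInProgress.length); // 0 1
```

Because the service is removed from its old list before being appended, an update that leaves the status unchanged does not create a duplicate entry.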
- /server/src/server.js: This is where we configure our server to serve the frontend using Express and also start our backend project.
const express = require('express'); //"express": "^4.17.1",
const path = require('path');

const app = express();
const server = require('http').createServer(app);

const Socket = require('../services/Socket');
const ServiceController = require('../controllers/ServiceController');

// Serve the frontend from the /public directory
app.use(express.static(path.join(__dirname, '../../public')));
app.set('views', path.join(__dirname, '../../public'));
app.engine('html', require('ejs').renderFile);
app.set('view engine', 'html');

app.use('/public', (req, res) => {
  res.render('index.html');
});

(async () => {
  // Start our socket server
  const socket = new Socket(server);
  // Fetch all data from the database and send it to our clients via socket
  await ServiceController.fetchServicesFromDatabaseAndSend(socket);
  // Listen to database changes and send them to our clients via socket
  ServiceController.listenAndSendServicesStatus(socket);
})();

server.listen(8181);
- /public/index.html: This is our frontend project, featuring a simple layout that lists the services from our database according to their status. It will establish a WebSocket connection with our backend to receive status updates for the services.
<!DOCTYPE html>
<html lang="en">
<head>
  <meta charset="UTF-8">
  <meta name="viewport" content="width=device-width, initial-scale=1.0">
  <title>Services</title>
</head>
<body>
  <h2>Pending Services</h2>
  <ul id="pendding"></ul>
  <br><br>
  <h2>In Progress Services</h2>
  <ul id="inProgress"></ul>
  <br><br>
  <h2>Concluded Services</h2>
  <ul id="concluded"></ul>
  <br><br>
  <script type="module">
    import { io } from "https://cdn.socket.io/4.4.0/socket.io.esm.min.js";

    const socket = io('http://localhost:8181');
    const htmlPendding = document.getElementById('pendding');
    const htmlInProgress = document.getElementById('inProgress');
    const htmlConcluded = document.getElementById('concluded');

    socket.emit('sendMessage', {
      text: 'working'
    });

    // Replace the content of a list with one item per service
    const renderList = (listElement, services) => {
      listElement.innerHTML = '';
      services.forEach((service) => {
        const item = document.createElement('li');
        item.textContent = service.name;
        listElement.appendChild(item);
      });
    };

    // Listen to our socket to receive the data
    socket.on('receivedMessage', (data) => {
      // Put the received services in their specific group
      const { servicesPendding, servicesInProgress, servicesConcluded } = data;
      renderList(htmlPendding, servicesPendding);
      renderList(htmlInProgress, servicesInProgress);
      renderList(htmlConcluded, servicesConcluded);
      window.scrollTo(0, document.body.scrollHeight);
    });
  </script>
</body>
</html>
- /package.json:
{
  "name": "realtime-pg",
  "version": "1.0.0",
  "main": "index.js",
  "license": "MIT",
  "scripts": {
    "start": "forever -w --minUptime 2000 --spinSleepTime 2000 server/src/server.js",
    "start-node": "node server/src/server.js"
  },
  "dependencies": {
    "@supabase/realtime-js": "^1.3.3",
    "ejs": "^3.1.6",
    "express": "^4.17.1",
    "forever": "^4.0.1",
    "pg": "^8.7.1",
    "socket.io": "^4.4.0"
  }
}
How to Run the Project:
- Ensure that Docker and Docker Compose are installed on your machine.
- Navigate to the root directory of the project and run the following command: docker-compose up --build. This will build the Docker images and start the containers, including Supabase Realtime and PostgreSQL.
- Install the dependencies and start the project by running: npm install && npm run start-node.
- Modify the status_id column in the public.service table to see change logs with both the old and new values in the terminal where the server from the previous step is running. To view the web page with automatic HTML updates, visit: http://localhost:8181.
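For the last step, any SQL client will do. If you prefer to trigger the change from Node.js, a minimal sketch is to build a query object in the same { text, params } shape that the Database service's query method accepts. `buildStatusUpdate` is a hypothetical helper, and the ids below assume the sample data inserted by the init script.

```javascript
// Hypothetical helper: build a parameterized UPDATE for a service's status.
function buildStatusUpdate(serviceId, statusId) {
  return {
    text: 'UPDATE public.service SET status_id = $2 WHERE id = $1',
    params: [serviceId, statusId],
  };
}

// Move service 1 ('Reboque' in the sample data) to status 2 (in progress)
const update = buildStatusUpdate(1, 2);
console.log(update.text, update.params);
```

Passing `update` to `database.query(...)` after `database.connect()` should produce an UPDATE event in the server terminal and move the item to the "In Progress" list on the page.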
Github:
This project offers an intriguing approach to real-time application development, allowing you to trigger various events based on changes in your PostgreSQL database. For instance, you can set up push notifications to alert clients whenever specific changes occur in the database. By creating a dedicated project for real-time actions, you can keep your API or service focused, clean, and straightforward.
I hope this content has been helpful. If you know of other solutions or have experiences to share, please let us know! Let's exchange ideas and learn together. ;)
For any questions, feel free to reach out to me on LinkedIn or Instagram:
There, I share insights about the daily life of a Software Engineer/Entrepreneur.
Thank you for your time, and happy coding! 😊
Best regards,
Vitor Braggion