How to solve dual write problem in NestJS?

Kamil Fronczak - Sep 15 - - Dev Community

There are only two hard things in Computer Science: cache invalidation and naming things.
-- Phil Karlton

Well, let's add another one to the list: consistency. In distributed systems or modular monoliths, maintaining data consistency across multiple services or modules can be quite a challenge.

Let's imagine we have an e-commerce application with two services: OrderService and PaymentService. The OrderService is responsible for confirming orders, while the PaymentService handles the payment processing. Here's a simplified example in NestJS:

// order.service.ts
import { Injectable } from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';
@Injectable()
export class OrderService {
  constructor(
    private readonly eventEmitter: EventEmitter2,
    private readonly orderRepository: OrderRepository,
  ) {}

  async confirmOrder(orderId: string): Promise<void> {
    const order = await this.orderRepository.findOne(orderId);
    order.status = 'CONFIRMED';
    await this.orderRepository.save(order);
    this.eventEmitter.emit('order.confirmed', { orderId });
  }
}

// payment.service.ts
import { Injectable } from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { Payment } from './payment.model'; // assuming the entity lives here
@Injectable()
export class PaymentService {
  constructor(
    @InjectRepository(Payment)
    private readonly paymentRepository: Repository<Payment>,
    private readonly eventEmitter: EventEmitter2,
  ) {
    this.eventEmitter.on('order.confirmed', this.handleOrderConfirmed.bind(this));
  }

  // Listener for order.confirmed
  async handleOrderConfirmed(event: { orderId: string }): Promise<void> {
      // Start payment process
      const payment = new Payment();
      payment.orderId = event.orderId;
      payment.status = 'PROCESSING';
      await this.paymentRepository.save(payment);
      // Further payment logic...
  }
} 

In this scenario, multiple things could go wrong, leading to an inconsistent system:

  1. Order saved, but event not emitted: The OrderService confirms an order, but due to a crash or code issue, the event is never emitted.

  2. Event emitted, but listener does not catch it: The OrderService emits the event, but the PaymentService fails to catch it because of a crash or a network issue.

  3. Listener catches the event, but errors occur: The PaymentService catches the event but throws an unexpected, unhandled error during payment processing.

These issues can lead to a broken process flow and an inconsistent system state, which may require manual debugging and a fair amount of pulled-out hair.

This is where the dual write problem appears.

But, fortunately, where there is a problem, there are solutions. Let's take a look at the possible options:

Two-Phase Commit (2PC)

https://docs.redhat.com/en/documentation/red_hat_jboss_enterprise_application_platform/5/html/transactions_development_guide/ch17s05

The Two-Phase Commit protocol ensures that all participating services agree to either commit or roll back a transaction, maintaining consistency across the system. The process is divided into two phases: the prepare phase and the commit phase.

  1. Prepare Phase: The coordinator asks all participants if they can commit the transaction. Each participant performs the necessary operations and responds with a vote (yes or no).

  2. Commit Phase: If all participants vote yes, the coordinator instructs them to commit the transaction. If any participant votes no, the coordinator instructs them to roll back.
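The two phases can be sketched as follows; `Participant` and `runTwoPhaseCommit` are illustrative names for this sketch, not a real library API:

```typescript
// Hypothetical 2PC coordinator sketch.
interface Participant {
  prepare(): Promise<boolean>; // phase 1: vote yes (true) or no (false)
  commit(): Promise<void>;     // phase 2a
  rollback(): Promise<void>;   // phase 2b
}

async function runTwoPhaseCommit(participants: Participant[]): Promise<boolean> {
  // Prepare phase: collect a vote from every participant.
  const votes = await Promise.all(participants.map((p) => p.prepare()));

  if (votes.every((vote) => vote)) {
    // Commit phase: everyone voted yes, so commit everywhere.
    await Promise.all(participants.map((p) => p.commit()));
    return true;
  }

  // At least one "no": instruct everyone to roll back.
  await Promise.all(participants.map((p) => p.rollback()));
  return false;
}
```

Note how the coordinator sits in the middle of every transaction: this is exactly what makes it both a bottleneck and a single point of failure.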

Cons of Two-Phase Commit (2PC):

  1. Complexity: Implementing 2PC requires significant changes to your system and additional coordination logic.
  2. Performance Overhead: The synchronization between services can introduce latency, especially in high-throughput systems.
  3. Single Point of Failure: The transaction coordinator can become a single point of failure, affecting the entire system's reliability.
  4. Scalability Issues: As the number of participants increases, the protocol can become less efficient and harder to manage.

Debezium

https://debezium.io/documentation/reference/stable/architecture.html

Debezium is a distributed platform that captures row-level changes in your databases so that your applications can see and respond to those changes. It provides change data capture (CDC) functionality, which can be used to ensure consistency.

By integrating Debezium with Kafka, you can ensure that any change to your database is captured and processed reliably, providing an alternative approach to solving the dual write problem.

Here’s a brief overview of how Debezium can be integrated:

  1. Setup Debezium: Configure Debezium connectors to monitor your database tables.
  2. Capture Changes: Debezium captures row-level changes and publishes them to Kafka topics.
  3. Process Events: Consume these Kafka topics in your application to process events consistently.
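On the consuming side, each Kafka message carries Debezium's standard change-event envelope: a `payload` with an `op` code plus `before`/`after` row images. Here is a minimal sketch of interpreting it; the `Change` type and function names are my own, not part of Debezium:

```typescript
// Shape of Debezium's change-event payload (simplified).
interface DebeziumEnvelope<T> {
  payload: {
    op: 'c' | 'u' | 'd' | 'r'; // create, update, delete, snapshot read
    before: T | null;          // row image before the change
    after: T | null;           // row image after the change
  };
}

type Change<T> =
  | { kind: 'created'; row: T }
  | { kind: 'updated'; row: T }
  | { kind: 'deleted'; row: T }
  | { kind: 'snapshot'; row: T };

function interpretChange<T>(event: DebeziumEnvelope<T>): Change<T> {
  const { op, before, after } = event.payload;
  switch (op) {
    case 'c':
      return { kind: 'created', row: after as T };
    case 'u':
      return { kind: 'updated', row: after as T };
    case 'd':
      return { kind: 'deleted', row: before as T }; // only "before" exists
    case 'r':
      return { kind: 'snapshot', row: after as T };
    default:
      throw new Error(`unknown op: ${op}`);
  }
}
```

A consumer (e.g. built with kafkajs) would call a function like this for every message on the connector's topic and dispatch to the appropriate handler.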

Cons of Debezium:

  1. Operational Complexity: Setting up and managing Debezium, Kafka, and the necessary connectors can be complex and requires additional infrastructure.

  2. Latency: While Debezium can capture changes in near real-time, there might still be slight delays between the database update and the event being processed.

  3. Resource Consumption: Running Debezium and Kafka can be resource-intensive, impacting overall system performance.

  4. Error Handling: Handling errors and ensuring reliable event processing requires careful design and monitoring to avoid data loss or inconsistencies.

Inbox and Outbox patterns


The Outbox and Inbox patterns work together to ensure that events are consistently published and processed, even in the face of failures. Here's how they can help:

  1. Outbox Pattern: This pattern ensures atomicity by saving events in a local database table (the outbox) within the same transaction as the business operation. A separate process (or the same one, on the first attempt) then reads these events from the outbox and publishes them. This decouples the business logic from event publishing, ensuring that events are only published if the transaction succeeds.

  2. Inbox Pattern: The inbox pattern involves recording received events in an inbox table to ensure idempotent processing. This prevents the same event from being processed multiple times, maintains consistency across services, and makes it possible to retry delivery if something goes wrong.
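The key idea of the outbox pattern — the event row committed in the same transaction as the business write — can be sketched with an in-memory stand-in for a database transaction. In a real application this would be an actual database transaction (e.g. TypeORM's `dataSource.transaction`); all names below are illustrative:

```typescript
interface OutboxRow {
  name: string;
  payload: unknown;
  publishedAt: Date | null; // set once a relay publishes the event
}

class InMemoryStore {
  orders: { id: string; status: string }[] = [];
  outbox: OutboxRow[] = [];

  // Both writes succeed or neither does, mimicking a DB transaction.
  transaction(work: (store: InMemoryStore) => void): void {
    const ordersBackup = [...this.orders];
    const outboxBackup = [...this.outbox];
    try {
      work(this);
    } catch (error) {
      this.orders = ordersBackup; // roll back both tables
      this.outbox = outboxBackup;
      throw error;
    }
  }
}

function confirmOrderWithOutbox(store: InMemoryStore, orderId: string): void {
  store.transaction((s) => {
    s.orders.push({ id: orderId, status: 'CONFIRMED' });
    // The event row is written in the SAME transaction as the order:
    s.outbox.push({
      name: 'order.confirmed',
      payload: { orderId },
      publishedAt: null,
    });
  });
}
```

Because the event row and the order share one transaction, a crash can no longer leave a confirmed order without its event: either both rows exist, or neither does.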

When combined, these patterns provide robust solutions to the dual write problem:

  1. Atomicity: By recording events in the outbox as part of the same transaction that modifies the business data, you ensure that either both the data change and the event are committed, or neither is.

  2. At-least-once delivery: You can rest assured that your listeners will be invoked with each event at least once.

  3. Decoupling: Services remain decoupled, as the producer service does not directly publish events. Instead, a separate outbox processor handles event publishing.

Cons of Outbox and Inbox patterns

  1. Operational Overhead: Implementing and maintaining outbox and inbox tables and the corresponding processors can add complexity to your system.

  2. Latency: There can be a delay between the business operation and the event processing, depending on how frequently the inbox processor runs.

  3. Resource Consumption: Running additional background jobs for inbox processing can consume extra resources and may impact system performance.

While the outbox and inbox patterns do introduce some additional complexity and potential latency, these drawbacks are generally less significant than those of the Two-Phase Commit (2PC) protocol or Debezium. That's why we will stick with the combination of the inbox and outbox patterns to solve the dual write problem.

Implementation

To mitigate some of the operational overhead and latency introduced by the traditional outbox and inbox patterns, you can use the newly created NestJS Inbox Outbox Package from Nestixis. This package aims to streamline the implementation and operation of these patterns, solving some problems differently from the standard approach.

Disclaimer: Nestixis is an organization that I've created recently. The goal is to provide useful packages and give something useful back to the community.

  1. Immediate Event Delivery: The package mimics EventEmitter2, delivering events to all listeners immediately on the first attempt, reducing latency.

  2. Retry Mechanism: If one or more listeners fail, they will be retried based on a given interval.

  3. Retry Conditions: Listeners are retried until they stop throwing exceptions, or until the expiresAt time passes.

  4. Pessimistic Execution for Scalability: The drivers for MikroORM and TypeORM perform polling and execute events in a pessimistic way, enabling better scaling across multiple services.

  5. Extensibility: Thanks to a simple interface to implement, creating custom drivers, e.g. for Drizzle or Prisma, should not be a problem.
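As a rough illustration of what "pessimistic execution" buys you (this is my own sketch, not the package's internals): each polling worker atomically claims a batch of stored events, so two instances never process the same event. In a real driver the claim would be a database-level lock (e.g. `SELECT ... FOR UPDATE SKIP LOCKED`); here a `lockedBy` field stands in for it:

```typescript
interface StoredEvent {
  id: number;
  lockedBy: string | null; // null means unclaimed
}

// Claim up to `limit` unclaimed events for one worker. In a real
// database this whole loop would run inside a locking transaction,
// making the claim atomic across competing workers.
function claimBatch(
  events: StoredEvent[],
  workerId: string,
  limit: number,
): StoredEvent[] {
  const claimed: StoredEvent[] = [];
  for (const event of events) {
    if (claimed.length >= limit) break;
    if (event.lockedBy === null) {
      event.lockedBy = workerId; // mark as taken by this worker
      claimed.push(event);
    }
  }
  return claimed;
}
```

This is what lets you run several instances of the same service against one outbox table without duplicate processing.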

Inbox pattern used in the package

Outbox pattern used in the package

Prerequisites

  • Docker
  • Node.js and npm
  • NestJS CLI

Step 1: Set Up PostgreSQL Database

First, we'll use Docker to set up our PostgreSQL database:

docker run -d \
  --name postgres-container \
  -e POSTGRES_DB=inbox_outbox \
  -e POSTGRES_USER=user \
  -e POSTGRES_PASSWORD=user \
  -p 5432:5432 \
  postgres

This command creates a PostgreSQL container named postgres-container with a database called inbox_outbox.

Step 2: Initialize NestJS Project
Create a new NestJS project:

nest new nestjs-inbox-outbox-example
cd nestjs-inbox-outbox-example

Step 3: Install Required Packages

npm i @nestjs/typeorm typeorm pg @nestixis/nestjs-inbox-outbox @nestixis/nestjs-inbox-outbox-typeorm-driver

Step 4: Create Event and Listener

Create a new file src/new-cat.event.ts:

import { InboxOutboxEvent } from '@nestixis/nestjs-inbox-outbox';

export class NewCatEvent implements InboxOutboxEvent {
  public readonly name = NewCatEvent.name;

  constructor(
    public readonly catName: string,
    public readonly isFluffy: boolean,
  ) {}
}

Create a listener in src/new-cat.listener.ts:

import { IListener, Listener } from '@nestixis/nestjs-inbox-outbox';
import { NewCatEvent } from './new-cat.event';

@Listener(NewCatEvent.name)
export class NewCatListener implements IListener<NewCatEvent> {
  getName(): string {
    return NewCatListener.name;
  }

  async handle(event: NewCatEvent): Promise<void> {
    console.log(`A new and cute cat appeared with name ${event.catName}`);
  }
}

Step 5: Create Cat Model
Create src/cat.model.ts:

import { Column, Entity, PrimaryGeneratedColumn } from 'typeorm';

@Entity()
export class Cat {
  @PrimaryGeneratedColumn()
  id: number;

  @Column()
  name: string;
}

Step 6: Set Up Database Table
Connect to your PostgreSQL database and execute:

CREATE TABLE "cat" (
    "id" SERIAL PRIMARY KEY,
    "name" VARCHAR NOT NULL
);

Step 7: Configure AppModule
Update src/app.module.ts:

import { InboxOutboxModule } from '@nestixis/nestjs-inbox-outbox';
import {
  InboxOutboxTransportEventMigrations,
  TypeORMDatabaseDriverFactory,
  TypeOrmInboxOutboxTransportEvent,
} from '@nestixis/nestjs-inbox-outbox-typeorm-driver';
import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { DataSource } from 'typeorm';
import { Cat } from './cat.model';
import { NewCatEvent } from './new-cat.event';
import { NewCatListener } from './new-cat.listener';

@Module({
  imports: [
    TypeOrmModule.forRoot({
      type: 'postgres',
      host: 'localhost',
      port: 5432,
      username: 'user',
      password: 'user',
      database: 'inbox_outbox',
      entities: [TypeOrmInboxOutboxTransportEvent, Cat],
      migrations: [...InboxOutboxTransportEventMigrations],
      logging: true,
      migrationsRun: true,
    }),
    InboxOutboxModule.registerAsync({
      isGlobal: true,
      imports: [TypeOrmModule.forFeature([TypeOrmInboxOutboxTransportEvent])],
      useFactory: (dataSource: DataSource) => {
        const driverFactory = new TypeORMDatabaseDriverFactory(dataSource);
        return {
          driverFactory: driverFactory,
          events: [
            {
              name: NewCatEvent.name,
              listeners: {
                expiresAtTTL: 1000 * 60 * 60 * 24,
                maxExecutionTimeTTL: 5000,
                readyToRetryAfterTTL: 10000,
              },
            },
          ],
          retryEveryMilliseconds: 15000,
          maxInboxOutboxTransportEventPerRetry: 10,
        };
      },
      inject: [DataSource],
    }),
  ],
  providers: [NewCatListener],
})
export class AppModule {} 

Step 8: Update main.ts to Emit Events
Update src/main.ts:

import {
  TransactionalEventEmitter,
  TransactionalEventEmitterOperations,
} from '@nestixis/nestjs-inbox-outbox';
import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { Cat } from './cat.model';
import { NewCatEvent } from './new-cat.event';

async function bootstrap() {
  const app = await NestFactory.create(AppModule);
  await app.init();

  const transactionalEventEmitter = app.get<TransactionalEventEmitter>(
    TransactionalEventEmitter,
  );

  const cat = new Cat();
  cat.name = 'Mr. Whiskers';

  await transactionalEventEmitter.emit(new NewCatEvent(cat.name, true), [
    {
      entity: cat,
      operation: TransactionalEventEmitterOperations.persist,
    },
  ]);

  await app.listen(3000);
}
bootstrap(); 

Step 9: Run the Application

Start your NestJS application:

nest start

You should see a log message indicating that a new cat has appeared.


Conclusion

Congratulations! You've now set up a NestJS application implementing the Inbox/Outbox pattern with PostgreSQL.

As you continue to develop your application, pay extra attention to the following points:

  1. Idempotent Listeners: Ensure your event listeners are idempotent, meaning they can be executed multiple times without changing the result beyond the initial application. This is crucial for handling potential duplicate events.

  2. Unique Names: The package checks for unique names of events and listeners at startup. Always use distinct, descriptive names for your events and listeners to avoid conflicts.

  3. Execution Time: Consider the execution time of your listeners. If they perform long-running operations, adjust the maxExecutionTimeTTL accordingly. This prevents premature timeouts for complex tasks.

  4. Message Expiration: Not all messages need to be repeated indefinitely. Use the expiresAtTTL parameter wisely to set appropriate expiration times for your events. This helps manage system resources and prevents processing of outdated events.

  5. Retry Cooldown: The readyToRetryAfterTTL parameter sets a cooldown period after the first failure of a listener. Consider the nature of your operations when setting this value. For transient failures, a shorter cooldown might be appropriate, while for dependent service issues, a longer cooldown could be beneficial.
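Point 1 deserves a concrete illustration. Here is a hypothetical idempotent listener that records processed event IDs; the names are my own, and in production the processed-event record would be the inbox table itself, keyed uniquely per event, rather than an in-memory Set:

```typescript
class IdempotentCatListener {
  private readonly processed = new Set<string>();
  public charges = 0; // stands in for any non-repeatable side effect

  async handle(event: { eventId: string; catName: string }): Promise<void> {
    if (this.processed.has(event.eventId)) {
      return; // duplicate delivery: safely do nothing
    }
    this.processed.add(event.eventId);
    this.charges += 1; // side effect happens exactly once per eventId
  }
}
```

With at-least-once delivery, a listener like this turns "delivered at least once" into "applied exactly once", which is the property your business logic actually needs.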

Link to working example: Github
