Implementing the Outbox Pattern

Milan Jovanović - Oct 8 - - Dev Community

In distributed systems, we often face the challenge of keeping our database and external systems in sync. Imagine saving an order to a database and then publishing a message to a message broker. If either operation fails, your system ends up in an inconsistent state.

The Outbox pattern solves this problem by treating message publication as part of your database transaction. Instead of publishing messages directly, we save them to an Outbox table in our database, ensuring atomic operations. A separate process then reliably publishes these messages.

In this newsletter, we'll dive into implementing this pattern in .NET, covering everything from setup to scaling.

Why Do We Need the Outbox Pattern?

The transactional Outbox pattern fixes a common problem in distributed systems. This problem happens when you need to do two things at once: save data and communicate with an external component.

Consider scenarios like sending order confirmation emails, notifying other systems about new client registrations, or updating inventory levels after an order is placed. Each of these involves a local data change coupled with an external communication or update.

For example, imagine a microservice that needs to:

  • Save a new order in its database
  • Tell other systems about this new order

If one of these steps fails, your system could end up in an inconsistent state. Maybe the order is saved, but no one else knows about it. Or everyone thinks there's a new order, but it's not actually in the database.

Here's a CreateOrderCommandHandler without the Outbox pattern:

public class CreateOrderCommandHandler(
    IOrderRepository orderRepository,
    IProductInventoryChecker inventoryChecker,
    IUnitOfWork unitOfWork,
    IEventBus eventBus) : IRequestHandler<CreateOrderCommand, OrderDto>
{
    public async Task<OrderDto> Handle(CreateOrderCommand request, CancellationToken cancellationToken)
    {
        var order = new Order(request.CustomerId, request.ProductId, request.Quantity, inventoryChecker);

        await orderRepository.AddAsync(order);

        await unitOfWork.CommitAsync(cancellationToken);

        // The database transaction is completed at this point.

        await eventBus.Send(new OrderCreatedIntegrationEvent(order.Id));

        return new OrderDto { Id = order.Id, Total = order.Total };
    }
}
Enter fullscreen mode Exit fullscreen mode

This code has a potential consistency problem. After the database transaction is committed, two things could go wrong:

  1. The application might crash right after the transaction is committed but before the event is sent. The order would be created in the database, but other systems wouldn't know about it.

  2. The event bus might be down or unreachable when we try to send the event. This would result in the order being created without notifying other systems.

The transactional Outbox pattern helps solve this problem by ensuring that the database update and event publication are treated as a single atomic operation.

Image description

The sequence diagram illustrates how the Outbox pattern solves our consistency challenge. Instead of trying to save data and send a message as separate steps, we save both the order and an Outbox message in a single database transaction. This is an all-or-nothing operation - we can't end up in an inconsistent state.

A separate Outbox processor handles the actual message sending. It continuously checks for unsent messages in the Outbox table and publishes them to the message queue. The processor marks messages as sent after successful publishing, preventing duplicates.

An important thing to realize here is that the Outbox pattern gives us at-least-once delivery. The Outbox message will be sent at least once, but it could also be sent multiple times in case of retries. This means we have to make our message consumers idempotent.

Implementing the Outbox Pattern

First, let's create our Outbox table where we will store messages:

CREATE TABLE outbox_messages (
    id UUID PRIMARY KEY,
    type VARCHAR(255) NOT NULL,
    content JSONB NOT NULL,
    occurred_on_utc TIMESTAMP WITH TIME ZONE NOT NULL,
    processed_on_utc TIMESTAMP WITH TIME ZONE NULL,
    error TEXT NULL
);

-- We can consider adding this index since we will be querying for unprocessed messages often
-- and it will contain the rows in the correct sort order for our query.
CREATE INDEX IF NOT EXISTS idx_outbox_messages_unprocessed
ON outbox_messages (occurred_on_utc, processed_on_utc)
INCLUDE (id, type, content)
WHERE processed_on_utc IS NULL;
Enter fullscreen mode Exit fullscreen mode

I'll use PostgreSQL as the database for this example. Notice the jsonb type for the content column. It allows for indexing and querying of the JSON data if needed in the future.

Now, let's create a class to represent our Outbox entry:

public sealed class OutboxMessage
{
    public Guid Id { get; init; }
    public string Type { get; init; }
    public string Content { get; init; }
    public DateTime OccurredOnUtc { get; init; }
    public DateTime? ProcessedOnUtc { get; init; }
    public string? Error { get; init; }
}
Enter fullscreen mode Exit fullscreen mode

Here's how we can add a message to the Outbox:

public async Task AddToOutbox<T>(T message, NpgsqlDataSource dataSource)
{
    var outboxMessage = new OutboxMessage
    {
        Id = Guid.NewGuid(),
        OccurredOnUtc = DateTime.UtcNow,
        Type = typeof(T).FullName, // We'll need this for deserialization
        Content = JsonSerializer.Serialize(message)
    };

    await using var connection = await dataSource.OpenConnectionAsync();
    await connection.ExecuteAsync(
        @"""
        INSERT INTO outbox_messages (id, occurred_on_utc, type, content)
        VALUES (@Id, @OccurredOnUtc, @Type, @Content::jsonb)
        """,
        outboxMessage);
}
Enter fullscreen mode Exit fullscreen mode

An elegant approach to implementing this is using domain events to represent notifications. When something significant happens in the domain, we will raise a domain event. Before completing the transaction, we can pick up all events and store them as Outbox messages. You could do this from the unit of work or with an EF Core interceptor.

Processing the Outbox

The Outbox processor is the next component we'll need. This could be a physically separate process or a background worker in the same process.

I'll use Quartz to schedule background jobs for Outbox processing. It's a robust library with excellent support for scheduling recurring jobs.

Now, let's implement the OutboxProcessorJob:

[DisallowConcurrentExecution]
public class OutboxProcessorJob(
    NpgsqlDataSource dataSource,
    IPublishEndpoint publishEndpoint,
    Assembly integrationEventsAssembly) : IJob
{
    public async Task Execute(IJobExecutionContext context)
    {
        await using var connection = await dataSource.OpenConnectionAsync();
        await using var transaction = await connection.BeginTransactionAsync();

        // You can make the limit a parameter, to control the batch size.
        // We can also select just the id, type, and content columns.
        var messages = await connection.QueryAsync<OutboxMessage>(
            @"""
            SELECT * FROM outbox_messages
            WHERE processed_on_utc IS NULL
            ORDER BY occurred_on_utc LIMIT 100
            """,
            transaction: transaction);

        foreach (var message in messages)
        {
            try
            {
                var messageType = integrationEventsAssembly.GetType(message.Type);
                var deserializedMessage = JsonConvert.DeserializeObject(message.Content, messageType);

                // We should introduce retries here to improve reliablity.
                await publishEndpoint.Publish(deserializedMessage);

                await connection.ExecuteAsync(
                    @"""
                    UPDATE outbox_messages
                    SET processed_on_utc = @ProcessedOnUtc
                    WHERE id = @Id
                    """,
                    new { ProcessedOnUtc = DateTime.UtcNow, message.Id },
                    transaction: transaction);
            }
            catch (Exception ex)
            {
                // We can also introduce error logging here.

                await connection.ExecuteAsync(
                    @"""
                    UPDATE outbox_messages
                    SET processed_on_utc = @ProcessedOnUtc, error = @Error
                    WHERE id = @Id
                    """,
                    new { ProcessedOnUtc = DateTime.UtcNow, Error = ex.ToString(), message.Id },
                    transaction: transaction);
            }
        }

        await transaction.CommitAsync();
    }
}
Enter fullscreen mode Exit fullscreen mode

This approach uses polling to periodically fetch unprocessed messages from the database. Polling can increase the load on the database, as we'll need to query for unprocessed messages frequently.

An alternative way to process Outbox messages is by using Transaction log tailing. We can implement this using Postgres logical replication. The database will stream changes from the Write-Ahead Log (WAL) to our application, and we'll process these messages and publish them to the message broker. You can use this to implement a push-based Outbox processor.

Considerations and Tradeoffs

The Outbox pattern, while effective, introduces additional complexity and database writes. In high-throughput systems, it's crucial to monitor its performance to ensure it doesn't become a bottleneck.

I recommend implementing retry mechanisms in the Outbox processor to improve reliability. Consider using exponential backoff for transient failures and a circuit breaker for persistent issues to prevent system overload during outages.

It's essential that you implement idempotent message consumers. Network issues or processor restarts can lead to multiple deliveries of the same message, so your consumers must handle repeated processing safely.

Over time, the Outbox table can grow significantly, potentially impacting database performance. It's important to implement an archiving strategy early on. Consider moving processed messages to cold storage or deleting them after a set period.

Scaling Outbox Processing

As your system grows, you may find that a single Outbox processor can't keep up with the volume of messages. This can lead to increased latency between when an event occurs and when it's processed by consumers.

One straightforward approach is to increase the frequency of the Outbox processor job. You should consider running it every few seconds. This can significantly reduce the delay in message processing.

Another effective strategy is to increase the batch size when fetching unprocessed messages. By processing more messages in each run, you can improve throughput. However, be cautious not to make the batches so large that they cause long-running transactions.

For high-volume systems, processing the Outbox in parallel can be very effective. Implement a locking mechanism to claim batches of messages, allowing multiple processors to work simultaneously without conflict. You can use SELECT ... FOR UPDATE SKIP LOCKED to claim a batch of messages. This approach can dramatically increase your processing capacity.

Wrapping Up

The Outbox pattern is a powerful tool for maintaining data consistency in distributed systems. By decoupling database operations from message publishing, the Outbox pattern ensures that your system remains reliable even in the face of failures.

Remember to keep your consumers idempotent, implement proper scaling strategies, and manage your Outbox table growth.

While it adds some complexity, the benefits of guaranteed message delivery make it a valuable pattern in many scenarios.

If you're looking to implement the Outbox pattern in a robust, production-ready way, you can check out Pragmatic Clean Architecture. It includes an entire section on implementing the Outbox pattern, along with other essential patterns for building maintainable and scalable .NET applications.

That's all for today. Stay awesome, and I'll see you next week.


P.S. Whenever you're ready, there are 3 ways I can help you:

  1. Pragmatic Clean Architecture: Join 3,050+ students in this comprehensive course that will teach you the system I use to ship production-ready applications using Clean Architecture. Learn how to apply the best practices of modern software architecture.

  2. Modular Monolith Architecture: Join 950+ engineers in this in-depth course that will transform the way you build modern systems. You will learn the best practices for applying the Modular Monolith architecture in a real-world scenario.

  3. Patreon Community: Join a community of 1,050+ engineers and software architects. You will also unlock access to the source code I use in my YouTube videos, early access to future videos, and exclusive discounts for my courses.

. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
Terabox Video Player