Implementing The Saga Pattern With Rebus And RabbitMQ

Milan Jovanović - Sep 18 '23 - - Dev Community

Designing long-lived processes in a distributed environment is an interesting engineering challenge.

And a well known pattern for solving this problem is a Saga.

A Saga is a sequence of local transactions, where each local transaction updates the Saga state and publishes a message triggering the next step in the Saga.

Sagas come in two forms:

  • Orchestrated
  • Choreographed

With an orchestrated Saga, there's a central component responsible for orchestrating the individual steps.

In a choreographed Saga, processes work independently but coordinate with each other using events.

In this week's issue, I'll show you how to create an orchestrated Saga using the Rebus library with RabbitMQ for message transport.

Rebus Configuration

Rebus is a free .NET “service bus”, and it's practical for implementing asynchronous messaging-based communication between the components of an application.

Let's install the following libraries:

  • Rebus.ServiceProvider for managing the Rebus instance
  • Rebus.RabbitMq for RabbitMQ message transport
  • Rebus.SqlServer for SQL Server state persistence
Install-Package Rebus.ServiceProvider -Version 8.4.0
Install-Package Rebus.RabbitMq -Version 8.0.0
Install-Package Rebus.SqlServer -Version 7.3.1
Enter fullscreen mode Exit fullscreen mode

Inside of your ASP.NET Core application you will need the following configuration.

services.AddRebus(
    rebus => rebus
        .Routing(r =>
            r.TypeBased().MapAssemblyOf<Program>("newsletter-queue"))
        .Transport(t =>
            t.UseRabbitMq(
                configuration.GetConnectionString("RabbitMq"),
                inputQueueName: "newsletter-queue"))
        .Sagas(s =>
            s.StoreInSqlServer(
                configuration.GetConnectionString("SqlServer"),
                dataTableName: "Sagas",
                indexTableName: "SagaIndexes"))
        .Timeouts(t =>
            t.StoreInSqlServer(
                builder.Configuration.GetConnectionString("SqlServer"),
                tableName: "Timeouts"))
);

services.AutoRegisterHandlersFromAssemblyOf<Program>();
Enter fullscreen mode Exit fullscreen mode

Unpacking the individual configuration steps:

  • Routing - Configures messages to be routed by their type
  • Transport - Configures the message transport mechanism
  • Sagas - Configures the Saga persistence store
  • Timeouts - Configures the timeouts persistence store

You also need to specify the queue name for sending and receiving messages.

AutoRegisterHandlersFromAssemblyOf will scan the specified assembly and automatically register the respective message handlers.

Creating The Saga With Rebus

We're going to create a Saga for a newsletter onboarding process.

When a user subscribes to the newsletter we want to:

  • Send a welcome email immediately
  • Send a follow-up email after 7 days

The first step in creating the Saga is defining the data model by implementing ISagaData. We'll keep it simple and store the Email for correlation , and add two flags for the distinct steps in our Saga.

public class NewsletterOnboardingSagaData : ISagaData
{
    public Guid Id { get; set; }
    public int Revision { get; set; }

    public string Email { get; set; }

    public bool WelcomeEmailSent { get; set; }

    public bool FollowUpEmailSent { get; set; }
}
Enter fullscreen mode Exit fullscreen mode

Now we can define the NewsletterOnboardingSaga class by inheriting from the Saga class and implementing the CorrelateMessages method.

It's a best practice to use a unique value for correlation. In our case this will be the Email.

You also configure how the Saga starts with IAmInitiatedBy, and the individual messages the Saga handles with IHandleMessages.

public class NewsletterOnboardingSaga :
    Saga<NewsletterOnboardingSagaData>,
    IAmInitiatedBy<SubscribeToNewsletter>,
    IHandleMessages<WelcomeEmailSent>,
    IHandleMessages<FollowUpEmailSent>
{
    private readonly IBus _bus;

    public NewsletterOnboardingSaga(IBus bus)
    {
        _bus = bus;
    }

    protected override void CorrelateMessages(
        ICorrelationConfig<NewsletterOnboardingSagaData> config)
    {
        config.Correlate<SubscribeToNewsletter>(m => m.Email, d => d.Email);

        config.Correlate<WelcomeEmailSent>(m => m.Email, d => d.Email);

        config.Correlate<FollowUpEmailSent>(m => m.Email, d => d.Email);
    }

    /* Handlers omitted for brevity */
}
Enter fullscreen mode Exit fullscreen mode

Message Types And Naming Conventions

There are two types of messages you send in a Saga :

  • Commands
  • Events

Commands instruct the receiving component what to do.

Think: verb, imperative.

Events notify the Saga which process was just completed.

Think: what happened, past tense.

Saga Orchestration With Messages

The NewsletterOnboardingSaga starts by handling the SubscribeToNewsletter command, and publishes a SendWelcomeEmail command.

public async Task Handle(SubscribeToNewsletter message)
{
    if (!IsNew)
    {
        return;
    }

    await _bus.Send(new SendWelcomeEmail(message.Email));
}
Enter fullscreen mode Exit fullscreen mode

The SendWelcomeEmail command is handled by a different component, which publishes a WelcomeEmailSent event when it completes.

In the WelcomeEmailSent handler we update the Saga state and publish a deferred message by calling Defer. Rebus will persist the SendFollowUpEmail command, and publish it when the timeout expires.

public async Task Handle(WelcomeEmailSent message)
{
    Data.WelcomeEmailSent = true;

    await _bus.Defer(TimeSpan.FromDays(3), new SendFollowUpEmail(message.Email));
}
Enter fullscreen mode Exit fullscreen mode

Finally, the SendFollowUpEmail command is handled and we publish the FollowUpEmailSent event.

We update the Saga state again, and also call MarkAsComplete to complete the Saga.

public Task Handle(FollowUpEmailSent message)
{
    Data.FollowUpEmailSent = true;

    MarkAsComplete();

    return Task.CompletedTask;
}
Enter fullscreen mode Exit fullscreen mode

Completing the Saga will delete it from the database.

Handling Commands With Rebus

Here's how the SendWelcomeEmail command handler looks like.

public class SendWelcomeEmailHandler : IHandleMessages<SendWelcomeEmail>
{
    private readonly IEmailService _emailService;
    private readonly IBus _bus;

    public SendWelcomeEmailHandler(IEmailService emailService, IBus bus)
    {
        _emailService = emailService;
        _bus = bus;
    }

    public async Task Handle(SendWelcomeEmail message)
    {
        await _emailService.SendWelcomeEmailAsync(message.Email);

        await _bus.Reply(new WelcomeEmailSent(message.Email));
    }
}
Enter fullscreen mode Exit fullscreen mode

The important thing to highlight here is that we're using the Reply method to send a message back. This will reply back to the endpoint specified as the return address on the current message.

In Summary

Sagas are practical for implementing a long-lived process in a distributed system. Each business process is represented by a local transaction, and publishes a message to trigger the next step in the Saga.

Although Sagas are very powerful, they are also complicated to develop, maintain and debug.

We didn't cover a few important topics in this newsletter:

  • Error handling
  • Message retries
  • Compensating transactions

I think you'll have some fun researching these on your own.

Take a look at the source code for the example used in this newsletterif you want to learn more about implementing Sagas.

If you have Docker installed, you should be able to run it without a problem and try it out.

Thank you for reading, and have an awesome Saturday.


P.S. Whenever you're ready, there are 2 ways I can help you:

  1. Pragmatic Clean Architecture: This comprehensive course will teach you the system I use to ship production-ready applications using Clean Architecture. Learn how to apply the best practices of modern software architecture. Join 950+ students here.

  2. Patreon Community: Think like a senior software engineer with access to the source code I use in my YouTube videos and exclusive discounts for my courses. Join 820+ engineers here.

. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .
Terabox Video Player