Asynchronous Request-Reply pattern implementation
Decouple backend processing from a frontend host when the backend processing needs to be asynchronous but the frontend still needs a clear response.
Requirements
- .NET 8
More details are in the article.
I'm going to show an implementation based on a ToDo API. It has REST methods to read ToDo items. The typical implementation gets all ToDo items and gets a single ToDo item by ID:
group.MapGet("/", (Store store) => store.Todos.Values.ToArray());
group.MapGet("/{id}", (string id, Store store) =>
store.Todos.TryGetValue(id, out var todo)
? Results.Ok(todo)
: Results.NotFound());
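The POST method is more interesting because it is asynchronous. It queues the work and responds right away with 202 Accepted and two headers: Location, the URL where the client can get the result, and Retry-After, the delay before the next request.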
group.MapPost("/", async (Todo model,
BackgroundJobQueue queue,
Store store, HttpResponse response) =>
{
model = model with { Id = Guid.NewGuid().ToString() };
var job = new Job(
Guid.NewGuid().ToString(),
DateTime.UtcNow,
JobStatus.InProgress,
$"/api/todos/{model.Id}"
);
store.Jobs.Add(job.Id, job);
Func<CancellationToken, ValueTask> workItem = async (token) =>
{
await Task.Delay(TimeSpan.FromSeconds(10), token);
store.Todos.Add(model.Id, model);
};
await queue.QueueJobAsync(job.Id, workItem);
// Retry-After must be a number of seconds (or an HTTP date), not a TimeSpan string such as "00:00:02"
response.Headers.RetryAfter = "2";
response.Headers.Location = $"/api/jobs/{job.Id}";
response.StatusCode = StatusCodes.Status202Accepted;
});
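The endpoints above rely on a few types that are not shown in this walkthrough. Here is a minimal sketch of what they might look like. The type names come from the code above, but everything else is my assumption: the property names CreatedAt and DueBy, the nullable CompletedAt and Error, and the use of ConcurrentDictionary (exposed as IDictionary so that Add and the indexer still work) because request handlers and the background worker touch the store from different threads.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

public enum JobStatus { InProgress, Completed, Failed }

// Assumed shape: the positional parameters match the `new Job(...)` call in the POST handler.
public record Job(string Id, DateTime CreatedAt, JobStatus Status, string Location)
{
    // Set later by the background processor via `with { ... }`.
    public DateTime? CompletedAt { get; init; }
    public string? Error { get; init; }
}

// Assumed shape: Id is null in the request body and assigned by the POST handler.
public record Todo(string? Id, string Title, DateOnly DueBy);

// In-memory store; a thread-safe dictionary is an assumption, not taken from the repository.
public class Store
{
    public IDictionary<string, Todo> Todos { get; } = new ConcurrentDictionary<string, Todo>();
    public IDictionary<string, Job> Jobs { get; } = new ConcurrentDictionary<string, Job>();
}
```

With this shape the Store would be registered as a singleton, so every request and the background worker see the same data.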
In this example, I'm using the ASP.NET Core built-in BackgroundService as the job processor and a Channel as the in-process queue. But it may be any job processor, such as Hangfire or Lambda, and any queue, in-process or hosted. Just take into account the pros and cons of each queue and job processor.
private async Task BackgroundProcessing(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
var (id, jobTask) = await queue.DequeueJobAsync(stoppingToken);
if (!store.Jobs.TryGetValue(id, out var job))
continue; // skip an unknown job instead of stopping the whole processing loop
try
{
await jobTask(stoppingToken);
job = job with { CompletedAt = DateTime.UtcNow, Status = JobStatus.Completed };
}
catch (Exception ex)
{
job = job with { CompletedAt = DateTime.UtcNow, Error = ex.Message, Status = JobStatus.Failed };
}
store.Jobs[id] = job;
}
}
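The queue itself is not shown above, so here is a minimal sketch of how a Channel-based BackgroundJobQueue could look. Only the method names QueueJobAsync and DequeueJobAsync come from the calls in this article. The QueuedJob record struct, the bounded capacity of 100, and the Wait full mode are my assumptions.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical item type; a positional record struct can be deconstructed as `var (id, jobTask)`.
public readonly record struct QueuedJob(string Id, Func<CancellationToken, ValueTask> Work);

public sealed class BackgroundJobQueue
{
    private readonly Channel<QueuedJob> _channel;

    public BackgroundJobQueue(int capacity = 100)
    {
        // Bounded channel: when it is full, writers wait asynchronously instead of dropping jobs.
        _channel = Channel.CreateBounded<QueuedJob>(
            new BoundedChannelOptions(capacity) { FullMode = BoundedChannelFullMode.Wait });
    }

    // Called by the POST endpoint to hand the work item over to the background service.
    public ValueTask QueueJobAsync(
        string id, Func<CancellationToken, ValueTask> work, CancellationToken ct = default)
    {
        ArgumentNullException.ThrowIfNull(work);
        return _channel.Writer.WriteAsync(new QueuedJob(id, work), ct);
    }

    // Called by the background service; completes when a job is available.
    public ValueTask<QueuedJob> DequeueJobAsync(CancellationToken ct)
        => _channel.Reader.ReadAsync(ct);
}
```

The queue has to be a singleton so that the endpoint and the background service share the same channel. An in-process queue like this one loses pending jobs on restart, which is one of the trade-offs to weigh against a hosted queue.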
This is the job status API. When the job has completed, it redirects to the Location where the client can get the result. When the job has failed, it returns the job together with the error. While the job is still processing, it returns the job with a Retry-After header.
group.MapGet("/{id}", (string id, Store store, HttpResponse httpResponse) =>
{
if (!store.Jobs.TryGetValue(id, out var job) || job is null)
return Results.NotFound();
var okResult = () =>
{
// Retry-After must be a number of seconds (or an HTTP date), not a TimeSpan string such as "00:00:05"
httpResponse.Headers.RetryAfter = "5";
return Results.Ok(job);
};
return job.Status switch
{
JobStatus.Completed => Results.Redirect(job.Location),
JobStatus.InProgress => okResult(),
JobStatus.Failed => Results.BadRequest(job),
_ => throw new NotImplementedException(),
};
});
return group;
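.http files are a simple way to quickly invoke your API endpoints. The requests below create a ToDo item, use the Location header to get the job status, and then use the id from the response to get the ToDo item.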
GET {{base_url}}/api/todos
###
# @name createTodo
POST {{base_url}}/api/todos
Content-Type: application/json

{
"dueBy": "2024-02-14",
"title": "Write an article"
}
###
@jobLocation = {{createTodo.response.headers.Location}}
# @name job
GET {{base_url}}{{jobLocation}}
Accept: application/json
###
@todoId = {{job.response.body.$.id}}
GET {{base_url}}/api/todos/{{todoId}}
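The same flow can be driven from code. Below is a rough sketch of a polling client in C#, based only on the status codes and headers used in this article. The JobClient class, its method names, and the one second fallback delay are hypothetical. It assumes the HttpClient was created with AllowAutoRedirect = false, so the redirect from the job status API stays visible, and that statusUri is absolute.

```csharp
using System;
using System.Net;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

public static class JobClient
{
    // Polls the job status URL until the server redirects to the finished resource.
    public static async Task<Uri> WaitForResultAsync(
        HttpClient client, Uri statusUri, CancellationToken ct = default)
    {
        while (true)
        {
            using var response = await client.GetAsync(statusUri, ct);

            switch (response.StatusCode)
            {
                case HttpStatusCode.Redirect:  // 302: the job completed
                case HttpStatusCode.SeeOther:  // 303: treated the same way
                    // Location may be relative, so resolve it against the status URL.
                    return new Uri(statusUri, response.Headers.Location
                        ?? throw new InvalidOperationException("Redirect without a Location header."));

                case HttpStatusCode.OK:        // still in progress: wait, then poll again
                    await Task.Delay(NextDelay(response), ct);
                    break;

                default:                       // 400 (failed job), 404 (unknown job), etc.
                    throw new HttpRequestException(
                        $"Job did not complete, status code {(int)response.StatusCode}.");
            }
        }
    }

    // Reads Retry-After as seconds or as an HTTP date; falls back to one second
    // when the header is missing or cannot be parsed.
    public static TimeSpan NextDelay(HttpResponseMessage response)
    {
        var retryAfter = response.Headers.RetryAfter;
        if (retryAfter?.Delta is { } delta) return delta;
        if (retryAfter?.Date is { } date)
        {
            var wait = date - DateTimeOffset.UtcNow;
            return wait > TimeSpan.Zero ? wait : TimeSpan.Zero;
        }
        return TimeSpan.FromSeconds(1);
    }
}
```

A client could be created with new HttpClient(new HttpClientHandler { AllowAutoRedirect = false }) and then fetch the ToDo item from the returned URI. With the default handler the redirect is followed automatically, and the loop would see the final resource as 200 OK instead.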
I provided an example of how to implement the asynchronous request-reply pattern. Important pieces such as the queue and the job processor should be chosen according to your requirements. Also, do not forget to expire job items in the DB. Btw, API clients should rely on status codes and headers. And finally, the source code is in the GitHub repository.