📬 Asynq: simple, reliable & efficient distributed task queue for your next Go project

Vic Shóstak - Apr 6 '21 - Dev Community

Introduction

Hi, DEV friends! 😉 It's time to share a great find that you must try in your next project. I'm talking about a simple, reliable and efficient distributed task queue written in Go and called Asynq.

I already have experience using Asynq in production on one of my work projects (a microservice for sending scheduled messages to subscribers of a Telegram bot). After using it successfully, I wanted to tell you more about it so you can appreciate it too!

All right, let's get started! 👇

What is Asynq?

According to the official Asynq GitHub page:

Asynq is a Go library for queueing tasks and processing them asynchronously with workers. It's backed by Redis and is designed to be scalable yet easy to get started. [...] Task queues are used as a mechanism to distribute work across multiple machines. A system can consist of multiple worker servers and brokers, giving way to high availability and horizontal scaling.

Asynq is developed and maintained by Ken Hibino, who works as a software engineer at Google. So you can be sure of the quality of the code.

Its most notable task queueing features are:

  • Guaranteed at least one execution of a task
  • Scheduling of tasks
  • Durability since tasks are written to Redis
  • Retries of failed tasks
  • Automatic recovery of tasks in the event of a worker crash
  • Weighted priority queues
  • Strict priority queues
  • Low latency to add a task since writes are fast in Redis
  • De-duplication of tasks using unique option
  • Allow timeout and deadline per task
  • Flexible handler interface with support for middlewares
  • Ability to pause queue to stop processing tasks from the queue
  • Periodic Tasks

Built-in scaling tools:

  • Support Redis Cluster for automatic sharding and high availability
  • Support Redis Sentinels for high availability

And, of course, useful tools for admins:

  • Web UI to inspect and remote-control queues and tasks
  • CLI to inspect and remote-control queues and tasks

🎯 By the way, I created an official logo for Asynq.

The project we will create

I would like to show with a simple example how you can easily work with Asynq in your Golang project. Suppose we have the task of sending a welcome email as soon as the user registers and, after a while, sending another reminder email to the user about something.

Here are the points we will stick to:

  1. Three queues for tasks with different priorities;
  2. Generating multiple tasks of different types at once and for different queues on the client;
  3. Separation into task handlers and payloads;

Full code of this project

For clarity, you can download the full example and run it on your machine. Especially for you, I put it on GitHub:

koddr / tutorial-go-asynq

📖 Tutorial: Asynq. Simple, reliable & efficient distributed task queue for your next Go project.

Let's write some code

Okay! We'll move on to the most interesting part, the writing of the code. I have supplied the code examples with detailed comments, so I won't dwell on them too much in the text of the article.

🔥 Please look at the code!

Creating tasks payloads

Let's define the payloads of our tasks. We'll create two task types for sending email: a welcome message (sent right away) and a reminder message (sent after a while).

// ./tasks/payloads.go

package tasks

import (
    "time"

    "github.com/hibiken/asynq"
)

const (
    // TypeWelcomeEmail is a name of the task type
    // for sending a welcome email.
    TypeWelcomeEmail = "email:welcome"

    // TypeReminderEmail is a name of the task type
    // for sending a reminder email.
    TypeReminderEmail = "email:reminder"
)

// NewWelcomeEmailTask task payload for a new welcome email.
func NewWelcomeEmailTask(id int) *asynq.Task {
    // Specify task payload.
    payload := map[string]interface{}{
        "user_id": id, // set user ID
    }

    // Return a new task with given type and payload.
    return asynq.NewTask(TypeWelcomeEmail, payload)
}

// NewReminderEmailTask task payload for a reminder email.
func NewReminderEmailTask(id int, ts time.Time) *asynq.Task {
    // Specify task payload.
    payload := map[string]interface{}{
        "user_id": id,          // set user ID
        "sent_in": ts.String(), // set the send time
    }

    // Return a new task with given type and payload.
    return asynq.NewTask(TypeReminderEmail, payload)
}
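
A side note on the sent_in field: ts.String() is meant for debugging output and is awkward to parse back. If a handler ever needs the value as a real time.Time, a typed payload is easier to work with. Here is a minimal stdlib-only sketch, assuming a hypothetical ReminderEmailPayload struct (it is not part of the tutorial repository). encoding/json writes time.Time as RFC 3339, so the value survives the round trip. As far as I know, newer Asynq releases take the payload as a byte slice, so bytes like these are what you would hand to the task there; treat that as an assumption and check the version you use.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// ReminderEmailPayload is a hypothetical typed payload for the
// "email:reminder" task (not part of the original tutorial code).
type ReminderEmailPayload struct {
	UserID int       `json:"user_id"`
	SentIn time.Time `json:"sent_in"` // encoded as RFC 3339 by encoding/json
}

// EncodeReminder serializes the payload to JSON bytes.
func EncodeReminder(id int, ts time.Time) ([]byte, error) {
	return json.Marshal(ReminderEmailPayload{UserID: id, SentIn: ts})
}

// DecodeReminder parses JSON bytes back into a typed payload.
func DecodeReminder(data []byte) (ReminderEmailPayload, error) {
	var p ReminderEmailPayload
	err := json.Unmarshal(data, &p)
	return p, err
}

func main() {
	ts := time.Date(2021, time.April, 6, 12, 0, 0, 0, time.UTC)

	data, err := EncodeReminder(42, ts)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(data)) // {"user_id":42,"sent_in":"2021-04-06T12:00:00Z"}

	p, err := DecodeReminder(data)
	if err != nil {
		panic(err)
	}
	fmt.Println(p.UserID, p.SentIn.Equal(ts)) // 42 true
}
```

The handler side then works with p.SentIn directly instead of a preformatted string.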

Creating tasks handlers

Task handlers hold our business logic: they define what happens when a task of a given type is processed. To keep it simple, I will just print a plain message to the console of the Asynq worker server.

// ./tasks/handlers.go

package tasks

import (
    "context"
    "fmt"

    "github.com/hibiken/asynq"
)

// HandleWelcomeEmailTask handler for welcome email task.
func HandleWelcomeEmailTask(c context.Context, t *asynq.Task) error {
    // Get user ID from given task.
    id, err := t.Payload.GetInt("user_id")
    if err != nil {
        return err
    }

    // Dummy message to the worker's output.
    fmt.Printf("Send Welcome Email to User ID %d\n", id)

    return nil
}

// HandleReminderEmailTask for reminder email task.
func HandleReminderEmailTask(c context.Context, t *asynq.Task) error {
    // Get int with the user ID from the given task.
    id, err := t.Payload.GetInt("user_id")
    if err != nil {
        return err
    }

    // Get string with the sent time from the given task.
    time, err := t.Payload.GetString("sent_in")
    if err != nil {
        return err
    }

    // Dummy message to the worker's output.
    fmt.Printf("Send Reminder Email to User ID %d\n", id)
    fmt.Printf("Reason: time is up (%v)\n", time)

    return nil
}
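
The worker server in the next section routes each task to one of these handlers by its type name. To make that routing concrete without needing Redis, here is a stdlib-only stand-in. The Task, HandlerFunc and Mux types below are hypothetical simplifications made up for illustration; they are not Asynq's real types, and this sketch only does exact matches on the type name.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Task is a simplified stand-in for a queued task (hypothetical type).
type Task struct {
	Type    string
	Payload map[string]interface{}
}

// HandlerFunc processes one task; a non-nil error marks it as failed.
type HandlerFunc func(ctx context.Context, t *Task) error

// Mux maps task type names to handlers using exact matches only.
type Mux struct {
	handlers map[string]HandlerFunc
}

// NewMux returns an empty Mux.
func NewMux() *Mux {
	return &Mux{handlers: make(map[string]HandlerFunc)}
}

// HandleFunc registers a handler for the given task type.
func (m *Mux) HandleFunc(taskType string, h HandlerFunc) {
	m.handlers[taskType] = h
}

// ErrNoHandler is returned when no handler is registered for a type.
var ErrNoHandler = errors.New("no handler registered for task type")

// ProcessTask looks up the handler by task type and runs it.
func (m *Mux) ProcessTask(ctx context.Context, t *Task) error {
	h, ok := m.handlers[t.Type]
	if !ok {
		return fmt.Errorf("%w: %q", ErrNoHandler, t.Type)
	}
	return h(ctx, t)
}

// welcomeHandler mirrors HandleWelcomeEmailTask for the simplified Task.
func welcomeHandler(ctx context.Context, t *Task) error {
	id, ok := t.Payload["user_id"].(int)
	if !ok {
		return errors.New("user_id is missing or not an int")
	}
	fmt.Printf("Send Welcome Email to User ID %d\n", id)
	return nil
}

func main() {
	mux := NewMux()
	mux.HandleFunc("email:welcome", welcomeHandler)

	ctx := context.Background()
	welcome := &Task{
		Type:    "email:welcome",
		Payload: map[string]interface{}{"user_id": 42},
	}
	fmt.Println(mux.ProcessTask(ctx, welcome))                      // handler runs, then prints <nil>
	fmt.Println(mux.ProcessTask(ctx, &Task{Type: "email:unknown"})) // prints the "no handler" error
}
```

The point to take away is the contract: a handler returns nil on success and an error on failure, which is what lets the queue decide whether a task needs another attempt.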

Creating Asynq worker server

This is the central part of our project: the component responsible for processing the queued tasks and, with them, the logic of sending messages.

// ./worker/server.go

package main

import (
    "log"

    "tutorial-go-asynq/tasks"

    "github.com/hibiken/asynq"
)

func main() {
    // Create and configure the Redis connection.
    redisConnection := asynq.RedisClientOpt{
        Addr: "localhost:6379", // Redis server address
    }

    // Create and configure the Asynq worker server.
    worker := asynq.NewServer(redisConnection, asynq.Config{
        // Specify how many concurrent workers to use.
        Concurrency: 10,
        // Specify multiple queues with different priority.
        Queues: map[string]int{
            "critical": 6, // processed 60% of the time
            "default":  3, // processed 30% of the time
            "low":      1, // processed 10% of the time
        },
    })

    // Create a new task's mux instance.
    mux := asynq.NewServeMux()

    // Define a task handler for the welcome email task.
    mux.HandleFunc(
        tasks.TypeWelcomeEmail,       // task type
        tasks.HandleWelcomeEmailTask, // handler function
    )

    // Define a task handler for the reminder email task.
    mux.HandleFunc(
        tasks.TypeReminderEmail,       // task type
        tasks.HandleReminderEmailTask, // handler function
    )

    // Run worker server.
    if err := worker.Run(mux); err != nil {
        log.Fatal(err)
    }
}
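
The percentages in the Queues comments come straight from the weights: each queue's share is its weight divided by the sum of all weights. A small stdlib-only sketch of that arithmetic (queueShares is a hypothetical helper, not an Asynq function):

```go
package main

import (
	"fmt"
	"sort"
)

// queueShares converts queue weights into approximate percentages of
// processing time (hypothetical helper, for illustration only).
func queueShares(weights map[string]int) map[string]int {
	total := 0
	for _, w := range weights {
		total += w
	}

	shares := make(map[string]int, len(weights))
	if total == 0 {
		return shares
	}
	for name, w := range weights {
		shares[name] = w * 100 / total // integer percent, rounded down
	}
	return shares
}

func main() {
	shares := queueShares(map[string]int{"critical": 6, "default": 3, "low": 1})

	// Sort the names so the output order is stable.
	names := make([]string, 0, len(shares))
	for name := range shares {
		names = append(names, name)
	}
	sort.Strings(names)

	for _, name := range names {
		fmt.Printf("%s: %d%%\n", name, shares[name])
	}
	// Prints:
	// critical: 60%
	// default: 30%
	// low: 10%
}
```

As I understand it, these shares describe the case where every queue has tasks waiting; an empty queue does not hold the others up.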

Now for my favorite part of Asynq. If your application is going to grow, you will definitely want a more scalable system, and Asynq can help you with that, because:

  • You can run a dedicated Asynq worker server for each queue;
  • Each Asynq worker server can be configured with its own number of concurrent workers;
  • You can then use a simple Docker Compose setup to start the right number of replicas of each Asynq worker server when needed;

And if that's still not enough, you can switch to a Redis Cluster using Asynq's built-in support for it... literally in a couple of minutes!

☝️ I won't describe the configuration process, since the Wiki page of the project has detailed instructions.
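
Coming back to the idea of a dedicated worker server per queue: one way to reuse a single worker binary for that is to pass the queue weights in from outside, for example through a command-line flag or an environment variable set per service in Docker Compose. A minimal sketch, assuming a hypothetical parseQueues helper and a made-up "name=weight" format (neither comes from Asynq). The resulting map has the same shape as the Queues field in the worker config above.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseQueues turns "critical=6,default=3,low=1" into a map of queue
// names to weights. The format is made up for this example.
func parseQueues(spec string) (map[string]int, error) {
	queues := make(map[string]int)

	for _, part := range strings.Split(spec, ",") {
		part = strings.TrimSpace(part)
		if part == "" {
			continue // tolerate empty segments such as a trailing comma
		}

		kv := strings.SplitN(part, "=", 2)
		if len(kv) != 2 || strings.TrimSpace(kv[0]) == "" {
			return nil, fmt.Errorf("invalid queue spec %q", part)
		}

		weight, err := strconv.Atoi(strings.TrimSpace(kv[1]))
		if err != nil || weight < 1 {
			return nil, fmt.Errorf("invalid weight in %q", part)
		}
		queues[strings.TrimSpace(kv[0])] = weight
	}

	if len(queues) == 0 {
		return nil, fmt.Errorf("no queues given")
	}
	return queues, nil
}

func main() {
	// A worker replica dedicated to the critical queue only.
	queues, err := parseQueues("critical=1")
	if err != nil {
		panic(err)
	}
	fmt.Println(queues) // map[critical:1]
}
```

Each replica can then be started with its own value, while the worker code itself stays the same.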

Creating Asynq client

The client part can be anything, really. The main thing is that it can create new tasks and send them to the queue:

// ./client/main.go

package main

import (
    "log"
    "math/rand"
    "time"

    "tutorial-go-asynq/tasks"

    "github.com/hibiken/asynq"
)

func main() {
    // Create a new Redis connection for the client.
    redisConnection := asynq.RedisClientOpt{
        Addr: "localhost:6379", // Redis server address
    }

    // Create a new Asynq client.
    client := asynq.NewClient(redisConnection)
    defer client.Close()

    // Infinite loop to create tasks as Asynq client.
    for {
        // Generate a random user ID.
        userID := rand.Intn(1000) + 10

        // Set a delay duration to 2 minutes.
        delay := 2 * time.Minute

        // Define tasks.
        task1 := tasks.NewWelcomeEmailTask(userID)
        task2 := tasks.NewReminderEmailTask(userID, time.Now().Add(delay))

        // Process the task immediately in critical queue.
        if _, err := client.Enqueue(
            task1,                   // task payload
            asynq.Queue("critical"), // set queue for task
        ); err != nil {
            log.Fatal(err)
        }

        // Process the task 2 minutes later in low queue.
        if _, err := client.Enqueue(
            task2,                  // task payload
            asynq.Queue("low"),     // set queue for task
            asynq.ProcessIn(delay), // set time to process task
        ); err != nil {
            log.Fatal(err)
        }
    }
}
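
One thing to keep in mind about the loop above: it never pauses, so it enqueues tasks as fast as Redis accepts them. For local experiments, a bounded and throttled variant can be easier on your machine. A minimal stdlib-only sketch; produce and its enqueue callback are hypothetical names, and in the real client the callback would wrap the two client.Enqueue calls.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// produce calls enqueue up to n times with a random user ID, sleeping
// for interval between calls, and stops at the first error.
// It returns how many calls succeeded.
func produce(n int, interval time.Duration, enqueue func(userID int) error) (int, error) {
	sent := 0
	for i := 0; i < n; i++ {
		// Same ID range as in the client above: 10..1009.
		userID := rand.Intn(1000) + 10

		if err := enqueue(userID); err != nil {
			return sent, err
		}
		sent++

		// No need to wait after the last call.
		if i < n-1 {
			time.Sleep(interval)
		}
	}
	return sent, nil
}

func main() {
	sent, err := produce(3, 100*time.Millisecond, func(userID int) error {
		// In the real client, enqueue the welcome and reminder tasks here.
		fmt.Printf("enqueue tasks for user %d\n", userID)
		return nil
	})
	fmt.Println(sent, err)
}
```

Returning the error instead of calling log.Fatal inside the loop also gives the deferred client.Close() a chance to run.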

Asynq web UI

Unfortunately, at the moment, the Asynq web UI can only be installed by copying a binary from the releases page or by running it in a Docker container, like this:

# Pull the latest image
docker pull hibiken/asynqmon

# Run Asynqmon
docker run --rm \
    --name asynqmon \
    -p 8080:8080 \
    hibiken/asynqmon

👋 The author of Asynq and I are already working on simplifying this process for many platforms in a future version (hopefully v1.0.0). If you would like to help, you are welcome!

In the meantime, this is Asynqmon repository:

hibiken / asynqmon

Web UI for Asynq task queue

The installed and running web UI will look like this:

[Screenshot: web UI with hover charts]

List of servers and workers

Shows Asynq worker server statistics with detailed information about queues and active workers:

[Screenshot: web UI, servers and workers]

Supports mass operations with tasks in a convenient tabular list with information on a given queue:

[Screenshot: web UI, mass operations on tasks]

Redis server information

It graphically displays all the necessary information about memory usage, server uptime, the number of connected clients at the moment and much more:

[Screenshot: web UI, Redis information]

By the way, the full configuration of your Redis server can be found in the INFO Command Output section at the bottom of this page (the data is updated automatically).

Adaptive dark theme

Oh, yes! The future has arrived. A dark theme with an adaptive mode for the most fashionable Asynq users 🎉

[Screenshot: dark theme]

Asynq CLI

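Install the Asynq CLI tool by running the following command (on current Go versions, where go get no longer installs binaries, go install with a version suffix is the way to do it):

go install github.com/hibiken/asynq/tools/asynq@latest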

To see the current state of the queues and their statistics:

asynq stats

[Screenshot: output of asynq stats]

P.S.

If you want more articles like this on this blog, then post a comment below and follow me. Thanks! 😻

❗️ You can support me on Boosty, both on a permanent and on a one-time basis. All proceeds from this way will go to support my OSS projects and will energize me to create new products and articles for the community.

And of course, you can help me make developers' lives even better! Just connect to one of my projects as a contributor. It's easy!

My main projects that need your help (and stars) 👇

  • 🔥 gowebly: A next-generation CLI tool that makes it easy to create amazing web applications with Go on the backend, using htmx, hyperscript or Alpine.js and the most popular CSS frameworks on the frontend.
  • ✨ create-go-app: Create a new production-ready project with Go backend, frontend and deploy automation by running one CLI command.

My other small projects: yatr, gosl, json2csv, csv2api.
