A few months ago I found this amazing YouTube video by Jyotinder Singh: How to build a Distributed Task Scheduler with Go, Postgres, and gRPC
In it, he builds a distributed task scheduler in Go using PostgreSQL and gRPC. The architecture looks very practical compared to the usual tutorial projects such as a pseudo e-commerce site or a todo app (those are great too), so I thought it would be fun to build my own version in Rust.
Here are some things I learned while building my own distributed task scheduler.
Worker Pool Pattern in Go
One way to maximize a machine's concurrency in Go is the worker pool pattern: spin up a fixed number of workers at startup and send tasks to them through a channel.
Example Code:
func (w *Worker) Run() error {
	w.startWorkerPool(workerPoolSize)
	w.startGRPCServer()
	log.Println("Serving now...")
	return w.awaitShutdown()
}

func (w *Worker) startWorkerPool(poolSize int) {
	for i := 0; i < poolSize; i++ {
		// Register the worker before spawning it, so a Wait() in
		// awaitShutdown can't run before the goroutine has started.
		w.wg.Add(1)
		go func() {
			defer w.wg.Done()
			w.process()
		}()
	}
}

func (w *Worker) process() {
	for {
		select {
		case <-w.ctx.Done():
			return
		case t := <-w.taskQueue:
			// Report back to the coordinator that the task has started.
			req := &pb.UpdateTaskStatusRequest{
				TaskId:    t.GetTaskId(),
				Status:    pb.TaskStatus_STARTED,
				StartedAt: time.Now().Unix(),
			}
			w.grpcClient.UpdateTaskStatus(w.ctx, req)
		}
	}
}
This is perfectly valid Go: channels are thread-safe, so multiple goroutines can consume from a single channel. It is also efficient, because the worker doesn't have to initialize a gRPC client each time it receives a new task.
What about Rust?
However, the story is slightly different in the async Rust world. The standard library only provides channels with a single consumer (std::sync::mpsc), and a worker pool with a single worker doesn't make sense at all.
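To make that concrete, here is a tiny sketch of the limitation (hypothetical task type, plain threads rather than async): std's Receiver is neither Clone nor Sync, so only one worker can ever own it.

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel::<u32>();

    // A single consumer works: the receiver is moved into one worker.
    let worker = thread::spawn(move || {
        for task in rx {
            println!("worker got task {task}");
        }
    });

    // A second worker can't share it: `Receiver` is neither `Clone` nor
    // `Sync`, so another `thread::spawn(move || ... rx ...)` won't compile.

    tx.send(1).unwrap();
    tx.send(2).unwrap();
    drop(tx); // closing the channel ends the worker's loop
    worker.join().unwrap();
}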
So this time I chose a simpler way to achieve concurrency in async Rust: just spawn a new async task for each incoming request.
Example code in Rust:
#[tonic::async_trait]
impl WorkerService for Worker {
    async fn submit_task(
        &self,
        req: Request<TaskRequest>,
    ) -> Result<Response<TaskResponse>, Status> {
        let req = req.into_inner();
        let response = req.clone().into();
        let addr = self.coordinator_addr.clone();
        // Pass a received task to another async function!
        tokio::spawn(handle_task(req.task_id.clone(), addr));
        // And send a response
        Ok(Response::new(response))
    }
}

async fn handle_task(task_id: String, addr: String) -> Result<(), String> {
    println!("Connecting to gRPC server: {}", addr);
    let mut client = CoordinatorServiceClient::connect(addr.to_string())
        .await
        .map_err(|e| e.to_string())?;
    println!("Created a new gRPC client for coordinator");
    let req = UpdateTaskStatusRequest::start(&task_id)?;
    let _ = client
        .update_task_status(Request::new(req.clone()))
        .await
        .map_err(|e| e.to_string())?;
    // Processing task now...
    Ok(())
}
An obvious downside here:
- In the Rust code, a worker needs to initialize a gRPC client and connect to the coordinator each time it receives a request. That's a huge disadvantage for a service at the kind of scale Rust tends to get picked for.
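That said, the reconnect-per-request part is fixable even with the spawn-per-task design. Here is a minimal sketch assuming tonic's transport layer and tokio; connect_once, handle_task, and the address are hypothetical names for illustration. Cloning a tonic Channel reuses the same underlying connection, so each spawned task can take a cheap clone instead of dialing again:

use tonic::transport::{Channel, Endpoint};

// Dial the coordinator once at startup; clones of the `Channel` share the
// same underlying connection, so spawned tasks don't reconnect per request.
async fn connect_once(addr: String) -> Result<Channel, tonic::transport::Error> {
    Endpoint::from_shared(addr)?.connect().await
}

async fn handle_task(task_id: String, channel: Channel) {
    // Hypothetical: build the generated client from the shared channel,
    // e.g. `CoordinatorServiceClient::new(channel)`, then update the task
    // status exactly as before.
    let _ = (task_id, channel);
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let channel = connect_once("http://[::1]:50051".to_string()).await?;
    for n in 0..4 {
        // Each task gets a cheap clone instead of a fresh connection.
        tokio::spawn(handle_task(format!("task-{n}"), channel.clone()));
    }
    Ok(())
}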
Do we have alternatives?
Well, there is actually a crate called async-channel, which provides channels with multiple consumers. This should be the answer to the question. I don't have enough bandwidth to try it now, but I'd like to in the near future.
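For the record, here is a minimal sketch of what that could look like, assuming the async-channel crate and tokio as the runtime; the worker count, task type, and println bodies are made up for illustration:

use async_channel::unbounded;
use std::time::Duration;

#[tokio::main]
async fn main() {
    // Both ends of an async-channel are cloneable, so many workers can
    // consume from the same queue, just like the Go worker pool.
    let (tx, rx) = unbounded::<String>();

    for id in 0..4 {
        let rx = rx.clone();
        tokio::spawn(async move {
            // A long-lived gRPC client could be created once here.
            // recv() returns Err once every sender has been dropped.
            while let Ok(task_id) = rx.recv().await {
                println!("worker {id} picked up {task_id}");
            }
        });
    }

    for n in 0..8 {
        tx.send(format!("task-{n}")).await.unwrap();
    }
    drop(tx); // close the channel so the workers' loops end

    // Give the toy workers a moment to drain the queue before exiting.
    tokio::time::sleep(Duration::from_millis(100)).await;
}

Since the receiver is cloneable, each worker could also create its gRPC client once and reuse it for every task it picks up, just like the Go version.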
Thanks for reading ✌️