Asyncio lets developers write asynchronous programs in Python without much hassle. The module also provides many ways to create and manage asynchronous tasks, and with so many options it can be confusing to know which one to use.
In this article, we will discuss the many ways you can create and manage tasks with asyncio.
What is an asyncio task?
In asyncio, a task is an object that wraps a coroutine and schedules it to run within the event loop. Simply put, a task is a way to run a coroutine concurrently with other tasks. Once a task is created, the event loop runs it, pausing and resuming it as necessary to allow other tasks to run.
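To make the idea of running concurrently concrete, here is a minimal sketch that compares awaiting two coroutines one after the other with wrapping them in tasks first. The names work, run_sequentially and run_concurrently are made up for this illustration, and the exact timings will vary from machine to machine.

```python
import asyncio
import time

async def work(delay):
    await asyncio.sleep(delay)  # Simulate an I/O-bound operation
    return delay

async def run_sequentially():
    start = time.perf_counter()
    await work(0.2)  # The second call only starts after the first one ends
    await work(0.2)
    return time.perf_counter() - start

async def run_concurrently():
    start = time.perf_counter()
    # Both tasks are scheduled on the event loop before we wait for either
    first = asyncio.create_task(work(0.2))
    second = asyncio.create_task(work(0.2))
    await first
    await second
    return time.perf_counter() - start

sequential_time = asyncio.run(run_sequentially())
concurrent_time = asyncio.run(run_concurrently())
print(f"sequential: {sequential_time:.2f}s, concurrent: {concurrent_time:.2f}s")
```

The sequential version takes roughly the sum of the two delays, while the task-based version takes roughly as long as the longest one.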
Methods for Creating and Managing Asyncio Tasks
Now, we can discuss methods for creating and managing tasks. First, to create a task in Python using asyncio, you use the asyncio.create_task function, which takes the following arguments:
- coro (required): The coroutine object to be scheduled. This is what you get by calling the async function you want to run, for example greet("Alice").
- name (optional): A name for the task that can be useful for debugging or logging purposes. You can assign a string to this parameter. You can also set or get the name later using Task.set_name(name) and Task.get_name().
- context (optional): Introduced in Python 3.11, this lets you pass a custom contextvars.Context for the coroutine to run in. If it is omitted, the task runs in a copy of the current context. Context variables are similar to thread-local storage but for asyncio tasks. This argument is not commonly used unless you're dealing with advanced scenarios that require context management.
Here is an example of the usage of asyncio.create_task:
import asyncio

# Define a coroutine
async def greet(name):
    await asyncio.sleep(1)  # Simulate an I/O-bound operation
    print(f"Hello, {name}!")

async def main():
    # Create tasks
    task1 = asyncio.create_task(greet("Alice"), name="GreetingAlice")
    task2 = asyncio.create_task(greet("Bob"), name="GreetingBob")
    # Check task names
    print(f"Task 1 name: {task1.get_name()}")
    print(f"Task 2 name: {task2.get_name()}")
    # Wait for both tasks to complete
    await task1
    await task2

# Run the main function
asyncio.run(main())
Once you have a task, you can call methods on it such as:
- .cancel(): to request that the task be canceled.
- .add_done_callback(cb): to add a callback function that runs when the task is done.
- .done(): to check if the task is completed.
- .result(): to retrieve the result of the task after it’s completed.
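The methods listed above can be sketched together in one small program. The coroutines compute and never_finishes and the callback on_done are hypothetical names used only for this illustration.

```python
import asyncio

async def compute():
    await asyncio.sleep(0.1)  # Simulate a short I/O-bound operation
    return 42

async def never_finishes():
    await asyncio.sleep(3600)  # Long enough that it gets canceled first

def on_done(task):
    # Called by the event loop once the task has finished
    print(f"{task.get_name()} finished")

async def main():
    fast = asyncio.create_task(compute(), name="fast")
    slow = asyncio.create_task(never_finishes(), name="slow")
    fast.add_done_callback(on_done)

    print(fast.done())  # False: the task has not had a chance to run yet
    await fast
    print(fast.done(), fast.result())

    slow.cancel()  # Request cancellation of the long-running task
    try:
        await slow
    except asyncio.CancelledError:
        print("slow was canceled:", slow.cancelled())
    return fast.result(), slow.cancelled()

outcome = asyncio.run(main())
```

Note that result() should only be called once done() is true; calling it on a canceled task raises CancelledError instead of returning a value.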
Now that we understand how to create a task, let's see how to handle waiting for one task or a multitude of tasks.
Waiting for Task Completion
In this section, we will discuss how to wait for one or many tasks to complete. Asynchronous programming lets a program keep executing while an asynchronous task is running. Sometimes, though, you want tighter control over the flow and need to be sure a result is available before safely continuing the execution of the program.
To wait for a single task to complete, you can use asyncio.wait_for. It takes two arguments:
- aw (required): This is the coroutine, task, or future that you want to wait for. It can be any object that can be awaited, like a coroutine function call, an asyncio.Task, or an asyncio.Future.
- timeout (required, may be None): This specifies the maximum number of seconds to wait for aw to complete. If the timeout is reached and the awaitable has not completed, asyncio.wait_for cancels it and raises a TimeoutError. If timeout is set to None, the function will wait indefinitely for the awaitable to complete.
Here is an example where this method is used:
import asyncio

async def slow_task():
    print("Task started...")
    await asyncio.sleep(5)  # Simulating a long-running task
    print("Task finished!")
    return "Completed"

async def main():
    try:
        # Wait for slow_task to finish within 2 seconds
        result = await asyncio.wait_for(slow_task(), timeout=2)
        print(result)
    except asyncio.TimeoutError:
        print("The task took too long and was canceled!")

asyncio.run(main())
In the code above, slow_task() is a coroutine that simulates a long-running task by sleeping for 5 seconds. The line asyncio.wait_for(slow_task(), timeout=2) waits for the task to complete but limits the wait to 2 seconds, causing a timeout since the task takes longer. When the timeout is exceeded, the task is canceled and a TimeoutError is raised, which is handled by printing a message indicating the task took too long.
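Because asyncio.wait_for cancels whatever it was waiting on, it can help to see that side effect directly. The sketch below checks the task's state after a timeout and, as one possible alternative, wraps the task in asyncio.shield so that only the wait is abandoned. The delays are shortened so the example runs quickly.

```python
import asyncio

async def slow_task():
    await asyncio.sleep(0.3)  # Stand-in for a longer operation
    return "Completed"

async def main():
    # Case 1: the task is passed to wait_for directly and is canceled on timeout
    plain = asyncio.create_task(slow_task())
    try:
        await asyncio.wait_for(plain, timeout=0.05)
    except asyncio.TimeoutError:
        pass

    # Case 2: asyncio.shield protects the task, so only the wait is abandoned
    shielded = asyncio.create_task(slow_task())
    try:
        await asyncio.wait_for(asyncio.shield(shielded), timeout=0.05)
    except asyncio.TimeoutError:
        pass
    late_result = await shielded  # The task kept running and can still finish

    return plain.cancelled(), shielded.cancelled(), late_result

outcome = asyncio.run(main())
print(outcome)
```

Whether shielding is appropriate depends on the situation: it keeps the work alive, so you become responsible for awaiting or canceling the task later.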
We can also wait for multiple tasks or a group of tasks to complete. This is possible using asyncio.wait, asyncio.gather or asyncio.as_completed. Let's explore each one.
asyncio.wait
The asyncio.wait function waits for a collection of tasks and returns two sets: one for completed tasks and one for pending tasks. It takes the following arguments:
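- aws (required, iterable of tasks or futures): The collection of tasks or futures that you want to wait for. Passing bare coroutine objects was deprecated in Python 3.8 and is no longer allowed since Python 3.11, so wrap coroutines with asyncio.create_task first.
- timeout (float or None, optional): The maximum number of seconds to wait. If not provided, it waits indefinitely. When the timeout expires, asyncio.wait does not raise an exception; unfinished tasks are simply returned in the pending set.
- return_when (constant, optional): Specifies when asyncio.wait should return. Options include:
  - asyncio.ALL_COMPLETED (default): Returns when all tasks are complete.
  - asyncio.FIRST_COMPLETED: Returns when the first task is completed.
  - asyncio.FIRST_EXCEPTION: Returns when the first task raises an exception. If no task raises, it behaves like asyncio.ALL_COMPLETED.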
Let's see how it is used in an example.
import asyncio
import random

async def task():
    await asyncio.sleep(random.uniform(1, 3))

async def main():
    tasks = [asyncio.create_task(task()) for _ in range(3)]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    print(f"Done tasks: {len(done)}, Pending tasks: {len(pending)}")

asyncio.run(main())
In the code above, asyncio.wait waits for a group of tasks and returns two sets: one with completed tasks and another with those still pending. You can control when it returns, such as after the first task is completed or after all tasks are done. In the example, asyncio.wait returns when the first task is completed, leaving the rest in the pending set.
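Tasks left in the pending set keep running unless you do something about them. A common follow-up, sketched here with a hypothetical fetch coroutine and made-up delays, is to take the first result and cancel the rest.

```python
import asyncio

async def fetch(name, delay):
    # Hypothetical stand-in for a real I/O operation
    await asyncio.sleep(delay)
    return name

async def main():
    tasks = [
        asyncio.create_task(fetch("fast", 0.05)),
        asyncio.create_task(fetch("slow", 1.0)),
    ]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    winner = done.pop().result()

    # Cancel whatever has not finished yet
    for task in pending:
        task.cancel()
    # Give the canceled tasks a chance to finish canceling
    await asyncio.gather(*pending, return_exceptions=True)
    return winner, len(pending)

winner, canceled_count = asyncio.run(main())
print(winner, canceled_count)
```

This pattern fits cases such as querying several mirrors and keeping only the quickest answer.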
asyncio.gather
The asyncio.gather function runs multiple awaitable objects concurrently and returns a list of their results, optionally handling exceptions. Let's see the arguments it takes.
- *aws (required, multiple awaitables): A variable number of awaitable objects (like coroutines, tasks, or futures) to run concurrently.
- return_exceptions (bool, optional, default False): If True, exceptions in the tasks will be returned as part of the results list instead of being raised. If False, the first exception raised is propagated to the code awaiting asyncio.gather.
Let's see how it can be used in an example.
import asyncio
import random

async def task(id):
    await asyncio.sleep(random.uniform(1, 3))
    return f"Task {id} done"

async def main():
    results = await asyncio.gather(task(1), task(2), task(3))
    print(results)

asyncio.run(main())
In the code above, asyncio.gather runs multiple awaitable objects concurrently and returns a list of their results in the order they were passed in. It allows you to handle exceptions gracefully if return_exceptions is set to True. In the example, three tasks are run simultaneously, and their results are returned in a list once all tasks are complete.
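The example above does not exercise return_exceptions, so here is a minimal sketch of what the results list looks like when one awaitable fails. The coroutines ok and fails are hypothetical names for this illustration.

```python
import asyncio

async def ok(value):
    await asyncio.sleep(0.05)
    return value

async def fails():
    await asyncio.sleep(0.01)
    raise ValueError("boom")

async def main():
    # With return_exceptions=True the exception is placed in the results list
    return await asyncio.gather(ok(1), fails(), ok(3), return_exceptions=True)

results = asyncio.run(main())
for item in results:
    if isinstance(item, Exception):
        print("failed:", item)
    else:
        print("ok:", item)
```

The results keep the order of the arguments, so the exception appears in the second position even though that coroutine finished first.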
asyncio.as_completed
The asyncio.as_completed function returns an iterator of awaitables. Awaiting each one gives the result of the next task to complete, allowing results to be processed immediately. It takes the following arguments:
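- aws (required, iterable of awaitables): A collection of coroutine objects, tasks, or futures.
- timeout (float or None, optional): The maximum number of seconds to wait for tasks to complete. If not provided, it waits indefinitely. If the timeout expires before all awaitables are done, awaiting the next item raises a TimeoutError.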
Example
import asyncio
import random

async def task(id):
    await asyncio.sleep(random.uniform(1, 3))
    return f"Task {id} done"

async def main():
    tasks = [task(i) for i in range(3)]
    for coro in asyncio.as_completed(tasks):
        result = await coro
        print(result)

asyncio.run(main())
In the example above, asyncio.as_completed returns an iterator that yields results as each task completes, allowing you to process them immediately. This is useful when you want to handle results as soon as they're available, rather than waiting for all tasks to finish. In the example, the tasks are run simultaneously, and their results are printed as each one finishes, in the order they complete.
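With fixed delays instead of random ones, the completion order becomes predictable, which makes the behavior easier to see. The sketch below also shows one way the timeout argument might be handled; the coroutine job and its delays are made up for this illustration.

```python
import asyncio

async def job(name, delay):
    await asyncio.sleep(delay)
    return name

async def collect_in_completion_order():
    jobs = [job("slow", 0.3), job("fast", 0.05), job("medium", 0.15)]
    finished = []
    for next_done in asyncio.as_completed(jobs):
        finished.append(await next_done)
    return finished

async def collect_with_timeout():
    finished = []
    try:
        for next_done in asyncio.as_completed(
            [job("fast", 0.05), job("slow", 1.0)], timeout=0.3
        ):
            finished.append(await next_done)
    except asyncio.TimeoutError:
        # Raised when the timeout expires before every job is done
        finished.append("timed out")
    return finished

order = asyncio.run(collect_in_completion_order())
partial = asyncio.run(collect_with_timeout())
print(order, partial)
```

Results arrive in completion order rather than submission order, and a timeout ends the loop with whatever had finished by then.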
To summarize, you use:
- asyncio.wait: when you need to handle multiple tasks and want to track which tasks are completed and which are still pending. It's useful when you care about the status of each task separately.
- asyncio.gather: when you want to run multiple tasks concurrently and need the results in a list, especially when the order of results matters or you need to handle exceptions gracefully.
- asyncio.as_completed: when you want to process results as soon as each task finishes, rather than waiting for all tasks to complete. It’s useful for handling results in the order they become available.
However, these functions don't manage a set of tasks as a single unit with built-in error handling. In the next section, we will look at asyncio.TaskGroup and how to use it to manage a group of tasks.
asyncio.TaskGroup
asyncio.TaskGroup is an asynchronous context manager introduced in Python 3.11 that simplifies managing multiple tasks as a group. It ensures that if any task within the group fails, all other tasks are canceled, providing a way to handle complex task management with robust error handling. The class has a method called create_task used to create and add tasks to the task group. You pass a coroutine to this method, and it returns an asyncio.Task object that is managed by the group.
Here is an example of how it is used:
import asyncio

async def task1():
    await asyncio.sleep(1)
    return "Task 1 done"

async def task2():
    await asyncio.sleep(2)
    return "Task 2 done"

async def task_with_error():
    await asyncio.sleep(1)
    raise ValueError("An error occurred")

async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            # Use distinct names so the coroutine functions are not shadowed
            t1 = tg.create_task(task1())
            t2 = tg.create_task(task2())
            error_task = tg.create_task(task_with_error())
    except* ValueError as eg:
        # TaskGroup wraps task failures in an ExceptionGroup
        for exc in eg.exceptions:
            print(f"Error: {exc}")
    # Print results only for tasks that finished before the group was canceled
    for label, t in (("Task 1", t1), ("Task 2", t2)):
        if t.cancelled():
            print(f"{label} was canceled")
        else:
            print(f"{label} result:", t.result())

asyncio.run(main())
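asyncio.TaskGroup manages multiple tasks and ensures that if any task fails, all other tasks in the group are canceled. In the example, the task that raises ValueError causes the tasks still running to be canceled, and the group re-raises the error wrapped in an ExceptionGroup once the async with block exits. Only tasks that completed before the failure have a result to print; calling result() on a canceled task raises CancelledError.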
One practical use for this is web scraping. You can use asyncio.TaskGroup to handle multiple concurrent API requests and ensure that if any request fails, all other requests are canceled to avoid incomplete data.
We are at the end of the article and we have learned the multiple methods asyncio provides to create and manage tasks. Here is a summary of the methods:
- asyncio.wait_for: Wait for a task with a timeout.
- asyncio.wait: Wait for multiple tasks with flexible completion conditions.
- asyncio.gather: Aggregate multiple tasks into a single awaitable.
- asyncio.as_completed: Handle tasks as they are completed.
- asyncio.TaskGroup: Manage a group of tasks with automatic cancellation on failure.
Conclusion
Asynchronous programming can transform the way you handle concurrent tasks in Python, making your code more efficient and responsive. In this article, we've navigated through the various methods provided by asyncio to create and manage tasks, from simple timeouts to sophisticated task groups. Understanding when and how to use each method (asyncio.wait_for, asyncio.wait, asyncio.gather, asyncio.as_completed, and asyncio.TaskGroup) will help you harness the full potential of asynchronous programming, making your applications more robust and scalable.
Happy coding!