Processing DAGs with Async Python and graphlib

Roman - Aug 26 - - Dev Community

I recently came across an interesting module in Python's bottomless standard library: graphlib. If you haven't worked with it before, it's a small utility that was added in Python 3.9 and only implements one class: TopologicalSorter.

The name is self-explanatory -- this is a class for topological sorting of a graph. But I don't think it was originally written with just sorting in mind, since it has rather cryptic, but incredibly useful API, such as prepare() method or is_active(). This example in the documentation hints on the motivation behind it:

topological_sorter = TopologicalSorter()

# Add nodes to 'topological_sorter'...

topological_sorter.prepare()
while topological_sorter.is_active():
    for node in topological_sorter.get_ready():
        # Worker threads or processes take nodes to work on off the
        # 'task_queue' queue.
        task_queue.put(node)

    # When the work for a node is done, workers put the node in
    # 'finalized_tasks_queue' so we can get more nodes to work on.
    # The definition of 'is_active()' guarantees that, at this point, at
    # least one node has been placed on 'task_queue' that hasn't yet
    # been passed to 'done()', so this blocking 'get()' must (eventually)
    # succeed.  After calling 'done()', we loop back to call 'get_ready()'
    # again, so put newly freed nodes on 'task_queue' as soon as
    # logically possible.
    node = finalized_tasks_queue.get()
    topological_sorter.done(node)
Enter fullscreen mode Exit fullscreen mode

So graphlib is not a module just for sorting graphs, it's also a utility for running graphs of tasks in topological order, which is useful if your workloads have tasks depending on results of other tasks. Graphs are a great way to model this problem, and topological order is how you make sure tasks are processed in the right order.

One thing that is missing from the docs is asyncio example, which turns out to be quite easy to write. Since with asyncio you don't have to deal with thread-safety, you can get by without using queue for synchronizing threads or any other additional complexity of the sort.

We'll define a simplistic async node visitor function:

async def visit(node: str, sorter: TopologicalSorter):
    print(f"processing node {node}")
    sorter.done(node)
Enter fullscreen mode Exit fullscreen mode

In the real world this can be as complex as you'd like, as long as you're doing I/O bound work to reap the benefits of asyncio. The important bit is to call the sorter.done(node) in the end of the function to let the instance of TopologicalSorter know we're done with this node and we can progress onto the next.

Then we plug the visit function into our topologically ordered run:

sorter = TopologicalSorter(graph)

sorter.prepare()

while sorter.is_active():
    node_group = sorter.get_ready()

    if not node_group:
        # no nodes are ready yet, so we sleep for a bit
        await asyncio.sleep(0.25)
    else:
        tasks = set()
        for node in node_group:
            task = asyncio.create_task(visit(node, sorter))
            tasks.add(task)
            task.add_done_callback(tasks.discard)
Enter fullscreen mode Exit fullscreen mode

Full source code of a working script can be found in this gist.

One peculiar aspect of graphlib is the format of the graph the TopologicalSorter accepts as an argument -- it is in reverse order from your typical representation of a graph. E.g. if you have a graph like this A -> B -> C, normally you'd to represent it like this:

graph = {
  "A": ["B"],
  "B": ["C"],
  "C": [],
}
Enter fullscreen mode Exit fullscreen mode

but the TopologicalSorter wants this graph in the with edge direction reversed:

If the optional graph argument is provided it must be a dictionary representing a directed acyclic graph where the keys are nodes and the values are iterables of all predecessors of that node in the graph

So the right way to represent A -> B -> C for the TopologicalSorter is this:

graph = {
  "C": ["B"],
  "B": ["A"],
  "A": [],
}
Enter fullscreen mode Exit fullscreen mode

More info and a rather heated debate on this can be found here: https://bugs.python.org/issue46071.

Happy coding!

.
Terabox Video Player