asyncio’s event loop

Async is not new in Python, but I was not familiar with the concept. I have used it without fully grasping the idea, and that smells like disaster. This article, and the whole journey I went through, was sparked by one question: why shouldn’t you run blocking code on the event loop? The answer is simple: it will block the whole thing. I kind of knew that, as everyone says it, so you can’t really miss it. But why is that? How does it work? If you would like to know, read on.

I’m going to start with a bit of theory that explains the concepts behind the event loop and the concurrency model supported by Python’s event loop. Next we will dive into the code, and I’ll explain step by step what is going on, based on a silly little example.

Event loop design pattern

An event loop is a construct that runs in a single thread. Its responsibilities are to manage tasks and to schedule them. You can imagine a while True loop that executes incoming functions. The important thing is that it runs in a single thread. The concurrency model of Python’s event loop is cooperative (non-preemptive) multitasking. Its main feature is that programs running under that model must voluntarily yield control periodically. This way other programs may start or resume their execution.

With that knowledge it is safe to say that any blocking function run on the event loop will take hold of the thread until it finishes, preventing the event loop from scheduling anything else. In a really bad scenario, like blocking forever, the program will stop responding.
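To make this concrete, here is a minimal sketch that measures how long a small "ticker" coroutine is kept waiting. The names ticker, blocking_job, cooperative_job, measure and largest_gap are made up for this illustration, and the 0.3 second delay is an arbitrary choice.

```python
import asyncio
import time


async def ticker(stamps: list, start: float, times: int):
    """Record how long after `start` each tick actually happens."""
    for _ in range(times):
        stamps.append(time.perf_counter() - start)
        await asyncio.sleep(0.01)


async def blocking_job():
    """time.sleep holds the thread, so the loop cannot schedule anything."""
    time.sleep(0.3)


async def cooperative_job():
    """asyncio.sleep suspends this coroutine and hands control back."""
    await asyncio.sleep(0.3)


async def measure(job) -> float:
    """Return the largest gap (in seconds) between two consecutive ticks."""
    stamps: list = []
    start = time.perf_counter()
    await asyncio.gather(ticker(stamps, start, 5), job())
    return max(later - earlier for earlier, later in zip(stamps, stamps[1:]))


def largest_gap(job) -> float:
    return asyncio.run(measure(job))


if __name__ == "__main__":
    print("blocking job, largest gap:   ", round(largest_gap(blocking_job), 3))
    print("cooperative job, largest gap:", round(largest_gap(cooperative_job), 3))
```

With the blocking job the ticker should stall for roughly the whole 0.3 seconds, while with the cooperative one the gaps should stay close to the requested 0.01 seconds (exact numbers depend on the machine).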

That wasn’t too difficult to understand, and for some readers it may be enough of an answer to the main question. If you are up for a more practical explanation of how this all works in Python, keep reading.

The internals

The best way to talk about how it is done is with example code. I have prepared a small and silly example that uses asyncio basics. It is not complex, and I’ll explain it piece by piece, from top to bottom, as the Python interpreter would go through it. The code can be run as is: copy, paste and execute.

import asyncio
import logging

logging.basicConfig(format="%(asctime)s %(message)s", level=logging.INFO)


async def fast(times: int):
    """Simulating fast running coroutine"""
    for i in range(times):
        logging.info("Fast counting, %s", i)
        await asyncio.sleep(0)


async def slow(times: int):
    """Simulating slow running coroutine"""
    for i in range(times):
        logging.info("Slow counting, %s", i)
        await asyncio.sleep(2.0)


async def main():
    await asyncio.gather(fast(3), slow(3))


asyncio.run(main())

Now that you have skimmed the code, let’s look at the first concept.

Coroutine

async def fast(times: int):

There are three coroutines in the example. What is a coroutine? A coroutine is a function, but a special one, as its execution may be suspended and resumed. If you are thinking about generators at this point, you are correct, they are similar. If you read PEPs 342, 380, and 492 you will know the history of coroutines in Python and their roots. Coroutines run a bit differently from regular functions: calling one does not execute its body, it only creates a coroutine object. To actually run it you need the event loop.
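A small sketch of what that means in practice. The answer coroutine is made up for the illustration, and driving a coroutine by hand with send() is only done here to show the generator heritage; it is not something you would normally do.

```python
import asyncio
import inspect


async def answer() -> int:
    return 42


# Calling the function does not run its body, it only builds an object.
coro = answer()
is_coroutine = inspect.iscoroutine(coro)

# Driving it by hand, generator style: send(None) runs the body until the
# first suspension point, or until it finishes and raises StopIteration.
try:
    coro.send(None)
except StopIteration as stop:
    manual_result = stop.value

# The usual way: let the event loop drive a fresh coroutine object.
loop_result = asyncio.run(answer())

print(is_coroutine, manual_result, loop_result)
```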

Running coroutine

async def main():
    await asyncio.gather(fast(3), slow(3))


asyncio.run(main())

This is the way to execute a coroutine. It is a bit odd when compared to regular functions. The reason is that coroutines run on the event loop, and asyncio.run(main()) takes care of that. Since Python 3.11 the call to run() is in fact a wrapper around the asyncio.Runner class. It always creates a new event loop when called, and raises a RuntimeError if it is called while another event loop is already running in the same thread. Normally you probably won’t call it yourself though, as web frameworks like FastAPI deal with this for you. An interesting bit is that FastAPI runs on an event loop, but plain (non-async) endpoint functions are run in a thread pool so that blocking code does not stall the loop.
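Both points can be tried out with the standard library alone. The sketch below is not how FastAPI does it internally; asyncio.to_thread is used here only as a stand-in for "run blocking code in a thread pool", and blocking_io and nested_run_fails are names invented for the example.

```python
import asyncio
import time


def blocking_io() -> str:
    """A regular, blocking function (stand-in for file or network access)."""
    time.sleep(0.1)
    return "done"


async def nested_run_fails() -> bool:
    """Try to call asyncio.run() while a loop is already running."""
    coro = asyncio.sleep(0)
    try:
        asyncio.run(coro)
    except RuntimeError:
        coro.close()  # tidy up, the coroutine object was never started
        return True
    return False


async def main():
    raised = await nested_run_fails()
    # Hand the blocking call to a worker thread; the loop stays free meanwhile.
    result = await asyncio.to_thread(blocking_io)
    return raised, result


raised, result = asyncio.run(main())
print(raised, result)
```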

Running coroutines concurrently

async def main():
    await asyncio.gather(fast(3), slow(3))

Here the gather function is used to run coroutines concurrently and collect their results. It is one of two recommended ways of running coroutines concurrently. The second one is TaskGroup, new in Python 3.11.

Under the hood gather goes through the provided coroutines and wraps each one in a Task. Tasks are automatically scheduled to run on the event loop. In the end we get a Future that aggregates all the Tasks and resolves once they are done. In this example I’m not bothered by the result, as the coroutines log their progress themselves.

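Awaiting gather gives you a list of results, in the same order as the coroutines were passed in. You have to be careful about exceptions though: by default the first exception raised is propagated to whoever awaits gather, while the remaining coroutines keep running. Passing return_exceptions=True puts the exceptions in the result list instead.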
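Here is a small sketch of both behaviours side by side. The ok and boom coroutines are made up for the illustration.

```python
import asyncio


async def ok(value: int) -> int:
    await asyncio.sleep(0)
    return value


async def boom() -> int:
    await asyncio.sleep(0)
    raise ValueError("boom")


async def default_behaviour() -> str:
    """Without return_exceptions the error surfaces at the await."""
    try:
        await asyncio.gather(ok(1), boom(), ok(3))
    except ValueError as exc:
        return f"propagated: {exc}"
    return "no error"


async def collected() -> list:
    """With return_exceptions=True errors become ordinary list items."""
    return await asyncio.gather(ok(1), boom(), ok(3), return_exceptions=True)


propagated = asyncio.run(default_behaviour())
results = asyncio.run(collected())
print(propagated)
print(results)
```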

Tasks

We have reached a good place to describe what a task is. To be honest, I spent a lot of time wrapping my head around the difference between Future and Task.

The docstring of the Task class literally says that it is a coroutine wrapped in a Future. A Task is a higher-level construct than a Future. A Future is a deferred result, or an interface for delivering such a result (or failure) in the future. A Task is a specialised Future created to take care of a coroutine’s lifecycle. A Task gives you a way to check the state of a coroutine’s execution and to get its result. It does so by following the Future’s interface, with some exceptions. If you are not writing a framework you will most probably use Tasks far more often than Futures.
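The difference is easier to see in code. In this sketch a bare Future has to be resolved by someone else (here a callback scheduled with call_soon), while a Task drives its coroutine on its own. The compute coroutine and the keys of the returned dictionary are invented for the example.

```python
import asyncio


async def compute() -> int:
    await asyncio.sleep(0)
    return 7


async def main() -> dict:
    loop = asyncio.get_running_loop()

    # A bare Future: just a placeholder, somebody must deliver the result.
    future = loop.create_future()
    loop.call_soon(future.set_result, "delivered")

    # A Task: wraps the coroutine and runs it to completion by itself.
    task = asyncio.create_task(compute())

    return {
        "task_is_future": isinstance(task, asyncio.Future),
        "future_value": await future,
        "task_value": await task,
        "task_done": task.done(),
    }


info = asyncio.run(main())
print(info)
```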

I have written that tasks are scheduled automatically to run on the event loop. You can see it for yourself; below is a bit from Task.__init__.

    def __init__(self, coro, *, loop=None, name=None, context=None):
        super().__init__(loop=loop)
        ...
        self._loop.call_soon(self.__step, context=self._context)
        _register_task(self)

Python’s event loop

We are at the point where asyncio.gather runs coroutines on the event loop. It is time to delve into Python’s event loop and see how it is constructed. At the beginning I mentioned that the event loop is single threaded; it is also the main entry point to the program. The two main building blocks of the event loop are, as you can imagine, a for loop and a collection of events. The name speaks volumes here. I’m not even kidding about the loop. The for loop is located in the BaseEventLoop._run_once method, which is called over and over for as long as the loop is running. Below is a snippet from the method, where I have cut out the debug code to make it more obvious what you are looking at.

    ntodo = len(self._ready)
    for i in range(ntodo):
        handle = self._ready.popleft()
        if handle._cancelled:
            continue
        if self._debug:
            ...
        else:
            handle._run()

The loop is not a mystery any more. What is left to show is the list of events. You may have noticed it in the snippet already, but here it is:

ntodo = len(self._ready)

It is not a regular list though, but a deque. The collection is modified during the event loop’s execution, as each coroutine can schedule new coroutines or reschedule itself. A deque is perfect here: being double ended, it allows for fast removal of objects from the head and fast adding at the tail. Brilliant.org has a great article on double ended queues.

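The algorithm is very simple. The event loop goes through the collection stored under self._ready and executes each entry. Strictly speaking the entries are handles wrapping callbacks, and a task’s step is one such callback. Coroutines can schedule new coroutines. The cycle repeats forever or until the work is complete.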
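The same algorithm fits in a few lines of plain Python. ToyLoop below is a hypothetical, stripped-down imitation written for this article, not asyncio’s implementation: no timers, no I/O, no cancellation, only a deque of callbacks and a for loop.

```python
from collections import deque


class ToyLoop:
    """Hypothetical miniature loop: a deque of callbacks and a for loop."""

    def __init__(self):
        self._ready = deque()

    def call_soon(self, callback, *args):
        self._ready.append((callback, args))

    def run_once(self):
        # Take the length up front: anything scheduled during this pass
        # waits for the next pass.
        ntodo = len(self._ready)
        for _ in range(ntodo):
            callback, args = self._ready.popleft()
            callback(*args)

    def run_until_idle(self) -> int:
        passes = 0
        while self._ready:
            self.run_once()
            passes += 1
        return passes


log = []
loop = ToyLoop()


def countdown(name: str, n: int):
    log.append(f"{name}:{n}")
    if n > 0:
        loop.call_soon(countdown, name, n - 1)  # reschedule itself


loop.call_soon(countdown, "a", 1)
loop.call_soon(countdown, "b", 2)
passes = loop.run_until_idle()
print(passes, log)
```

The two countdowns take turns even though nothing here is asynchronous; each callback simply does a little work and puts its continuation at the tail of the queue.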

Coming back to our example, asyncio.gather creates tasks, which are scheduled to be run. Each task calls BaseEventLoop.call_soon on the event loop. The public method does some validation and dispatches to the private one, BaseEventLoop._call_soon.

    def _call_soon(self, callback, args, context):
        handle = events.Handle(callback, args, self, context)
        if handle._source_traceback:
            del handle._source_traceback[-1]
        self._ready.append(handle)
        return handle

This is the place where our coroutines, converted to tasks, are appended to the deque stored at self._ready on the event loop. In the next iteration of the event loop’s for loop they will be executed.
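You can watch the queueing from the outside with the public call_soon method. In this sketch the callbacks are plain list appends, and the labels are made up; the point is only that scheduled callbacks do not run until the current step gives control back to the loop, and that they then run in the order they were added.

```python
import asyncio


async def main() -> list:
    loop = asyncio.get_running_loop()
    order = []

    # Both callbacks are only appended to the ready queue here.
    loop.call_soon(order.append, "first")
    loop.call_soon(order.append, "second")
    order.append("main before yielding")

    # Give control back so the loop can go through the queue.
    await asyncio.sleep(0)
    return order


order = asyncio.run(main())
print(order)
```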

Steps

There is one last thing to explain, and it’s the concurrency. When you run the example code from the beginning of the article you will get output like this:

2022-12-22 21:23:15,687 Fast counting, 0
2022-12-22 21:23:15,687 Slow counting, 0
2022-12-22 21:23:15,687 Fast counting, 1
2022-12-22 21:23:15,687 Fast counting, 2
2022-12-22 21:23:17,688 Slow counting, 1
2022-12-22 21:23:19,690 Slow counting, 2

“Slow counting” is not really blocking the other coroutine. There is a sleep in the code, but it is an async sleep. What is going on under the hood is that Python splits the execution of coroutines into smaller steps. You already know that asyncio.gather creates tasks and that each task is scheduled automatically on the event loop. What you don’t know yet is that it is in fact only the first step of the coroutine that is scheduled, as shown in the example below.

class Task(futures._PyFuture):                                

    def __init__(self, coro, *, loop=None, name=None, context=None):
        super().__init__(loop=loop)
        ...
        self._loop.call_soon(self.__step, context=self._context)
        _register_task(self)

The coroutine itself is not being run here, but rather the method that advances the coroutine one step at a time. I hope all the pieces are falling into place right now. Coroutines are based on generators, and they can be paused and resumed like generators. That is exactly what Task.__step is doing. The coroutine is effectively split into steps at the await calls that actually suspend it. Each time such an await is reached, control is yielded back to the event loop, and another coroutine runs until it reaches its end or another suspending await. This is how concurrency is achieved. The example output shows that while asyncio.sleep is awaited the other coroutine runs.
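The stepping can be reproduced without asyncio at all. The sketch below is a hypothetical round-robin scheduler written for this article (pause, worker and run_round_robin are invented names); it only assumes that send(None) advances a coroutine to its next suspension point, which is the same mechanism a Task relies on.

```python
import types
from collections import deque


@types.coroutine
def pause():
    """Suspend once using a bare yield, in the spirit of asyncio.sleep(0)."""
    yield


async def worker(name: str, steps: int, log: list):
    for i in range(steps):
        log.append(f"{name} step {i}")
        await pause()  # suspension point: control goes back to the scheduler


def run_round_robin(coros):
    """Advance every coroutine by one step per turn until all are finished."""
    ready = deque(coros)
    while ready:
        coro = ready.popleft()
        try:
            coro.send(None)      # run until the next suspension point
        except StopIteration:
            continue             # this coroutine is done, drop it
        ready.append(coro)       # not done yet, queue its next step


log = []
run_round_robin([worker("fast", 2, log), worker("slow", 3, log)])
print(log)
```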

One last thing. Calling sleep is usually frowned upon, but not in the asyncio world. Especially asyncio.sleep(0), which is backed by a dedicated helper function, as described in the source code.

@types.coroutine
def __sleep0():
    """Skip one event loop run cycle.

    This is a private helper for 'asyncio.sleep()', used
    when the 'delay' is set to 0.  It uses a bare 'yield'
    expression (which Task.__step knows how to handle)
    instead of creating a Future object.
    """
    yield


async def sleep(delay, result=None):
    """Coroutine that completes after a given time (in seconds)."""
    if delay <= 0:
        await __sleep0()
        return result
    ...

You could use await asyncio.sleep(0) to split a long-running coroutine into steps and give other tasks a chance to run in between.
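A minimal sketch of that idea, with made-up crunch and heartbeat coroutines. The sum(range(...)) call is only a stand-in for a chunk of CPU-bound work.

```python
import asyncio


async def crunch(chunks: int, log: list):
    """Long computation cut into chunks, yielding after each one."""
    for chunk in range(chunks):
        sum(range(10_000))            # stand-in for a slice of heavy work
        log.append(f"crunch {chunk}")
        await asyncio.sleep(0)        # skip one loop cycle, let others run


async def heartbeat(beats: int, log: list):
    for beat in range(beats):
        log.append(f"beat {beat}")
        await asyncio.sleep(0)


async def main() -> list:
    log = []
    await asyncio.gather(crunch(3, log), heartbeat(3, log))
    return log


log = asyncio.run(main())
print(log)
```

Without the sleep(0) in crunch all three chunks would run back to back before the heartbeat got its first turn.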

Summary

I hope that, having reached this part, you have a much better understanding of how asyncio and the event loop work in Python. I do.

This blog post took a lot of effort to write. The most important thing to remember is that I had to learn how it works before I could try to explain it to others and to my future self.
