Async is not new in Python, but I was not familiar with the concept. I had used it without fully grasping the idea, and that smells like disaster. This article, and the whole journey behind it, was sparked by one question: why shouldn't you run blocking code on the event loop? The short answer is simple: it will block the whole thing. I kind of knew that, as everyone says it, so you can't really miss it. But why is that? How does it work? If you would like to know, read on.
I'm going to start with a bit of theory explaining the concepts behind the event loop and the concurrency model supported by Python's event loop. Then we will dive into the code, and I'll explain step by step what is going on, based on a silly little example.
Event loop design pattern
The event loop is a process that runs in a single thread. Its responsibilities are to manage tasks and to schedule them. You can imagine it as a `while True` loop that executes incoming functions. The important thing is that it runs in a single thread. Python's event loop follows the cooperative (non-preemptive) multitasking model. Its main feature is that programs running in this model must voluntarily yield control periodically, so that other programs may start or resume their execution.
With that knowledge it is safe to say that any blocking function run on the event loop will hold the thread until it finishes, preventing the event loop from scheduling anything else. In a really bad scenario, like blocking forever, the program will stop responding.
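To make this concrete, here is a minimal sketch (the coroutine names are made up for illustration) of a blocking call holding the loop hostage. Even a coroutine that could finish instantly has to wait until the thread is released:

```python
import asyncio
import time

async def blocker():
    # A mistake: time.sleep() is a blocking call. It holds the thread,
    # so the event loop cannot run anything else in the meantime.
    time.sleep(1)

async def ticker():
    # Could finish almost instantly, if only it were allowed to start.
    await asyncio.sleep(0)

async def main():
    start = time.monotonic()
    await asyncio.gather(blocker(), ticker())
    return time.monotonic() - start

elapsed = asyncio.run(main())
print(f"gather took {elapsed:.2f}s")  # ~1s, dominated by the blocking sleep
```

Swap `time.sleep(1)` for `await asyncio.sleep(1)` and `ticker()` finishes immediately, because the slow coroutine yields control instead of hogging the thread.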
That wasn't too difficult to understand, and for some readers it may be enough of an answer to the main question. If you are up for a more practical explanation of how this all works in Python, keep reading.
The best way to talk about how it is done is with example code. I have prepared a small and silly example using `asyncio` basics. It is not complex, and I'll explain each line from top to bottom, as the Python interpreter would execute it. The code can be run as is: copy, paste, and execute.
```python
import asyncio
import logging

logging.basicConfig(format="%(asctime)s %(message)s", level=logging.INFO)


async def fast(times: int):
    """Simulating fast running coroutine"""
    for i in range(times):
        logging.info("Fast counting, %s", i)
        await asyncio.sleep(0)


async def slow(times: int):
    """Simulating slow running coroutine"""
    for i in range(times):
        logging.info("Slow counting, %s", i)
        await asyncio.sleep(2.0)


async def main():
    await asyncio.gather(fast(3), slow(3))


asyncio.run(main())
```
Now that you have skimmed the code, let's look at the first concept.
```python
async def fast(times: int):
```
There are 3 coroutines in the example. What is a coroutine? A coroutine is a special kind of function whose execution may be suspended and resumed. If you are thinking about generators at this point, you are correct: they are similar. If you read PEPs 342, 380, and 492 you will learn the history of `asyncio` and its roots. Coroutines run a bit differently to regular functions: calling one does not execute its body, it only creates a coroutine object. You need the event loop to actually run it.
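You can see the difference in a tiny sketch (function name is my own example):

```python
import asyncio

async def greet():
    return "hello"

coro = greet()  # calling a coroutine function does NOT run its body
kind = type(coro).__name__
print(kind)  # 'coroutine' -- just an object, nothing has executed yet

result = asyncio.run(coro)  # the event loop actually drives it to completion
print(result)  # 'hello'
```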
```python
async def main():
    await asyncio.gather(fast(3), slow(3))


asyncio.run(main())
```
This is the way to execute a coroutine, and it is a bit odd when compared to regular functions. The reason is that coroutines run on the event loop, and the highlighted code takes care of that. The call to `run()` is in fact a wrapper around the `asyncio.Runner` class. It always creates a new event loop when called, and it raises an exception if called from within a running loop. Normally you probably won't call it yourself; web frameworks like FastAPI deal with this for you. An interesting bit is that in FastAPI everything runs async, but blocking code is run in a thread pool.
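The "raises from a running loop" behaviour is easy to demonstrate; a small sketch with made-up coroutine names:

```python
import asyncio

async def inner():
    return 42

async def main():
    coro = inner()
    try:
        asyncio.run(coro)  # a loop is already running here, so this raises
    except RuntimeError as exc:
        print(exc)
        coro.close()  # tidy up the never-awaited coroutine object
    return await inner()  # awaiting on the current loop works fine

result = asyncio.run(main())
print(result)  # 42
```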
Running coroutines concurrently
```python
async def main():
    await asyncio.gather(fast(3), slow(3))
```
`gather` is used to run coroutines and collect their results. It is one of two recommended ways of running coroutines concurrently; the second one, new in Python 3.11, is `asyncio.TaskGroup`.
Under the hood, `gather` goes through the list of provided coroutines and wraps each one in a Task. Tasks are automatically scheduled to run on the event loop. At the end we are given a Future holding all the Tasks, which will resolve at some point. In this example I'm not bothered by the result, as the coroutines log their progress themselves.
`gather` returns a list of results, but you have to be careful about handling exceptions: by default the first exception is propagated up to wherever you await `gather`.
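If you would rather collect errors alongside results, `gather` accepts `return_exceptions=True`. A small sketch with made-up coroutines:

```python
import asyncio

async def double(n):
    return n * 2

async def boom():
    raise ValueError("boom")

async def main():
    # With return_exceptions=True the exception is returned in the
    # results list instead of being raised at the await.
    return await asyncio.gather(double(1), boom(), double(3),
                                return_exceptions=True)

results = asyncio.run(main())
print(results)  # [2, ValueError('boom'), 6]
```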
The `Task` class literally says that it is a coroutine wrapped in a Future. A Task is a higher-level construct than a Future. A Future is a deferred result, or rather an interface for delivering such a result (or failure) at some point in the future. A Task is a specialised Future created to handle a coroutine's lifecycle: it gives you a way to check on a coroutine's execution and retrieve its result, by following the Future's interface, with some exceptions. If you are not writing a framework, you will most probably use Tasks more often than Futures.
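The relationship is easy to check for yourself; a minimal sketch (coroutine name is my own):

```python
import asyncio

async def work():
    await asyncio.sleep(0)
    return "done"

async def main():
    task = asyncio.create_task(work())      # wrap the coroutine in a Task
    is_future = isinstance(task, asyncio.Future)  # Task subclasses Future
    await task
    return is_future, task.result()         # Future interface: result() when finished

is_future, outcome = asyncio.run(main())
print(is_future, outcome)  # True done
```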
I have written that tasks are scheduled automatically to run on the event loop. You may see for yourself; below is a bit from the `Task` class' `__init__` method.
```python
def __init__(self, coro, *, loop=None, name=None, context=None):
    super().__init__(loop=loop)
    ...
    self._loop.call_soon(self.__step, context=self._context)
    _register_task(self)
```
Python’s event loop
We are at the point where `asyncio.gather` runs coroutines on the event loop. It is time to delve into Python's event loop and see how it is constructed. At the beginning I mentioned that the event loop is single threaded; it is also the main entry point of the program. The two main building blocks of the event loop are, as you can imagine, a loop and a collection of events. The name speaks volumes here, and I'm not even kidding about the loop. The `for` loop is located in the `_run_once` method, which is called repeatedly while the loop runs. Below is a snippet from the method, with the debug code cut out to make it more obvious what you are looking at.
```python
ntodo = len(self._ready)
for i in range(ntodo):
    handle = self._ready.popleft()
    if handle._cancelled:
        continue
    if self._debug:
        ...
    else:
        handle._run()
```
The loop is not a mystery any more. What is left to show is the list of events. You may have noticed it in the snippet, but here it is:
```python
ntodo = len(self._ready)
```
It is not a regular list, though, but a deque. The collection is modified during the event loop's execution: each coroutine can schedule new coroutines, including itself. A deque is perfect here: being double ended, it allows fast removal of items from the head and fast appends at the tail. Brilliant.org has a great article on double ended queues.
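Both of those operations are O(1) on a deque, which is exactly the access pattern the loop needs; a tiny sketch with hypothetical callback names:

```python
from collections import deque

ready = deque()
ready.append("callback_a")   # O(1) append at the tail, like call_soon
ready.append("callback_b")
first = ready.popleft()      # O(1) pop from the head, like _run_once
print(first, list(ready))    # callback_a ['callback_b']
```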
The algorithm is very simple. The event loop goes through the collection of callbacks stored under `self._ready` and executes them. Coroutines can schedule new coroutines. The cycle repeats forever, or until the program completes.
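The whole cycle can be condensed into a toy loop. This is a drastic simplification of what `_run_once` does, assuming plain callables instead of asyncio's `Handle` objects:

```python
from collections import deque

class ToyLoop:
    """A drastically simplified sketch of asyncio's scheduling cycle."""

    def __init__(self):
        self._ready = deque()

    def call_soon(self, callback):
        self._ready.append(callback)

    def run(self):
        while self._ready:
            # Snapshot the count, so callbacks scheduled during this
            # iteration run in the *next* cycle, just like in asyncio.
            ntodo = len(self._ready)
            for _ in range(ntodo):
                handle = self._ready.popleft()
                handle()

log = []
loop = ToyLoop()
loop.call_soon(lambda: log.append("first"))
loop.call_soon(lambda: loop.call_soon(lambda: log.append("rescheduled")))
loop.run()
print(log)  # ['first', 'rescheduled']
```

Note how the second callback schedules more work mid-cycle, and that work still gets its turn on the following cycle.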
Coming back to our example, `asyncio.gather` creates tasks which are scheduled to be run. Each task uses `call_soon` from the event loop. The public method does some validation and dispatches to the private one:
```python
def _call_soon(self, callback, args, context):
    handle = events.Handle(callback, args, self, context)
    if handle._source_traceback:
        del handle._source_traceback[-1]
    self._ready.append(handle)
    return handle
```
Here is the place where our coroutines, converted to tasks, are appended to the deque stored at `self._ready` on the event loop. In the next iteration of the event loop's `for` loop they will be executed.
There is one last thing to explain, and it's the concurrency. When you run the example code from the beginning of the article, you will get output like this.
```
2022-12-22 21:23:15,687 Fast counting, 0
2022-12-22 21:23:15,687 Slow counting, 0
2022-12-22 21:23:15,687 Fast counting, 1
2022-12-22 21:23:15,687 Fast counting, 2
2022-12-22 21:23:17,688 Slow counting, 1
2022-12-22 21:23:19,690 Slow counting, 2
```
"Slow counting" is not really blocking the other coroutine. There is a sleep in the code; it is an async sleep, but still. What is going on under the hood is that Python splits the execution of the coroutines into smaller steps. You already know that `asyncio.gather` creates tasks and that each task is run automatically on the event loop. What you don't know is that, in fact, only the first step of the coroutine is scheduled, as shown in the example below and under the link.
```python
class Task(futures._PyFuture):
    def __init__(self, coro, *, loop=None, name=None, context=None):
        super().__init__(loop=loop)
        ...
        self._loop.call_soon(self.__step, context=self._context)
        _register_task(self)
```
The coroutine is not being run here; what is scheduled is the method that drives the coroutine through its smaller steps. I hope all the pieces are falling into place right now. Coroutines are based on generators, and like generators they can be paused and resumed. That is exactly what `__step` is doing. The coroutine is split into steps marked by `await` calls. Each time an `await` is reached, control is yielded back to the event loop, and another coroutine runs until it either finishes or reaches another `await`. This is how concurrency is achieved. The example output shows that while `asyncio.sleep` is awaited, the other coroutine runs.
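The pause-and-resume mechanic is easiest to see in a plain generator, the construct coroutines grew out of; a sketch with a made-up generator:

```python
def steps():
    yield "pause 1"   # execution suspends here...
    yield "pause 2"   # ...and here, resuming exactly where it left off
    return "finished"

g = steps()
first = next(g)   # runs until the first yield, then suspends
second = next(g)  # resumes, runs until the next yield
try:
    next(g)
except StopIteration as stop:
    final = stop.value  # the return value travels in StopIteration
print(first, second, final)  # pause 1 pause 2 finished
```

An `await` in a coroutine plays the same role as those `yield`s: a marked spot where execution suspends and the driver (here `next()`, in asyncio `Task.__step`) decides what runs next.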
One last thing. The `sleep` function is usually frowned upon, but not in the asyncio world. Especially `asyncio.sleep(0)`, which is a helper, as described in the source code.
```python
def __sleep0():
    """Skip one event loop run cycle.

    This is a private helper for 'asyncio.sleep()', used
    when the 'delay' is set to 0.  It uses a bare 'yield'
    expression (which Task.__step knows how to handle)
    instead of creating a Future object.
    """
    yield


async def sleep(delay, result=None):
    """Coroutine that completes after a given time (in seconds)."""
    if delay <= 0:
        await __sleep0()
        return result
    ...
```
You could use this to split a coroutine into steps.
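For instance, a long-running coroutine can `await asyncio.sleep(0)` between chunks of work so others get a turn. A sketch with made-up coroutine names; the shared list records the interleaving:

```python
import asyncio

order = []

async def chunky():
    for i in range(3):
        order.append(f"chunky {i}")
        await asyncio.sleep(0)  # yield to the event loop between chunks

async def other():
    order.append("other ran")

async def main():
    await asyncio.gather(chunky(), other())

asyncio.run(main())
print(order)  # ['chunky 0', 'other ran', 'chunky 1', 'chunky 2']
```

Without the `sleep(0)`, `chunky` would run all three iterations before `other` got a chance to start.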
I hope that, having reached this part, you have a much better understanding of how `asyncio` and the event loop work in Python. I do. This blog post took a lot of effort to write; the most important thing to remember is that I had to learn how it all works before I could try to explain it to others, and to my future self.