
asyncio: real world coroutines

This is the first part of a longer series presenting more real-world examples of asyncio. "Hello world" examples are often misleading about the real effort it takes to use a tool, and the asyncio examples you find on the web are guilty of this as well: most of them focus on simple happy-path scenarios. In this article, I'll be writing about working with coroutines, handling errors, retrying them automatically, and tracking execution progress. The next articles will explore more high-level tools like asyncio.Task and asyncio.TaskGroup.

The scenario

The scenario is as follows: 20 concurrent coroutines fetch a site, raise random errors, and retry in case of errors. Execution progress will be tracked and reported to the terminal in some form. Wherever possible the code will be concurrent. If you are here just for the code, please go to the GitHub repo.

Worker

Please welcome our leading actor, the worker coroutine. Accept my apologies; this is a simplified stand-in for actual work. The code pulls data and throws a few random errors. It is simple, but a step above the "hello world" stuff online. The point is that some errors, like RuntimeError, are not recoverable.

import logging
import random

import aiohttp


async def worker(url: str, name: str, session: aiohttp.ClientSession) -> int:
    bad_luck: float = random.random()
    if bad_luck > 0.9:
        logging.warning("Status: failed, name: %s, bad luck: %.2f", name, bad_luck)
        raise random.choice(
            [aiohttp.ServerDisconnectedError, aiohttp.ServerTimeoutError, RuntimeError]
        )

    async with session.get(url) as response:
        logging.debug(
            "Status: %s, name: %s, bad luck: %.2f", response.status, name, bad_luck
        )
        return response.status


You could swap that function for any work you need to do: write to a DB, save a file, call an endpoint, etc. All of those will raise an error at some point. Some of those errors are transient and can be retried so that processing succeeds, as you have probably seen already in your production code.
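To make that point concrete, here is a hedged sketch of a drop-in replacement worker. It is an assumed example, not code from the repo: it keeps the same signature and failure behaviour as the article's worker, but simulates saving a file with asyncio.sleep instead of making an HTTP request.

```python
import asyncio
import logging
import random


async def save_worker(path: str, name: str, _session=None) -> int:
    """Hypothetical worker: same shape as the article's, different job."""
    bad_luck: float = random.random()
    if bad_luck > 0.9:
        logging.warning("Status: failed, name: %s, bad luck: %.2f", name, bad_luck)
        raise RuntimeError("simulated write failure")

    await asyncio.sleep(0.01)  # stand-in for the actual disk/DB I/O
    return 200  # mimic an HTTP-style success code


if __name__ == "__main__":
    random.seed(0)  # make the demo deterministic
    print(asyncio.run(save_worker("/tmp/example.txt", "w-0")))  # → 200
```

Because the supervisor only depends on the signature and the exceptions raised, it works unchanged with a worker like this.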

Supervisor

Workers can stop at any time, but we have important work to perform and have to supervise the execution. So the supervisor is introduced, inspired by the Erlang language. The supervisor runs a coroutine and handles its errors; its design follows from how the code is run.

async def supervisor(
    worker,
    url: str,
    name: str,
    client: aiohttp.ClientSession,
    retry: int = 0,
) -> int:
    try:
        return await worker(url, name, client)
    except (aiohttp.ServerDisconnectedError, aiohttp.ServerTimeoutError):
        retry += 1
        if retry < MAX_RETRIES:
            logging.warning("Retrying coroutine %s. Retry: %s", name, retry)
            return await supervisor(worker, url, name, client, retry)

        logging.warning("Retries exhausted for call args %s", (url, client, retry))
        return 0
    except Exception as e:
        logging.error("Irrecoverable error %r.", e)
        logging.error("Failed to finish coroutine %s.", name)
        return 0

Runners

There are a couple of ways to run coroutines. Each has different capabilities, but none of them affects how the worker and supervisor pair is written. All of the runners I present here run the code concurrently. What are runners? They are asyncio functions from the standard library.

asyncio.gather

The first attempt is with the asyncio.gather function. It takes a sequence of awaitables and runs them concurrently; any awaitables that are coroutines are scheduled as Tasks. Errors raised by coroutines can either propagate or be returned as results. I wanted to deal with exceptions in the supervisor to decide there what will happen, so the flag is set to return_exceptions=False.

coros = [
    supervisor(worker, "https://klichx.dev", str(i), client)
    for i in range(0, 20)
]
try:
    res = await asyncio.gather(
        *coros,
        return_exceptions=False,
    )
except asyncio.CancelledError:
    logging.info("CANCEL")
    return

The code builds a list of coroutines, each item being a worker wrapped in a supervisor. The list is passed to asyncio.gather and run concurrently. Transient exceptions are retried as per the supervisor's design.
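To see what the return_exceptions flag changes, here is a small illustration (an assumed example, not from the repo): with return_exceptions=True, gather hands back raised exceptions as ordinary result items instead of propagating the first one.

```python
import asyncio


async def maybe_fail(n: int) -> int:
    # Hypothetical stand-in for a worker that fails for one input.
    if n == 2:
        raise ValueError("n was 2")
    return n


async def main() -> list[str]:
    results = await asyncio.gather(
        *(maybe_fail(i) for i in range(4)),
        return_exceptions=True,  # exceptions become list items
    )
    return [type(r).__name__ for r in results]


if __name__ == "__main__":
    print(asyncio.run(main()))  # → ['int', 'int', 'ValueError', 'int']
```

With return_exceptions=False the same call would raise the ValueError instead, which is why the supervisor gets a chance to handle it first in the article's setup.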

This approach is quick, and if you don't care about exceptions and retries (you set return_exceptions=True) it may also be robust enough. You can't easily track what is going on unless you print or report progress in some other way. Another downside of this approach is the lack of a timeout: one has to wrap the call in asyncio.timeout to guard against hanging executions.

asyncio.as_completed

This one is interesting because it returns an iterator that yields the awaitables as they finish, so tracking progress becomes as easy as iterating over it.

coros = [
    supervisor(worker, "https://klichx.dev", str(i), client)
    for i in range(0, 20)
]
try:
    res = []
    for i, coro in enumerate(asyncio.as_completed(coros)):
        res.append(await coro)
        print("*" * (i + 1), end="\r")  # one star per finished coroutine
except asyncio.CancelledError:
    # can't cancel
    logging.info("CANCEL")
    return

Nothing fancy, and while it looks as if the coroutines run sequentially, they in fact run concurrently; only the results are consumed one by one. The loop makes it look wrong, but reporting progress is super easy. The await in the loop may raise, but that is fine here as I want errors handled in the supervisor.
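A quick way to convince yourself of the completion-order behaviour is a toy run like the one below (an assumed example, not from the repo): the results come back in the order the coroutines finish, not the order they were submitted.

```python
import asyncio


async def delayed(value: int, delay: float) -> int:
    # Stand-in worker that finishes after a known delay.
    await asyncio.sleep(delay)
    return value


async def main() -> list[int]:
    # Submitted in order 1, 2, 3 but finishing in order 2, 3, 1.
    coros = [delayed(1, 0.06), delayed(2, 0.02), delayed(3, 0.04)]
    results = []
    for fut in asyncio.as_completed(coros):
        results.append(await fut)
    return results


if __name__ == "__main__":
    print(asyncio.run(main()))  # → [2, 3, 1]
```

That reordering is exactly why the progress loop above works: each iteration of the for loop wakes up as soon as *any* coroutine completes.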

asyncio.wait

asyncio.wait is the last case. A useful feature of wait is that unfinished executions are returned in a set separate from the finished ones.

coros = [
    asyncio.create_task(
        supervisor(worker, "https://klichx.dev", str(i), client)
    )
    for i in range(0, 20)
]
try:
    done, pending = await asyncio.wait(
        coros, timeout=ASYNCIO_TOTAL_TIMEOUT, 
        return_when=asyncio.ALL_COMPLETED
    )
    for t in done:
        logging.info("Task %s done", t.get_name())

    for t in pending:
        logging.info("Task %s pending", t.get_name())
except asyncio.CancelledError:
    # can't cancel
    logging.info("CANCEL")
    return

done is what has finished, while pending (the name is taken straight from the docs) is what had not finished by the time asyncio.wait returned. In this case asyncio.wait returns when everything is done, but this behaviour can be configured via the return_when flag to return on the first completed task (FIRST_COMPLETED) or on the first exception (FIRST_EXCEPTION).
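As a hedged sketch of the FIRST_EXCEPTION mode (again an assumed example, with made-up ok/boom coroutines): wait returns as soon as one task raises, leaving the still-running task in the pending set, which you then have to cancel yourself.

```python
import asyncio


async def ok(delay: float) -> str:
    await asyncio.sleep(delay)
    return "ok"


async def boom() -> str:
    await asyncio.sleep(0.01)
    raise RuntimeError("boom")


async def main() -> tuple[int, int, int]:
    tasks = [asyncio.create_task(ok(0.5)), asyncio.create_task(boom())]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)

    for t in pending:  # clean up the task that is still running
        t.cancel()
    await asyncio.gather(*pending, return_exceptions=True)

    # Retrieve exceptions from done tasks so nothing goes unobserved.
    errors = [t.exception() for t in done if t.exception() is not None]
    return len(done), len(pending), len(errors)


if __name__ == "__main__":
    print(asyncio.run(main()))  # → (1, 1, 1)
```

Note that asyncio.wait never consumes exceptions for you; you have to inspect each done task, which pairs naturally with the supervisor swallowing the recoverable ones earlier.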

The repo

The full code can be found in the repo. Feel free to use it.

Out of scope

You may have noticed there are only coroutines in this blog post. You are correct: a big chunk of asyncio is left out. There will be additional blog posts on tasks and task groups. I'm also planning to cover logging in asynchronous code after a discussion with a coworker.

I hope this was an interesting read and you have learned something new. I learned a lot.

Peace
