Categories
async python

asyncio: real world coroutines

This is the first part of a longer series of more real-world asyncio examples. “Hello world” examples are often misleading about the real effort it takes to use a tool, and the asyncio examples you find on the web are guilty of this as well: most of them focus on simple happy-path scenarios. In this article, I’ll write about working with coroutines, handling their errors, retrying them automatically, and tracking execution progress. The next articles will explore higher-level tools like asyncio.Task and asyncio.TaskGroup.

asyncio’s event loop

Async is not new in Python, but I was not familiar with the concept. I had used it without fully grasping the idea, and that smells like disaster. This article, and the whole journey I went through, was sparked by one question: why shouldn’t you run blocking code on the event loop? The simple answer is that it will block the whole thing. I kind of knew that, since everyone says it and you can’t really miss it. But why is that? How does it work? If you would like to know, read on.
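To make the question concrete, here is a minimal sketch of what "blocking the whole thing" looks like. All function names are made up for this illustration. A tiny heartbeat coroutine asks to be woken after 0.05 seconds and reports how long it actually waited, once next to a cooperative `asyncio.sleep` and once next to a blocking `time.sleep`.

```python
import asyncio
import time


async def heartbeat(start):
    # Ask the event loop to wake us in 0.05s, then report how long it really took.
    await asyncio.sleep(0.05)
    return time.monotonic() - start


async def blocking_work():
    # time.sleep blocks the thread, so the event loop cannot run anything else.
    time.sleep(0.3)


async def cooperative_work():
    # asyncio.sleep suspends this coroutine and hands control back to the loop.
    await asyncio.sleep(0.3)


async def heartbeat_delay(work):
    # Run the heartbeat next to the given work and return the heartbeat's delay.
    start = time.monotonic()
    delay, _ = await asyncio.gather(heartbeat(start), work())
    return delay


if __name__ == "__main__":
    print(f"cooperative: {asyncio.run(heartbeat_delay(cooperative_work)):.2f}s")
    print(f"blocking:    {asyncio.run(heartbeat_delay(blocking_work)):.2f}s")
```

Next to the cooperative version the heartbeat should come back roughly on time. Next to the blocking version it cannot resume until `time.sleep` returns, because both coroutines share the single thread that runs the loop.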

Background tasks on the cheap

When integrating with third-party APIs, you need to make sure that your requests reach the third party. If there are issues on their end, you want to retry, ideally without interrupting the flow of your application or, even worse, passing information about such issues on to the end user (like leaking 503 errors).

The most popular solution is to use a background task, and there are tools that help with that: celery, python-rq, or dramatiq. They do the job of executing code in the background, but they require extra infrastructure to work, plus all the dependencies they bring in. I have used them all in the past with great success, but most recently I decided to write a basic background task myself.

Why? As I mentioned earlier, all of them require extra infrastructure in the form of a broker, which most of the time is Redis. This implies changes to deployment, requires additional resources, and makes the stack more complex. The scope of what I had to do just did not justify bringing in all that baggage. I needed to retry calls to the AWS Glue service in case we maxed out capacity. Since the Glue job we are executing can take a couple of minutes, our calls to AWS Glue had to be pushed into the background. I’ll give you the code and summarize what it does. By no means is this code perfect, but it works 🙂

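# background.py
import threading
from queue import Empty, Queue

task_queue = Queue()
worker_thread = None
# Guards worker_thread so a task cannot be enqueued while the worker is exiting.
worker_lock = threading.Lock()


def enqueue_task(task):
    global worker_thread
    with worker_lock:
        task_queue.put_nowait(task)
        # Start a worker only if none is currently running.
        if worker_thread is None:
            worker_thread = _run_worker_thread()


def _process_tasks(task_queue):
    global worker_thread
    while True:
        with worker_lock:
            try:
                task = task_queue.get_nowait()
            except Empty:
                # Queue drained: clear the reference under the lock and exit.
                worker_thread = None
                return
        try:
            print(f"Do stuff with task: {task}")
        except Exception:
            # Put the failed task back on the queue so it is retried.
            task_queue.put(task)


def _run_worker_thread():
    t = threading.Thread(target=_process_tasks, args=(task_queue,))
    t.start()
    return t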

The public interface of this small background module is a single function, enqueue_task. When it is called, the task is put on the queue and a worker thread is started if one is not already running. Each subsequent call enqueues another task, and the thread exits once it has processed all of them.

I find this simple and flexible enough to handle communication with flaky services or services with usage caps. Since it runs a single worker thread inside one process, it does not scale and its usefulness is limited, but for HTTP calls it is just fine. This code was inspired by one of Raymond Hettinger’s talks on concurrency and the queue module.
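The module above puts a failed task straight back on the queue, so a task that never succeeds would be retried forever. One possible variation, sketched here under assumptions of my own, treats each task as a callable and gives up after a fixed number of attempts. `MAX_ATTEMPTS`, `results`, and `make_flaky` are hypothetical names for this illustration, not part of the original module.

```python
import threading
from queue import Empty, Queue

MAX_ATTEMPTS = 3  # hypothetical cap on attempts per task

task_queue = Queue()
worker_thread = None
worker_lock = threading.Lock()
results = []  # collects return values or final exceptions, for illustration only


def enqueue_task(func):
    global worker_thread
    with worker_lock:
        # Each queue item carries the callable and its attempt number.
        task_queue.put_nowait((func, 1))
        if worker_thread is None:
            worker_thread = threading.Thread(target=_process_tasks)
            worker_thread.start()


def _process_tasks():
    global worker_thread
    while True:
        with worker_lock:
            try:
                func, attempt = task_queue.get_nowait()
            except Empty:
                # Nothing left to do: let the next enqueue_task start a new worker.
                worker_thread = None
                return
        try:
            results.append(func())
        except Exception as exc:
            if attempt < MAX_ATTEMPTS:
                # Retry later by putting the task back with a bumped counter.
                task_queue.put((func, attempt + 1))
            else:
                # Out of attempts: record the failure instead of looping forever.
                results.append(exc)


def make_flaky(failures):
    # Builds a callable that raises `failures` times and then succeeds.
    state = {"left": failures}

    def call():
        if state["left"] > 0:
            state["left"] -= 1
            raise RuntimeError("capacity exceeded")
        return "started"

    return call
```

Calling `enqueue_task(make_flaky(2))` should eventually leave `"started"` in `results`, since the third attempt succeeds. A callable that keeps failing ends up as a recorded exception after `MAX_ATTEMPTS` tries. A real version would probably also wait between attempts, which this sketch leaves out.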