This is the first part of a longer series on more real-world examples of asyncio. “Hello world” examples are often very misleading about the real effort it takes to use a tool, and the asyncio examples you find on the web are guilty of this as well. Most of them focus on simple happy-path scenarios. In this article, I’ll be writing about working with coroutines, handling errors, retrying them automatically, and tracking execution progress. The next articles will explore higher-level tools like asyncio.Task and asyncio.TaskGroup.
Tag: async
Async is not new in Python, but I was not familiar with the concept. I had used it without fully grasping the idea, and that smells like disaster. This article, and the whole journey I went through, was sparked by one question: why shouldn’t you run blocking code on the event loop? The simple answer is that it will block the whole thing. I kind of knew that, since everyone says it, so you can’t really miss it. But why is that? How does it work? If you would like to know, read on.
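To make the question concrete, here is a minimal sketch of what "blocking the whole thing" looks like. The names ticker, blocking_job, cooperative_job and longest_gap are made up for this illustration. A small heartbeat coroutine records timestamps while another job runs next to it, once with time.sleep and once with asyncio.sleep.

```python
import asyncio
import time


async def ticker(ticks, count=5, interval=0.01):
    """Record a timestamp every `interval` seconds, like a heartbeat."""
    for _ in range(count):
        ticks.append(time.monotonic())
        await asyncio.sleep(interval)


async def blocking_job():
    # time.sleep never yields to the event loop, so nothing else can run.
    time.sleep(0.2)


async def cooperative_job():
    # asyncio.sleep suspends this coroutine and lets the loop run other tasks.
    await asyncio.sleep(0.2)


async def longest_gap(job):
    """Run the ticker next to `job` and return the longest pause between ticks."""
    ticks = []
    await asyncio.gather(ticker(ticks), job())
    return max(later - earlier for earlier, later in zip(ticks, ticks[1:]))


if __name__ == "__main__":
    print(f"blocking:    {asyncio.run(longest_gap(blocking_job)):.3f}s")
    print(f"cooperative: {asyncio.run(longest_gap(cooperative_job)):.3f}s")
```

With the blocking job, the heartbeat should stall for roughly the full 0.2 seconds, because the loop cannot switch to it until time.sleep returns. With the cooperative job, the gaps should stay close to the 0.01 second interval.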
When integrating with third-party APIs, you need to make sure that your requests actually reach the third party. If there are issues on their end, you want to retry, ideally without interrupting the flow of your application or, even worse, passing information about such issues on to the end user (like leaking 503 errors).
The most popular solution is to use a background task, and there are tools that help with that: celery, python-rq, or dramatiq.
They do the job of executing code in the background, but they require some extra infrastructure to work, plus all the dependencies they bring in. I have used them all in the past with great success, but most recently I decided to write a basic background task myself.
Why? As I mentioned earlier, all of them require extra infrastructure in the form of a broker, which most of the time is redis. This implies changes to deployment, requires additional resources, and makes the stack more complex.
The scope of what I had to do just did not justify bringing in all this baggage. I needed to retry calls to the AWS Glue service in case we maxed out capacity. Since the Glue job we are executing can take a couple of minutes, our calls to AWS Glue had to be pushed into the background.
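The retry requirement itself can be sketched independently of any background machinery. The snippet below is only an illustration under assumptions: CapacityExceeded and call_with_retry are hypothetical names, not part of boto3 or of the module shown in this post, and the exponential backoff is my own choice rather than something the original code does.

```python
import time


class CapacityExceeded(Exception):
    """Hypothetical error standing in for 'the service is at capacity'."""


def call_with_retry(func, *, attempts=5, base_delay=1.0, sleep=time.sleep):
    """Call `func` until it succeeds or `attempts` is exhausted.

    The wait doubles after every failed attempt: base_delay, 2x, 4x, ...
    `sleep` is injectable so the helper can be exercised without waiting.
    """
    for attempt in range(1, attempts + 1):
        try:
            return func()
        except CapacityExceeded:
            if attempt == attempts:
                raise  # out of attempts, let the caller see the error
            sleep(base_delay * 2 ** (attempt - 1))
```

Calling something like call_with_retry(start_job), where start_job is whatever function triggers the job, blocks the caller for the whole backoff period, which is exactly why the call has to live in the background.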
I’ll give you the code and summarize what it does. By no means is this code perfect, but it works 🙂
# background.py
import threading
from queue import Queue

task_queue = Queue()
worker_thread = None


def enqueue_task(task):
    """Put a task on the queue and start the worker thread if none is running."""
    global worker_thread
    task_queue.put_nowait(task)
    if not worker_thread:
        worker_thread = _run_worker_thread()


def _process_tasks(task_queue):
    """Drain the queue, then clear worker_thread so the next enqueue starts a new one."""
    global worker_thread
    while task_queue.qsize():
        task = task_queue.get()
        try:
            print(f"Do stuff with task: {task}")
        except Exception:
            # Put the failed task back so it is picked up again later in the loop.
            task_queue.put(task)
    worker_thread = None


def _run_worker_thread():
    t = threading.Thread(target=_process_tasks, args=(task_queue,))
    t.start()
    return t
The public interface of this small background module is a single function, enqueue_task. When it is called, the task is put on the queue and a worker thread is started if one is not already running. Each subsequent call enqueues another task, and the thread exits once it has processed all of them.
I find this simple and flexible enough to handle communication with flaky services or services with usage caps. Since it cannot be scaled, its usefulness is limited, but for HTTP calls it is just fine. This code was inspired by one of Raymond Hettinger’s talks on concurrency and the queue module.
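If you want to harden the idea a little, two things stand out in background.py: a task enqueued at the exact moment the worker finishes draining the queue can sit there until the next enqueue_task call, and a task that always fails is retried forever. The sketch below is one possible variant, not the code I run in production. BackgroundWorker, handler, max_attempts and failed are names invented for this example.

```python
import threading
from queue import Empty, Queue


class BackgroundWorker:
    """Run tasks on a lazily started thread, with a cap on retries."""

    def __init__(self, handler, max_attempts=3):
        self._handler = handler          # callable that processes one task
        self._max_attempts = max_attempts
        self._queue = Queue()
        self._lock = threading.Lock()    # guards the queue/thread hand-off
        self._thread = None
        self.failed = []                 # tasks that ran out of attempts

    def enqueue(self, task):
        # Enqueueing and the "is a worker running?" check happen under one
        # lock, so a task cannot slip in while the worker is shutting down.
        with self._lock:
            self._queue.put_nowait((task, 1))
            if self._thread is None:
                self._thread = threading.Thread(target=self._run)
                self._thread.start()

    def wait(self, timeout=None):
        """Block until the current worker thread, if any, has finished."""
        with self._lock:
            thread = self._thread
        if thread is not None:
            thread.join(timeout)

    def _run(self):
        while True:
            with self._lock:
                try:
                    task, attempt = self._queue.get_nowait()
                except Empty:
                    # Nothing left: clear the handle before exiting so the
                    # next enqueue() starts a fresh thread.
                    self._thread = None
                    return
            try:
                self._handler(task)
            except Exception:
                if attempt < self._max_attempts:
                    self._queue.put((task, attempt + 1))
                else:
                    self.failed.append(task)
```

Usage stays close to the original: create one BackgroundWorker(handler) at import time and call enqueue(task) wherever enqueue_task was called before. The trade-off is a bit more code in exchange for a bounded number of retries and no stranded tasks.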