
Background tasks on the cheap

When integrating with third-party APIs you need to make sure that your requests actually reach the third party. In case of issues on their end you want to retry, and preferably without interrupting the flow of your application or, even worse, passing the failure on to the end user (like leaking 503 errors).

The most popular solution is to run the work as a background task, and there are tools to help with that: celery, python-rq, or dramatiq. They do the job of executing code in the background, but they require extra infrastructure to work, plus all the dependencies they bring in. I have used them all in the past with great success, but most recently I decided to write a basic background task runner myself.

Why? As I mentioned, all of them require extra infrastructure in the form of a broker, which most of the time is Redis. That implies changes to deployment, requires additional resources, and makes the stack more complex. The scope of what I had to do simply did not justify bringing in all that baggage. I needed to retry calls to the AWS Glue service in case we maxed out capacity, and since the Glue job we execute can take a couple of minutes, the calls had to be pushed into the background. I’ll give you the code and summarize what it does. By no means is this code perfect, but it works 🙂

# background.py
import threading
from queue import Queue

task_queue = Queue()
worker_thread = None


def enqueue_task(task):
    # Put the task on the queue and make sure a worker thread is running.
    task_queue.put_nowait(task)

    global worker_thread
    if not worker_thread:
        worker_thread = _run_worker_thread()


def _process_tasks(task_queue):
    # Drain the queue; a task that raises is put back and retried.
    while task_queue.qsize():
        task = task_queue.get()
        try:
            print(f"Do stuff with task: {task}")
        except Exception:
            task_queue.put(task)

    # Queue is empty: let the thread finish so the next enqueue_task()
    # call can start a fresh one.
    global worker_thread
    worker_thread = None


def _run_worker_thread():
    t = threading.Thread(target=_process_tasks, args=(task_queue,))
    t.start()
    return t

The public interface of this small background module is a single function: enqueue_task. When called, the task is put on the queue and a worker thread is started. Each subsequent call enqueues its task, and the thread shuts down once it has processed all of them.
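
In my case the "Do stuff" line is where the call to AWS Glue goes. Here is a minimal sketch of how a caller might use the module, assuming _process_tasks is changed to execute task() instead of printing it and that a boto3 Glue client is available; start_glue_job and the job name are made up for illustration:

# glue_calls.py -- hypothetical caller; assumes the worker runs task()
import boto3

from background import enqueue_task

glue = boto3.client("glue")


def start_glue_job(job_name, arguments):
    def task():
        # If this raises (e.g. capacity is maxed out), the worker thread
        # puts the task back on the queue and retries it later.
        glue.start_job_run(JobName=job_name, Arguments=arguments)

    enqueue_task(task)


start_glue_job("nightly-etl", {"--run-date": "2018-04-11"})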

I find this simple and flexible enough to handle communication with flaky services or services with usage caps. Since it cannot be scaled out it has limited use, but for HTTP calls it is just fine. This code was inspired by one of Raymond Hettinger’s talks on concurrency and the queue module.


Celery tasks, states and results

The subject of Celery task results comes back every now and then. It would make a really good post, with nice examples. So here we go!

If you don’t know what Celery is:

Celery is a simple, flexible, and reliable distributed system to process vast amounts of messages, while providing operations with the tools required to maintain such a system. It’s a task queue with focus on real-time processing, while also supporting task scheduling.

You can read more by going to their documentation.

States

Celery comes with a few states to start with. These states tell you what is happening to a task and are selected from this list most of the time (unless you have custom states, but I’ll cover this later):

  • PENDING
  • STARTED
  • SUCCESS
  • FAILURE
  • RETRY
  • REVOKED

Such defaults allow you to go pretty far with tracking your tasks. You can even deduce transitions based on the current state of a task: a FAILURE state means that the task went through the PENDING and STARTED states. Here’s an example of how that works. Let’s take a basic task from Celery’s own tutorial. I’m using RabbitMQ from this Docker image as my broker.

from celery import Celery

app = Celery('task', broker='amqp://guest:guest@localhost:5672//')

# STARTED state is not enabled by default so we flip it on
app.conf.update(task_track_started=True)


@app.task(bind=True)
def add(self, x, y):
    # we need to sleep to show STARTED state
    import time
    time.sleep(10)
    return x + y

With that we can go through some of the states.

majki@snakepit ~/projects/blog/pow/celery-states
% pipenv run python
Python 3.4.8 (default, Mar 19 2018, 21:12:05)
[GCC 5.4.0 20160609] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from task import add
>>> r = add.delay(2, 3)  # worker is disabled so we can have PENDING state
>>> r.status
'PENDING'  # worker is enabled after this
>>> r.status
'STARTED'
>>> r.status
'SUCCESS'
>>> r = add.delay(2, 'a')
>>> r.status
'STARTED'
>>> r.state
'FAILURE'

It is pretty useful and probably covers a lot of use cases, but there’s even more to discover about Celery’s states.

Custom states

You can also define your own states if you need to. Let’s modify our example a bit to include a new state called ‘GOING_TO_SLEEP’.

from celery import Celery

app = Celery('task', broker='amqp://guest:guest@localhost:5672//')

# STARTED state is not enabled by default so we flip it on
app.conf.update(task_track_started=True)


@app.task(bind=True)
def add(self, x, y):
    self.update_state(state='GOING_TO_SLEEP')
    import time
    time.sleep(10)
    return x + y

Now let’s see how this works.

majki@snakepit ~/projects/blog/pow/celery-states
% pipenv run python
Python 3.4.8 (default, Mar 19 2018, 21:12:05)
[GCC 5.4.0 20160609] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from task import add
>>> r = add.delay(2, 3)
>>> r.status
'GOING_TO_SLEEP'
>>> r.status
'SUCCESS'

You can go bananas with such a feature and be really pedantic about your tasks’ state flow. It can be beneficial for some really complex workflows, so if you want to monitor them that closely, go for it.
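
Custom states can also carry extra metadata. Here is a small sketch, assuming a long-running task that reports progress through update_state; the PROGRESS state name and the meta keys are ones I made up:

@app.task(bind=True)
def crunch(self, items):
    total = len(items)
    for i, item in enumerate(items, start=1):
        # ... do the actual work on `item` here ...
        # Report a custom state together with arbitrary metadata.
        self.update_state(state='PROGRESS',
                          meta={'current': i, 'total': total})
    return total

While the task is running, r.state returns 'PROGRESS' and r.info exposes the meta dictionary, so the caller can render a progress bar or similar.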

Results storage

Now let’s tackle the other concern: how do you save a task result? All my examples up to this point were in the REPL, so the results are gone as soon as you close it. Thankfully the authors of Celery thought about this and provided the functionality. Celery supports multiple types of result backends, making almost everyone happy. My task example just needs a little tweaking; to be honest, not much is needed to have task results persisted.

from celery import Celery

app = Celery('task', broker='amqp://guest:guest@localhost:5672//')
app.conf.update(task_track_started=True,
                result_backend='file:///var/celery/results')


@app.task(bind=True)
def add(self, x, y):
    import time
    time.sleep(10)
    return x + y

According to my configuration, all task results will be saved under the /var/celery/results directory. I picked the file-system backend as it is the easiest to show.

majki@snakepit ~/projects/blog/pow/celery-states
% pipenv run python
Python 3.4.8 (default, Mar 19 2018, 21:12:05)
[GCC 5.4.0 20160609] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from task import add
>>> add.delay(1,1)
<AsyncResult: be19e446-64a6-4fa9-b1a4-05feeb4fbec2>
>>> add.delay(1,1)
<AsyncResult: 1dc82626-9731-43ea-b4ea-6f237143dd42>
>>> add.delay(1,1)
<AsyncResult: badbe577-9314-4a92-91b8-80d071a5f8b5>
>>> add.delay(1,1)
<AsyncResult: c2da2e98-7a05-411f-acd5-7ead99c12c4e>
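
Those calls return immediately. With a result backend configured you can also block on the outcome with get(); a quick sketch of what I would expect in the same session:

>>> r = add.delay(1, 1)
>>> r.get(timeout=30)  # waits for the worker and fetches the stored result
2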

Now we can have a look at our results backend and see if the results are stored. I had been keeping the results of all my tasks while writing this post, so there are a lot of files. The last four are the ones you can see in the REPL output above.

majki@snakepit ~/projects/blog
% ls -thor  # thanks firemark!
total 236K
drwxr-xr-x 6 majki 4,0K kwi 10 20:46 cryogen
drwxr-xr-x 3 majki 4,0K kwi 11 20:28 pow
-rw-rw-r-- 1 majki  120 kwi 11 21:10 celery-task-meta-f730b12b-25ee-46d2-a7d5-eabda0ab14ef
-rw-rw-r-- 1 majki  120 kwi 11 21:11 celery-task-meta-6c7a3fc1-882b-4ddd-be9e-bd1431099eae
-rw-rw-r-- 1 majki  120 kwi 11 21:11 celery-task-meta-f5a73a65-6bda-47b8-a04e-34d1e5b7b81d
-rw-rw-r-- 1 majki  120 kwi 11 21:12 celery-task-meta-c012b4a7-362f-4d05-ac65-9dc398ac514e
-rw-rw-r-- 1 majki  120 kwi 11 21:14 celery-task-meta-c0654b79-65b0-4365-9a45-6b3d248d559d
-rw-rw-r-- 1 majki  120 kwi 11 21:14 celery-task-meta-f2d51878-86db-4d21-bc36-c651e63bdda3
-rw-rw-r-- 1 majki  120 kwi 11 21:14 celery-task-meta-ff9071d1-5ac1-4f81-9da2-29b4f706f5d8
-rw-rw-r-- 1 majki  120 kwi 11 21:15 celery-task-meta-8691b6f0-fa8f-401b-9133-0ce8308e34c9
-rw-rw-r-- 1 majki  120 kwi 11 21:18 celery-task-meta-9d312364-ed4c-49f2-a6eb-7e3b6a9eeaef
-rw-rw-r-- 1 majki  120 kwi 11 21:19 celery-task-meta-31ae7d06-c292-48c8-b526-eebeb021bb90
-rw-rw-r-- 1 majki  120 kwi 11 21:22 celery-task-meta-39d6126d-57f9-4a2b-a09b-68475076ec08
-rw-rw-r-- 1 majki  120 kwi 11 21:39 celery-task-meta-264975a3-b8d0-4855-a19a-fc64e1384bfe
-rw-rw-r-- 1 majki  760 kwi 11 21:46 celery-task-meta-b7d429d2-9aaa-4af5-914a-aba1321676cb
-rw-rw-r-- 1 majki  120 kwi 11 21:57 celery-task-meta-fa4f6d6a-aeb2-416b-9efc-9e7947ef9550
-rw-rw-r-- 1 majki  120 kwi 11 22:03 celery-task-meta-f5774f4f-0d51-4861-af95-a1a8ff94d1b1
-rw-rw-r-- 1 majki  120 kwi 11 22:18 celery-task-meta-be19e446-64a6-4fa9-b1a4-05feeb4fbec2
-rw-rw-r-- 1 majki  120 kwi 11 22:26 celery-task-meta-1dc82626-9731-43ea-b4ea-6f237143dd42
-rw-rw-r-- 1 majki  120 kwi 11 22:26 celery-task-meta-badbe577-9314-4a92-91b8-80d071a5f8b5
-rw-rw-r-- 1 majki  120 kwi 11 22:26 celery-task-meta-c2da2e98-7a05-411f-acd5-7ead99c12c4e

majki@snakepit ~/projects/blog
% cat celery-task-meta-c2da2e98-7a05-411f-acd5-7ead99c12c4e| python -m json.tool
{
    "children": [],
    "result": 2,
    "status": "SUCCESS",
    "task_id": "c2da2e98-7a05-411f-acd5-7ead99c12c4e",
    "traceback": null
}

There you have it, everything is stored "safely" (it’s my laptop 🙂) and can be viewed if required.
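
Instead of poking at the files directly, you can also read a stored result back from Python; a quick sketch, assuming the same app configuration and one of the task ids from the listing above:

from task import app

r = app.AsyncResult('c2da2e98-7a05-411f-acd5-7ead99c12c4e')
print(r.status)  # 'SUCCESS'
print(r.result)  # 2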

Hopefully this puts aside any concerns regarding task states and task results, and sheds a bit of light on Celery’s extensive API.