Categories
async python

asyncio: real world coroutines

This is the first part of a longer series presenting more real-world examples of asyncio. “Hello world” examples are often misleading about the real effort it takes to use a tool, and the asyncio examples you may find on the web are guilty of this as well. Most of them focus on simple happy-path scenarios. In this article, I’ll be writing about working with coroutines, handling errors, retrying them automatically, and tracking execution progress. The next articles will explore the usage of more high-level tools like asyncio.Task and asyncio.TaskGroup.

Categories
async python

asyncio’s event loop

Async is not new in Python, but I was not familiar with the concept. I had used it without fully grasping the idea, and that smells like disaster. This article, and the whole journey I went through, was sparked by one question: why shouldn’t you run blocking code on the event loop? The answer is simple: it will block the whole thing. I kind of knew that, as everyone says it, so you can’t really miss it. But why is that? How does it work? If you would like to know, read on.
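
To make that concrete, here is a minimal sketch (mine, not from the article itself) contrasting a blocking call with a cooperative one; the coroutine names are made up for illustration.

import asyncio
import time


async def blocking():
    time.sleep(2)           # holds the single event-loop thread for 2 seconds


async def cooperative():
    await asyncio.sleep(2)  # suspends this coroutine, the loop keeps running


async def ticker():
    for _ in range(4):
        print("loop is alive")
        await asyncio.sleep(0.5)


async def main():
    # swap cooperative() for blocking() and "loop is alive" only appears
    # after the 2-second sleep, because the loop itself is blocked
    await asyncio.gather(cooperative(), ticker())


asyncio.run(main())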

Categories
azure python

Azure Application Insights, FastAPI, and tracing

Debugging is difficult, and what’s even more difficult is debugging production apps. Live production apps.

There are tools designed for this purpose. Azure has Application Insights, a product that makes retracing the history of events easier. When set up correctly, you can go from an HTTP request down to a DB call with all the query arguments. Pretty useful and definitely more convenient than sifting through log messages.

Application Insights console with end-to-end transaction details

Here you can see the exact query that had been executed on the database.

Application Insights console with SQL query details

You may also see every log related to a particular request in Log Analytics.

Application Insights logs with a query displaying only relevant logs

Improving your work life like this is pretty simple. Everything here is done using opencensus and its extensions. Opencensus integrates with Azure pretty nicely. The first thing to do is to install the required dependencies.

# pip
pip install opencensus-ext-azure opencensus-ext-logging opencensus-ext-sqlalchemy opencensus-ext-requests

# pipenv
pipenv install opencensus-ext-azure opencensus-ext-logging opencensus-ext-sqlalchemy opencensus-ext-requests

# poetry
poetry add opencensus-ext-azure opencensus-ext-logging opencensus-ext-sqlalchemy opencensus-ext-requests

The next step is to activate them by including a couple of lines in your code. Here I activate three extensions: logging, requests, and sqlalchemy. Here is a list of other official extensions.

import logging

from opencensus.trace import config_integration
from opencensus.ext.azure.log_exporter import AzureLogHandler

logger = logging.getLogger(__name__)
config_integration.trace_integrations(["logging", "requests", "sqlalchemy"])

handler = AzureLogHandler(
    connection_string="InstrumentationKey=YOUR_KEY"
)
handler.setFormatter(logging.Formatter("%(traceId)s %(spanId)s %(message)s"))
logger.addHandler(handler)

One last thing is a middleware that will instrument every request. This code is taken from Microsoft’s documentation.

# imports needed by the snippet below (module paths as provided by
# opencensus and opencensus-ext-azure at the time of writing)
from fastapi import Request
from opencensus.ext.azure.trace_exporter import AzureExporter
from opencensus.trace.attributes_helper import COMMON_ATTRIBUTES
from opencensus.trace.samplers import ProbabilitySampler
from opencensus.trace.span import SpanKind
from opencensus.trace.tracer import Tracer

HTTP_STATUS_CODE = COMMON_ATTRIBUTES["HTTP_STATUS_CODE"]
HTTP_URL = COMMON_ATTRIBUTES["HTTP_URL"]


@app.middleware("http")
async def middlewareOpencensus(request: Request, call_next):
    tracer = Tracer(
        exporter=AzureExporter(
            connection_string="InstrumentationKey=YOUR_KEY"
        ),
        sampler=ProbabilitySampler(1.0),
    )
    with tracer.span("main") as span:
        span.span_kind = SpanKind.SERVER

        response = await call_next(request)

        tracer.add_attribute_to_current_span(
            attribute_key=HTTP_STATUS_CODE, attribute_value=response.status_code
        )
        tracer.add_attribute_to_current_span(
            attribute_key=HTTP_URL, attribute_value=str(request.url)
        )

    return response
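
With the handler and middleware in place, log lines and outgoing HTTP calls made inside a route get correlated with the request trace. Below is a hypothetical endpoint of mine, just to show where such code lives; the route, names, and URL are made up, app is the FastAPI instance the middleware above is registered on, and logger is the one configured earlier.

from fastapi import FastAPI
import requests

app = FastAPI()  # the instance the @app.middleware("http") above is attached to


@app.get("/orders/{order_id}")
def get_order(order_id: int):
    # picked up by AzureLogHandler together with traceId and spanId
    logger.warning("Fetching order %s", order_id)
    # recorded as a dependency of the request span by the requests integration
    upstream = requests.get("https://httpbin.org/status/200")
    return {"order_id": order_id, "upstream_status": upstream.status_code}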

You are done 🙂 You will not lose information on what is going on in the app, and you will be quicker at finding problems and resolving them. Life’s good now.

Categories
aws cdk devops python

Deploy Sagemaker Endpoint using AWS CDK

I was looking for a way to deploy a custom model to Sagemaker. Unfortunately, my online searches failed to find anything that was not using Jupyter notebooks. I like them, but that way of deploying models is neither reproducible nor scalable.

After a couple of hours of looking, I decided to do it myself. Here comes a recipe for deploying a custom model to Sagemaker using AWS CDK.

The following steps assume you have knowledge of CDK and Sagemaker. I’ll try to explain as much as I can but if anything is unclear please refer to the docs.

Steps

  1. Prepare containerised application serving your model.
  2. Create Sagemaker model.
  3. Create Sagemaker Endpoint configuration.
  4. Deploy Sagemaker Endpoint.

Unfortunately, AWS CDK does not support higher-level constructs for Sagemaker. You have to use CloudFormation constructs which start with the prefix Cfn. Higher-level constructs for Sagemaker are not on the roadmap as of March 2021.

Dockerfile to serve model

The first thing is to have your app in container form, so it can be deployed in a predictable way. It’s difficult to help with this step, as each model may require different dependencies or actions. What I can recommend is to go over https://docs.aws.amazon.com/sagemaker/latest/dg/build-multi-model-build-container.html. This page explains the steps required to prepare a container that can serve a model on Sagemaker. It may also be helpful to read this part https://docs.aws.amazon.com/sagemaker/latest/dg/your-algorithms-inference-code.html on how your docker image will be used.
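
As a rough orientation, the container has to expose a small web server on port 8080 that answers GET /ping (health check) and POST /invocations (inference). The sketch below is mine, not part of the original recipe; the Flask app and the echo "prediction" are placeholders for your own model-loading and inference code.

# serve.py -- minimal sketch of the Sagemaker serving contract
from flask import Flask, Response, jsonify, request

app = Flask(__name__)
model = None  # load your model here (e.g. from /opt/ml/model) at startup


@app.route("/ping", methods=["GET"])
def ping():
    # Sagemaker calls this to verify the container is healthy
    return Response(status=200)


@app.route("/invocations", methods=["POST"])
def invocations():
    payload = request.get_json(force=True)
    # replace this echo with a real call to your model
    return jsonify({"echo": payload})


if __name__ == "__main__":
    # Sagemaker routes inference traffic to port 8080
    app.run(host="0.0.0.0", port=8080)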

Define Sagemaker model

Once you have your model in a container form it is time to create a Sagemaker model. There are 3 elements to a Sagemaker model:

  • Container definition
  • VPC configuration for a model
  • Model definition

Adding the container definition to your app is simple (the hard part of creating a docker image is already done). The container definition will be used by the Sagemaker model.

# imports assumed for the snippets below (AWS CDK v1 module layout)
from aws_cdk import aws_ec2 as ec2
from aws_cdk import aws_sagemaker as sagemaker
from aws_cdk.aws_ecr_assets import DockerImageAsset

asset = DockerImageAsset(
    self,
    "MLInferenceImage",
    directory="../image",
)

primary_container_definition = sagemaker.CfnModel.ContainerDefinitionProperty(
    image=asset.image_uri,
)

Creating a VPC is pretty straightforward; you just have to remember to create public and private subnets.

vpc = ec2.Vpc(
    self,
    "VPC",
    subnet_configuration=[
        ec2.SubnetConfiguration(
            name="public-model-subnet", subnet_type=ec2.SubnetType.PUBLIC
        ),
        ec2.SubnetConfiguration(
            name="private-model-subnet", subnet_type=ec2.SubnetType.PRIVATE
        ),
    ],
)

model_vpc_config = sagemaker.CfnModel.VpcConfigProperty(
    security_group_ids=[vpc.vpc_default_security_group],
    subnets=[s.subnet_id for s in vpc.private_subnets],
)

Creating the model is a matter of putting all the pieces together.

model = sagemaker.CfnModel(
    self,
    "MLInference",
    # role is an IAM execution role (iam.Role) defined elsewhere in the stack
    execution_role_arn=role.role_arn,
    model_name="my-model",
    primary_container=primary_container_definition,
    vpc_config=model_vpc_config,
)

At this point, cdk deploy would create a Sagemaker model with an ML model of your choice.

Define endpoint configuration

We are not done yet, as the model still has to be exposed. A Sagemaker Endpoint is perfect for this, so in the next step we create the endpoint configuration.

Endpoint configuration describes resources that will serve your model.

model_endpoint_config = sagemaker.CfnEndpointConfig(
    self,
    "model-endpoint-config",
    production_variants=[
        sagemaker.CfnEndpointConfig.ProductionVariantProperty(
            initial_instance_count=1,
            initial_variant_weight=1.0,
            instance_type="ml.t2.medium",
            model_name=model.model_name,
            variant_name="production-medium",
        ),
    ],
)

Create Sagemaker Endpoint

The last step is extremely simple. We take the configuration created earlier and create an endpoint.

model_endpoint = sagemaker.CfnEndpoint(
    self,
    "model-endpoint",
    endpoint_config_name=model_endpoint_config.attr_endpoint_config_name,
)

Congrats

Now you may call cdk deploy and the model is up and running on AWS Sagemaker 🙂
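
To check that everything works, you can call the endpoint with boto3. Note that CfnEndpoint above does not set endpoint_name, so CloudFormation generates one; the name and the JSON payload below are hypothetical, so substitute the actual endpoint name from the Sagemaker console (or set endpoint_name explicitly) and a payload your container understands.

import json

import boto3

runtime = boto3.client("sagemaker-runtime")

response = runtime.invoke_endpoint(
    EndpointName="your-endpoint-name",         # the generated or explicit endpoint name
    ContentType="application/json",
    Body=json.dumps({"features": [1, 2, 3]}),  # whatever your container expects
)
print(json.loads(response["Body"].read()))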

Categories
async python

Background tasks on the cheap

When integrating with third-party APIs you need to make sure that your requests actually reach the third party. In case of issues on their end you want to retry, and preferably not interrupt the flow of your application or, even worse, pass information about such issues to the end user (like leaking 503 errors).

The most popular solution is to use a background task, and there are tools to help with that: celery, python-rq, or dramatiq. They do the job of executing code in the background, but they require some extra infrastructure to work, plus all the dependencies they bring in. I have used them all in the past with great success, but most recently I decided to write a basic background task myself.

Why? As I mentioned earlier, all of them require extra infrastructure in the form of a broker, which most of the time is Redis. This implies changes to deployment, requires additional resources, and makes the stack more complex. The scope of what I had to do just did not justify bringing in this whole baggage. I needed to retry calls to the AWS Glue service in case we maxed out capacity. Since the Glue job we are executing can take a couple of minutes, our calls to AWS Glue had to be pushed into the background. I’ll give you the code and summarize what it does. By no means is this code perfect, but it works 🙂

# background.py
import threading
from queue import Queue

task_queue = Queue()
worker_thread = None


def enqueue_task(task):
    task_queue.put_nowait(task)

    global worker_thread
    if not worker_thread:
        worker_thread = _run_worker_thread()


def _process_tasks(task_queue):
    while task_queue.qsize():
        task = task_queue.get()
        try:
            print(f"Do stuff with task: {task}")
        except Exception:
            # put the task back on the queue so it gets retried on a later pass
            task_queue.put(task)

    global worker_thread
    worker_thread = None


def _run_worker_thread():
    t = threading.Thread(target=_process_tasks, args=(task_queue,))
    t.start()
    return t

The public interface of this small background module is a single function, enqueue_task. When it is called, the task is put on the queue and a worker thread is started. Each subsequent call enqueues another task, and the thread shuts down once it has processed all of them.
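
To give an idea of how this is used, here is a hedged sketch of the Glue scenario; the task shape, job name, and the boto3 call inside the worker are illustrative, since _process_tasks above only prints the task.

import boto3

from background import enqueue_task

glue = boto3.client("glue")


def run_glue_task(task):
    # this is what "Do stuff with task" would become in _process_tasks;
    # if AWS raises ConcurrentRunsExceededException, the except branch
    # puts the task back on the queue, which gives us the retry
    glue.start_job_run(JobName=task["job_name"], Arguments=task["arguments"])


# somewhere in the request handler: push the work to the background and return immediately
enqueue_task({"job_name": "nightly-etl", "arguments": {"--run_date": "2021-03-01"}})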

I find this simple and flexible enough to handle communication with flaky services or services with usage caps. Since it cannot be scaled out it has limited use, but for HTTP calls it is just fine. This code was inspired by one of Raymond Hettinger’s talks on concurrency and the queue module.

Categories
f-strings logging python

f-strings and python logging

There is one thing that has bothered me for a couple of months. It felt wrong when I saw it in the codebase, but I could not tell why. It was just a hunch that something was not right, though not enough to make me look for a reason.

For the last couple of days I have been struggling to sort out my bot configuration on Azure, and I decided I needed a break from that. Python, being the thing I know best, is a good candidate for feeling comfortable and in control again.

I decided to finally answer the question that had been bugging me. Why does using f-strings in logger calls make me uneasy? Why does this feel wrong?

hero = "Mumen Rider"
logger.error(f"Class C, rank 1: {hero}")

f-strings

Most pythonistas will know by now what f-strings are. They are a convenient way of constructing strings: values can be included directly in the string, which makes it much more readable. Here is an example from Python 3’s f-Strings: An Improved String Formatting Syntax (Guide), which is worth at least skimming through, even if you already know f-strings.

>>> name = "Eric"
>>> age = 74
>>> f"Hello, {name}. You are {age}."
'Hello, Eric. You are 74.'

They have their benefits, and my team has been using them ever since. That’s fine, as they are awesome, but I feel they should not be used when it comes to logging.

logging

I’m not talking about the poor man’s logging, which is print. This is an example of logging in Python:

logger.info("This is an example of a log message, and a value of %s", 42)

When the code includes such a line and it is executed, it outputs a string according to the log configuration. Of course your log level needs to match, but I’m skipping that as it is not relevant here; I’ll get back to it later. The %s identifier in the log message means that whatever is passed into logger.info will replace it. So the message will look like this:

INFO:MyLogger:This is an example of a log message, and a value of 42

logging + f-strings

Since logging accepts strings, and f-strings are so nice, they could be used together. Yes, of course it is possible, but I would not use f-strings for this purpose. The best way to illustrate why is an example followed by an explanation.

    import logging
    logging.basicConfig(level=logging.ERROR)
    logger = logging.getLogger('klich.dev')

    class MyClass:
        def __str__(self):
            print('Calling __str__')
            return "Hiya"

    c = MyClass()
    print("F style")
    logger.debug(f'{c}')
    logger.info(f'{c}')
    logger.warning(f'{c}')
    logger.error(f'{c}')

    print()
    print("Regular style")
    logger.debug('%s', c)
    logger.info('%s', c)
    logger.warning('%s', c)
    logger.error('%s', c)

This short example creates a logger and sets the logging level to ERROR, which means that only calls to logger.error will produce output. The __str__ method of the object used in the log messages prints a message when it is called. So each logger call that matches the level should print a Calling __str__ line followed by Hiya. Since there are two logger.error calls, we would expect four lines in total. This is what is actually printed out:

    % python3 logg.py
    F style
    Calling __str__
    Calling __str__
    Calling __str__
    Calling __str__
    ERROR:klich.dev:Hiya

    Regular style
    Calling __str__
    ERROR:klich.dev:Hiya

We can see that the logger lines using f-strings call __str__ even when the log message is not printed out. This is not a big penalty, but it may compound into something significant if you have many log calls with f-strings.

what is going on

According to the documentation on logging:

Formatting of message arguments is deferred until it cannot be avoided.

The logger is smart enough not to format messages when it is not needed. It refrains from calling __str__ until it is required, i.e. when the message is actually emitted to stdout, to a file, or to whatever other handler the logger is configured with.
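
If you ever do need an f-string (or any other expensive formatting) in a log call, you can reproduce the deferred behaviour yourself with an explicit level check. This is a sketch of mine using the standard isEnabledFor, not something the example code above relies on.

import logging

logging.basicConfig(level=logging.ERROR)
logger = logging.getLogger("klich.dev")


class Expensive:
    def __str__(self):
        print("expensive formatting")
        return "result"


value = Expensive()

# %s-style: the arguments are stored on the LogRecord and only rendered
# when a handler actually emits the record, so nothing is printed here
logger.debug("value: %s", value)

# f-string guarded by an explicit level check: the f-string (and __str__)
# is never evaluated because DEBUG is disabled
if logger.isEnabledFor(logging.DEBUG):
    logger.debug(f"value: {value}")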

To dig a little deeper we can use the dis module from the Python standard library. After feeding our code to the dis.dis method we get a list of the operations that happen under the hood. For a detailed explanation of what the individual operations do, have a look at ceval.c from Python’s sources.

    >>> import logging
    >>> logger = logging.getLogger()
    >>> def f1():
            logger.info("This is an example of a log message, and a value of %s", 42)

    >>> def f2():
            logger.info(f"This is an example of a log message, and a value of {42}")

    >>> import dis
    >>> dis.dis(f1)
                0 LOAD_GLOBAL              0 (logger)
                2 LOAD_METHOD              1 (info)
                4 LOAD_CONST               1 ('This is an example of a log message, and a value of %s')
                6 LOAD_CONST               2 (42)
                8 CALL_METHOD              2
                10 POP_TOP
                12 LOAD_CONST               0 (None)
                14 RETURN_VALUE
    >>> dis.dis(f2)
                0 LOAD_GLOBAL              0 (logger)
                2 LOAD_METHOD              1 (info)
                4 LOAD_CONST               1 ('This is an example of a log message, and a value of ')
                6 LOAD_CONST               2 (42)
                8 FORMAT_VALUE             0
                10 BUILD_STRING             2
                12 CALL_METHOD              1
                14 POP_TOP
                16 LOAD_CONST               0 (None)
                18 RETURN_VALUE

In this case we won’t go into much detail; it is enough to see that the f-string version adds two extra operations, FORMAT_VALUE (which handles f-string value formatting) and BUILD_STRING.

After this small bit of research I can explain why we should not be using f-strings in this specific place, which is logging. I can also put my uneasiness to rest.

Categories
python

Redis cache

I enjoy listening to podcasts, as they sometimes give me inspiration to create something. In one episode of the Python Bytes podcast the guys mentioned the https://github.com/bwasti/cache.py tool. cache.py allows you to cache function calls across runs by using a cache file. Simple and really useful.

This inspired me to write a similar thing but for distributed apps, based on redis as a cache storage. I called it rcache and you can find it on PyPI.

In order to use it simply decorate a function like this:

import rcache

@rcache.rcache()
def expensive_func(arg, kwarg=None):
  # Expensive stuff here
  return arg

The default Redis address is http://localhost:6379, but you can change it by passing a URL into the decorator: @rcache.cache(url="http://your_redis:6379").

I hope you find it useful and if you wish to comment or report something please go to https://gitlab.com/the_speedball/redis.cache.py/issues.

Have fun

Categories
python

Collaboration of OAuth and LDAP

The goal

Almost everyone knows OAuth; it is widely used online and has a good reputation. In the company where I work we decided to integrate OAuth2 into our platform (based on a microservice architecture; I’ll be saying platform from now on). The trick was that it had to be integrated with the existing LDAP server storing user data. A user requesting a token had to provide their LDAP username and password in order to receive it. Looking for existing solutions was fruitless, so new code had to be written. At that point I didn’t know how this could be achieved. In addition to LDAP, I had to use Django as the base for providing the REST API endpoints. The limitations were clear at this point, the risks unknown. Not an unusual situation when working as a programmer.

The stack

As I mentioned earlier, Django was chosen as the framework. Having decided on the framework narrowed down the number of libraries to use. Fortunately, there was already a library adding OAuth to Django: Django OAuth Toolkit. DOT integrates nicely with Django REST framework and allows you to write your own plugins for validating token requests. It supports much more, but those two features were the main selling points. Talking to LDAP required a library to do the heavy lifting. There is not much choice here, to be honest, and I stuck with python-ldap. It is different from what you would expect of a Python library: the messages are not very useful and the docs are not very clear, but it works and is reliable.

The solution

At the beginning the task seemed really difficult to me. I had only played with OAuth without understanding how it works, and it was similar with LDAP. After diving into the details it stopped looking that hairy, as it turned out I only had to plug into the process of authorising the user request. Put simply, the process of issuing the token would not start until the user provided credentials that were valid in LDAP.

Django OAuth Toolkit

DOT (Django OAuth Toolkit) is pretty flexible: it provides a setting, OAUTH2_VALIDATOR_CLASS, where you can define your own validator. This allows you to control each step of the OAuth2 process. Fortunately, I was only concerned with user validation, so I had to write my own validator. The easiest way was to read the default class provided by DOT, namely oauth2_provider.oauth2_validators.OAuth2Validator. It is nicely written; each step has its own method that can be replaced. I just had to find the proper one. Proper like validate_user.

def validate_user(self, username, password, client, request, *args, **kwargs):

The signature of the method pictures exactly what needs to be done and lists all the required ingredients. We were connecting to LDAP, so this method had to do everything required to validate the user and return a bool depending on the result of the validation.
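
A minimal sketch of such a validator, with a made-up server address and DN template and without the user-provisioning details a real implementation needs, could look like this:

import ldap
from oauth2_provider.oauth2_validators import OAuth2Validator


class LDAPOAuth2Validator(OAuth2Validator):
    def validate_user(self, username, password, client, request, *args, **kwargs):
        conn = ldap.initialize("ldap://ldap.example.com")
        try:
            # a successful simple bind means the credentials are valid in LDAP
            conn.simple_bind_s(
                "uid=%s,ou=people,dc=example,dc=com" % username, password
            )
        except ldap.INVALID_CREDENTIALS:
            return False
        finally:
            conn.unbind_s()
        # a complete implementation would also set request.user to a Django
        # user (for example one mirrored from LDAP) before returning True
        return True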

Django

Having all the parts connected together, the only thing left was to replace the validator class so our new one is used. After doing this, all requests coming in to our OAuth2 server had to conform to our rule: provide a login and password stored in LDAP. It took me longer than expected to grasp the concept and design the solution. I created a few prototypes, each with fewer lines of code, until this simple solution came to mind.
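
Wiring it in is just a settings change; the module path below is hypothetical and should point at wherever the validator class lives.

# settings.py
OAUTH2_PROVIDER = {
    "OAUTH2_VALIDATOR_CLASS": "myproject.oauth.validators.LDAPOAuth2Validator",
}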

Still don’t fancy LDAP.

Categories
python

Consumer with cache

Micro. Services. They are popular, and it is a pretty useful pattern if applied correctly. Using such a pattern forces one to think a bit more when designing a solution to a problem. Not that it is more difficult, just different. One of the main differences is communication, or maybe data flow.

Regular applications tend to talk mostly through internal calls, or callbacks, which makes communication simpler. You do not have to care whether the function you are calling is available: if you pass correct arguments you will get a response. It is not that simple with microservices, which in fact are like third-party applications. They can go down, can throttle your calls, can respond with a delay, etc.

My current project tries to mitigate this by using a messaging pattern in vulnerable spots, where communication may be unstable (connecting to a third party, like Facebook) or prone to delays and timeouts (database writes, connecting to a third party). The database we are using, Elasticsearch, has a thread pool of workers and may throttle access under heavy load. Our application can generate a lot of writes, exhausting the pool of ES workers. An "easy" way of increasing the number of written documents is to write them in batches using the bulk operation. Since we are using the AMQP protocol for communication with the DB, we are not able to process more than one message at a time, as this is not supported by the protocol.

The solution to this is the Aggregator pattern, from the Enterprise Integration Patterns book. You will find the full description of it if you buy the book 🙂 This link, however, gives enough information to understand what it does.

The Aggregator is a special Filter that receives a stream of messages and identifies messages that are correlated. Once a complete set of messages has been received (more on how to decide when a set is ‘complete’ below), the Aggregator collects information from each correlated message and publishes a single, aggregated message to the output channel for further processing.

The Aggregator will pull messages, and when some condition is reached it will create a bulk write. Here the condition that triggers the insert is message-count based (do we have enough messages?), but it could also be time based, triggered by a special message, or any other condition.

I usually try to find an existing solution that I can integrate, or maybe modify a bit to our needs, but this time there was nothing. It was time to write it myself, with plenty of elbow grease.

The solution below is not ideal, as it does not package messages into equally sized bulks. A batch may be bigger or smaller than specified, but it will be a batch.

First goes the queuing consumer code, creating 5 consumers listening on the default RabbitMQ address.

#!/usr/bin/env python
import queue

from kombu.mixins import ConsumerMixin
from kombu import Connection, Queue
connection = Connection('amqp://guest:guest@localhost:5672//')


q = queue.Queue()


class C(ConsumerMixin):
    def __init__(self, name, connection, q):
        self.name = name
        self.connection = connection
        self.q = q

    def get_consumers(self, consumer, channel):
        return [
            consumer(
                Queue('task_queue'),
                callbacks=[self.on_message],
                accept=['json']),
        ]

    def on_message(self, body, message):
        self.q.put(body)
        message.ack()

        if self.q.qsize() > 10:
            batch = []
            while not self.q.empty():
                try:
                    # non-blocking get, so a racing consumer cannot make us hang
                    item = self.q.get_nowait()
                except queue.Empty:
                    break

                print('%s : Compress: %s' % (self.name, item))
                batch.append(item)
                self.q.task_done()

            print('%s : Push batch: %s' % (self.name, batch))


from threading import Thread


threads = []
for i in range(5):
    w = C('worker %s' % i, connection, q)
    t = Thread(target=w.run)
    t.start()
    threads.append(t)

Here is the test producer code that generates messages so one can see how the consumers behave.

#!/usr/bin/env python

import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"
for x in range(40):
    channel.basic_publish(
        exchange='',
        routing_key='task_queue',
        body=message,
        properties=pika.BasicProperties(
            delivery_mode=2,  # make message persistent
        ))
    print(" [%s] Sent %r" % (x, message))
connection.close()

In order to test it, pika and kombu need to be installed; then simply run the consumer in one terminal and trigger the producer in another. All the calls to queue.Queue are non-blocking, which is most probably the reason for the different batch sizes. I guess that attempt has to wait for the next blog post.

Categories
python

Celery tasks, states and results

The subject of Celery task results comes back every now and then. It would make a really good post, with nice examples. So here we go!

If you don’t know what Celery is:

Celery is a simple, flexible, and reliable distributed system to process vast amounts of messages, while providing operations with the tools required to maintain such a system. It’s a task queue with focus on real-time processing, while also supporting task scheduling.

You can read more by going to their documentation.

States

Celery comes with a few states to start with. These states tell you what is happening to a task and are selected from this list most of the time (unless you have custom states, but I’ll cover this later):

  • PENDING
  • STARTED
  • SUCCESS
  • FAILURE
  • RETRY
  • REVOKED

Such defaults allow you to go pretty far with tracking your tasks. You can even deduce transitions based on the current state of a task: a FAILURE state means that the task went through the PENDING and STARTED states. Here’s an example of how that works. Let’s take a basic task from Celery’s own tutorial. I’m using RabbitMQ from this docker image as my broker.

from celery import Celery

app = Celery('task', broker='amqp://guest:guest@localhost:5672//')

# STARTED state is not enabled by default so we flip it on
app.conf.update(task_track_started=True)


@app.task(bind=True)
def add(self, x, y):
    # we need to sleep to show STARTED state
    import time
    time.sleep(10)
    return x + y

With that we can go through some of the states.

majki@snakepit ~/projects/blog/pow/celery-states
% pipenv run python
Python 3.4.8 (default, Mar 19 2018, 21:12:05)
[GCC 5.4.0 20160609] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from task import add
>>> r = add.delay(2, 3)  # worker is disabled so we can have PENDING state
>>> r.status
'PENDING'  # worker is enabled after this
>>> r.status
'STARTED'
>>> r.status
'SUCCESS'
>>> r = add.delay(2, 'a')
>>> r.status
'STARTED'
>>> r.state
'FAILURE'

It is pretty useful and probably covers a lot of use cases, but there’s even more to discover about Celery’s states.

Custom states

You can also define your own states if you need to. Let’s modify our example a bit to include a new state called ‘GOING_TO_SLEEP’.

from celery import Celery

app = Celery('task', broker='amqp://guest:guest@localhost:5672//')

# STARTED state is not enabled by default so we flip it on
app.conf.update(task_track_started=True)


@app.task(bind=True)
def add(self, x, y):
    self.update_state(state='GOING_TO_SLEEP')
    import time
    time.sleep(10)
    return x + y

Now let’s see how this works.

majki@snakepit ~/projects/blog/pow/celery-states
% pipenv run python
Python 3.4.8 (default, Mar 19 2018, 21:12:05)
[GCC 5.4.0 20160609] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from task import add
>>> r = add.delay(2, 3)
>>> r.status
'GOING_TO_SLEEP'
>>> r.status
'SUCCESS'

You can go bananas with such a feature and be really pedantic about your tasks’ state flow. It may be beneficial for some really complex workflows, so if you would like to monitor them that closely, do it.

Results storage

Now let’s tackle the other concern: how do you save a task result? All my examples up to this point were in the REPL, so all results are gone as soon as you close it. Thankfully the authors of Celery thought about it and provided us with such functionality. Celery supports multiple types of result storage, making almost everyone happy. I’m not done with my task example; it just needs a little tweaking. To be honest, not much is needed to have task results persisted.

from celery import Celery

app = Celery('task', broker='amqp://guest:guest@localhost:5672//')
app.conf.update(task_track_started=True,
                result_backend='file:///var/celery/results')


@app.task(bind=True)
def add(self, x, y):
    import time
    time.sleep(10)
    return x + y

According to this configuration, all task results will be saved under the /var/celery/results directory. I have picked the file-system backend as it is the easiest to show.

majki@snakepit ~/projects/blog/pow/celery-states
% pipenv run python
Python 3.4.8 (default, Mar 19 2018, 21:12:05)
[GCC 5.4.0 20160609] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from task import add
>>> add.delay(1,1)
<AsyncResult: be19e446-64a6-4fa9-b1a4-05feeb4fbec2>
>>> add.delay(1,1)
<AsyncResult: 1dc82626-9731-43ea-b4ea-6f237143dd42>
>>> add.delay(1,1)
<AsyncResult: badbe577-9314-4a92-91b8-80d071a5f8b5>
>>> add.delay(1,1)
<AsyncResult: c2da2e98-7a05-411f-acd5-7ead99c12c4e>

Now we can have a look at our results backend and see whether the results are stored. I had been keeping the results of all my tasks while writing this post, so there are a lot of files. The last four are the ones you can see in the REPL output above.

majki@snakepit ~/projects/blog
% ls -thor  # thanks firemark!
total 236K
drwxr-xr-x 6 majki 4,0K kwi 10 20:46 cryogen
drwxr-xr-x 3 majki 4,0K kwi 11 20:28 pow
-rw-rw-r-- 1 majki  120 kwi 11 21:10 celery-task-meta-f730b12b-25ee-46d2-a7d5-eabda0ab14ef
-rw-rw-r-- 1 majki  120 kwi 11 21:11 celery-task-meta-6c7a3fc1-882b-4ddd-be9e-bd1431099eae
-rw-rw-r-- 1 majki  120 kwi 11 21:11 celery-task-meta-f5a73a65-6bda-47b8-a04e-34d1e5b7b81d
-rw-rw-r-- 1 majki  120 kwi 11 21:12 celery-task-meta-c012b4a7-362f-4d05-ac65-9dc398ac514e
-rw-rw-r-- 1 majki  120 kwi 11 21:14 celery-task-meta-c0654b79-65b0-4365-9a45-6b3d248d559d
-rw-rw-r-- 1 majki  120 kwi 11 21:14 celery-task-meta-f2d51878-86db-4d21-bc36-c651e63bdda3
-rw-rw-r-- 1 majki  120 kwi 11 21:14 celery-task-meta-ff9071d1-5ac1-4f81-9da2-29b4f706f5d8
-rw-rw-r-- 1 majki  120 kwi 11 21:15 celery-task-meta-8691b6f0-fa8f-401b-9133-0ce8308e34c9
-rw-rw-r-- 1 majki  120 kwi 11 21:18 celery-task-meta-9d312364-ed4c-49f2-a6eb-7e3b6a9eeaef
-rw-rw-r-- 1 majki  120 kwi 11 21:19 celery-task-meta-31ae7d06-c292-48c8-b526-eebeb021bb90
-rw-rw-r-- 1 majki  120 kwi 11 21:22 celery-task-meta-39d6126d-57f9-4a2b-a09b-68475076ec08
-rw-rw-r-- 1 majki  120 kwi 11 21:39 celery-task-meta-264975a3-b8d0-4855-a19a-fc64e1384bfe
-rw-rw-r-- 1 majki  760 kwi 11 21:46 celery-task-meta-b7d429d2-9aaa-4af5-914a-aba1321676cb
-rw-rw-r-- 1 majki  120 kwi 11 21:57 celery-task-meta-fa4f6d6a-aeb2-416b-9efc-9e7947ef9550
-rw-rw-r-- 1 majki  120 kwi 11 22:03 celery-task-meta-f5774f4f-0d51-4861-af95-a1a8ff94d1b1
-rw-rw-r-- 1 majki  120 kwi 11 22:18 celery-task-meta-be19e446-64a6-4fa9-b1a4-05feeb4fbec2
-rw-rw-r-- 1 majki  120 kwi 11 22:26 celery-task-meta-1dc82626-9731-43ea-b4ea-6f237143dd42
-rw-rw-r-- 1 majki  120 kwi 11 22:26 celery-task-meta-badbe577-9314-4a92-91b8-80d071a5f8b5
-rw-rw-r-- 1 majki  120 kwi 11 22:26 celery-task-meta-c2da2e98-7a05-411f-acd5-7ead99c12c4e

majki@snakepit ~/projects/blog
% cat celery-task-meta-c2da2e98-7a05-411f-acd5-7ead99c12c4e| python -m json.tool
{
    "children": [],
    "result": 2,
    "status": "SUCCESS",
    "task_id": "c2da2e98-7a05-411f-acd5-7ead99c12c4e",
    "traceback": null
}

There you have it, everything is stored "safely" (it’s my laptop 🙂 ) and can be viewed if required.
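
And since the backend is configured on the app, you can also read a stored result back programmatically; here is a quick sketch using one of the task ids from the listing above.

from celery.result import AsyncResult

from task import app  # the Celery app with result_backend configured

r = AsyncResult("c2da2e98-7a05-411f-acd5-7ead99c12c4e", app=app)
print(r.status)  # 'SUCCESS'
print(r.result)  # 2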

Hopefully this puts aside all concerns regarding task states and task results, and sheds a bit of light on Celery’s extensive API.