Categories
python

Celery tasks, states and results

The subject of Celery task results comes back every now and then. It would make a really good post, with nice examples. So here we go!

If you don’t know what Celery is:

Celery is a simple, flexible, and reliable distributed system to process vast amounts of messages, while providing operations with the tools required to maintain such a system. It’s a task queue with focus on real-time processing, while also supporting task scheduling.

You can read more by going to their documentation.

States

Celery comes with a handful of built-in states. These states tell you what is happening to a task and, unless you define custom states (which I'll cover later), they come from this list:

  • PENDING
  • STARTED
  • SUCCESS
  • FAILURE
  • RETRY
  • REVOKED
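
These names are also exposed as constants in the celery.states module, which is handy when you want to compare states without sprinkling string literals around. A quick sketch:

from celery import states

print(states.SUCCESS in states.READY_STATES)  # True: SUCCESS is a final state
print(states.STARTED in states.READY_STATES)  # False: the task is still in flight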

These defaults already let you go pretty far with tracking your tasks. You can even deduce transitions based on the current state of a task: a FAILURE state means the task went through the PENDING and STARTED states. Here's an example of how that works. Let's take a basic task from Celery's own tutorial. I'm using RabbitMQ from this docker image as my broker.

from celery import Celery

app = Celery('task', broker='amqp://guest:guest@localhost:5672//')

# STARTED state is not enabled by default so we flip it on
app.conf.update(task_track_started=True)


@app.task(bind=True)
def add(self, x, y):
    # we need to sleep to show STARTED state
    import time
    time.sleep(10)
    return x + y

With that we can go through some of the states.

majki@snakepit ~/projects/blog/pow/celery-states
% pipenv run python
Python 3.4.8 (default, Mar 19 2018, 21:12:05)
[GCC 5.4.0 20160609] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from task import add
>>> r = add.delay(2, 3)  # worker is disabled so we can have PENDING state
>>> r.status
'PENDING'  # worker is enabled after this
>>> r.status
'STARTED'
>>> r.status
'SUCCESS'
>>> r = add.delay(2, 'a')
>>> r.status
'STARTED'
>>> r.state
'FAILURE'
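
A failed task's AsyncResult carries more than the state string. Here is a quick sketch of pulling out the exception and the remote traceback; it assumes a result backend is configured, as in the results section further down:

from task import add

r = add.delay(2, 'a')

# get() re-raises the worker's exception by default;
# propagate=False hands it back as a value instead
exc = r.get(propagate=False)

print(r.state)      # 'FAILURE'
print(repr(exc))    # the TypeError raised inside the worker
print(r.traceback)  # the remote traceback, as a string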

It is pretty useful and probably covers a lot of use cases, but there’s even more to discover about Celery’s states.

Custom states

You can also define your own states if you need to. Let’s modify our example a bit to include a new state called ‘GOING_TO_SLEEP’.

from celery import Celery

app = Celery('task', broker='amqp://guest:guest@localhost:5672//')

# STARTED state is not enabled by default so we flip it on
app.conf.update(task_track_started=True)


@app.task(bind=True)
def add(self, x, y):
    # bind=True gives access to the task instance via `self`,
    # which is what lets us call update_state()
    self.update_state(state='GOING_TO_SLEEP')
    # sleep long enough to observe the custom state from the REPL
    import time
    time.sleep(10)
    return x + y

Now let’s see how this works.

majki@snakepit ~/projects/blog/pow/celery-states
% pipenv run python
Python 3.4.8 (default, Mar 19 2018, 21:12:05)
[GCC 5.4.0 20160609] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from task import add
>>> r = add.delay(2, 3)
>>> r.status
'GOING_TO_SLEEP'
>>> r.status
'SUCCESS'

You can go bananas with this feature and be really pedantic about your tasks' state flow. It can be beneficial for some really complex workflows that you want to monitor closely.
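
A common use for custom states is progress reporting. Here is a minimal sketch of the idea; the PROGRESS state name, the crunch task, and the meta keys are my own choices, not anything built into Celery:

from celery import Celery

app = Celery('task', broker='amqp://guest:guest@localhost:5672//')


@app.task(bind=True)
def crunch(self, items):
    total = len(items)
    for done, item in enumerate(items, start=1):
        # publish a custom state together with arbitrary metadata
        self.update_state(state='PROGRESS',
                          meta={'done': done, 'total': total})
        # ... the actual work on `item` would go here ...
    return total

On the calling side, r.state reports 'PROGRESS' while the task is running, and r.info holds the meta dictionary.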

Results storage

Now let's tackle the other concern: how do you save a task result? All my examples up to this point were run in a REPL, so the results are gone as soon as you close it. Thankfully the authors of Celery thought about this and provided us with such functionality. Celery supports multiple types of result backends, making almost everyone happy. I'm not done with my task example; it just needs a little tweaking. To be honest, not much is needed to have task results persisted.

from celery import Celery

app = Celery('task', broker='amqp://guest:guest@localhost:5672//')
app.conf.update(task_track_started=True,
                result_backend='file:///var/celery/results')


@app.task(bind=True)
def add(self, x, y):
    # sleep so the task hangs around long enough to query its state
    import time
    time.sleep(10)
    return x + y

With this configuration, all task results will be saved under the /var/celery/results directory. I have picked the file-system backend as it is the easiest one to show.
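
If the file system is not your thing, switching backends is just a matter of changing the URL. For example, pointing at a local Redis instance (assuming Redis is running on localhost and celery[redis] is installed) would look roughly like this:

app.conf.update(task_track_started=True,
                result_backend='redis://localhost:6379/0')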

majki@snakepit ~/projects/blog/pow/celery-states
% pipenv run python
Python 3.4.8 (default, Mar 19 2018, 21:12:05)
[GCC 5.4.0 20160609] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from task import add
>>> add.delay(1,1)
<AsyncResult: be19e446-64a6-4fa9-b1a4-05feeb4fbec2>
>>> add.delay(1,1)
<AsyncResult: 1dc82626-9731-43ea-b4ea-6f237143dd42>
>>> add.delay(1,1)
<AsyncResult: badbe577-9314-4a92-91b8-80d071a5f8b5>
>>> add.delay(1,1)
<AsyncResult: c2da2e98-7a05-411f-acd5-7ead99c12c4e>

Now we can have a look at the results backend and see whether the results are stored. I had been keeping the results of all my tasks while writing this post, so there are a lot of files. The last four are the ones you can see in the REPL output above.

majki@snakepit ~/projects/blog
% ls -thor  # thanks firemark!
total 236K
drwxr-xr-x 6 majki 4,0K kwi 10 20:46 cryogen
drwxr-xr-x 3 majki 4,0K kwi 11 20:28 pow
-rw-rw-r-- 1 majki  120 kwi 11 21:10 celery-task-meta-f730b12b-25ee-46d2-a7d5-eabda0ab14ef
-rw-rw-r-- 1 majki  120 kwi 11 21:11 celery-task-meta-6c7a3fc1-882b-4ddd-be9e-bd1431099eae
-rw-rw-r-- 1 majki  120 kwi 11 21:11 celery-task-meta-f5a73a65-6bda-47b8-a04e-34d1e5b7b81d
-rw-rw-r-- 1 majki  120 kwi 11 21:12 celery-task-meta-c012b4a7-362f-4d05-ac65-9dc398ac514e
-rw-rw-r-- 1 majki  120 kwi 11 21:14 celery-task-meta-c0654b79-65b0-4365-9a45-6b3d248d559d
-rw-rw-r-- 1 majki  120 kwi 11 21:14 celery-task-meta-f2d51878-86db-4d21-bc36-c651e63bdda3
-rw-rw-r-- 1 majki  120 kwi 11 21:14 celery-task-meta-ff9071d1-5ac1-4f81-9da2-29b4f706f5d8
-rw-rw-r-- 1 majki  120 kwi 11 21:15 celery-task-meta-8691b6f0-fa8f-401b-9133-0ce8308e34c9
-rw-rw-r-- 1 majki  120 kwi 11 21:18 celery-task-meta-9d312364-ed4c-49f2-a6eb-7e3b6a9eeaef
-rw-rw-r-- 1 majki  120 kwi 11 21:19 celery-task-meta-31ae7d06-c292-48c8-b526-eebeb021bb90
-rw-rw-r-- 1 majki  120 kwi 11 21:22 celery-task-meta-39d6126d-57f9-4a2b-a09b-68475076ec08
-rw-rw-r-- 1 majki  120 kwi 11 21:39 celery-task-meta-264975a3-b8d0-4855-a19a-fc64e1384bfe
-rw-rw-r-- 1 majki  760 kwi 11 21:46 celery-task-meta-b7d429d2-9aaa-4af5-914a-aba1321676cb
-rw-rw-r-- 1 majki  120 kwi 11 21:57 celery-task-meta-fa4f6d6a-aeb2-416b-9efc-9e7947ef9550
-rw-rw-r-- 1 majki  120 kwi 11 22:03 celery-task-meta-f5774f4f-0d51-4861-af95-a1a8ff94d1b1
-rw-rw-r-- 1 majki  120 kwi 11 22:18 celery-task-meta-be19e446-64a6-4fa9-b1a4-05feeb4fbec2
-rw-rw-r-- 1 majki  120 kwi 11 22:26 celery-task-meta-1dc82626-9731-43ea-b4ea-6f237143dd42
-rw-rw-r-- 1 majki  120 kwi 11 22:26 celery-task-meta-badbe577-9314-4a92-91b8-80d071a5f8b5
-rw-rw-r-- 1 majki  120 kwi 11 22:26 celery-task-meta-c2da2e98-7a05-411f-acd5-7ead99c12c4e

majki@snakepit ~/projects/blog
% cat celery-task-meta-c2da2e98-7a05-411f-acd5-7ead99c12c4e| python -m json.tool
{
    "children": [],
    "result": 2,
    "status": "SUCCESS",
    "task_id": "c2da2e98-7a05-411f-acd5-7ead99c12c4e",
    "traceback": null
}

There you have it: everything is stored "safely" (it's my laptop 🙂) and can be viewed if required.
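
You don't have to cat files by hand either. Given a task id, you can rehydrate the result from the backend in code; here is a sketch using one of the ids from the REPL above:

from celery.result import AsyncResult

from task import app

r = AsyncResult('c2da2e98-7a05-411f-acd5-7ead99c12c4e', app=app)
print(r.status)  # 'SUCCESS'
print(r.result)  # 2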

Hopefully this puts aside any concerns about task states and results, and sheds a bit of light on Celery's extensive API.

Categories
microservices

Cleaning up messy distributed logs, part 2

This is a story about sorting out logs. The first part can be found here; it's where you can get all the nitty-gritty. If you're not feeling like it, here is a short elevator pitch: our kibana logs are a mess, each of the services seems to have a different log format, and tracebacks are split across many entries. My goal is to fix it.

This second part is much, much shorter, as it covers just one of the steps I took on the path to sorting out this pet peeve of mine. Unfortunately, I cannot spend all my time at work fixing it, as it is not a crucial part of our application. At least not at this moment.

My first idea was to tackle the "easy" issue of grok parsing failures. Our configuration is rather bloated IMO, so I trimmed it (I had mentioned that this is not my concern, but it is easier to reason about simple code). From this:

      input {
        gelf { }
        heartbeat { }
        syslog {
          port => 5145
        }
      }
      filter {
        ruby {
          code => "
            event.to_hash.keys.each { |k| event[ k.gsub('"'.'"','"'_'"') ] = 
            event.remove(k) if k.include?'"'.'"' }
          "
        }
        grok {
          match => {
            "message" => [
              # Logspout
              "%{SYSLOG5424PRI}%{NONNEGINT:ver} +(?:%{TIMESTAMP_ISO8601:ts}|-) 
              +(?:%{HOSTNAME:containerid}|-) 
              +(?:%{WORD:process}\.%{INT:processnumber}\.%{WORD:processsha}|%{NOTSPACE:process}) 
              +(?:%{NOTSPACE:proc}|-) +(?:%{WORD:msgid}|-) 
              +(?:%{SYSLOG5424SD:sd}|-|) +%{GREEDYDATA:msg}"
            ]
          }
        }
      }
      output {
        elasticsearch {
          hosts => ["elasticsearch:9200"]
        }
        stdout {
          codec => rubydebug
        }
      }

I went to this:

      input {
        syslog { }
      }
      filter {
        grok {
          match => {
            "message" => [
              # Logspout
              "%{SYSLOG5424PRI}%{NONNEGINT:ver} +(?:%{TIMESTAMP_ISO8601:ts}|-) 
              +(?:%{HOSTNAME:containerid}|-) 
              +(?:%{WORD:process}\.%{INT:processnumber}\.%{WORD:processsha}|%{NOTSPACE:process}) 
              +(?:%{NOTSPACE:proc}|-) +(?:%{WORD:msgid}|-) 
              +(?:%{SYSLOG5424SD:sd}|-|) +%{GREEDYDATA:msg}"
            ]
          }
        }
      }
      output {
        elasticsearch {
          hosts => ["elasticsearch:9200"]
        }
      }

We had a few plugins that were doing nothing, so they went under the cleaver. Since I can run the swarm locally, I could verify that everything is still logged and displayed correctly in kibana.

So then I did what everyone does: googled it hard. Actually I duckduckgo'd it, but that just does not sound right. After a lot of reading I finally found something relevant to my case. The perpetrator is the syslog plugin. I mentioned before that our log format might not be consistent and is possibly (for sure, TBH) not up to the syslog spec. As the author recommended, I replaced syslog { } with the tcp and udp plugins.

      input {
        tcp {
          port => 5145
          type => syslog
        }
        udp {
          port => 5145
          type => syslog
        }
      }

Logs now come through without _grokparsefailure tags. They are still a bit messy, but at least they are parsed correctly and efficiently. See you in the next part.

Categories
microservices

Cleaning up messy distributed logs, part 1

Logging is an important part of the software lifecycle. It is used at every stage: from the beginning, through testing, deployment, staging, and production. I find neat logs soothing; they increase my confidence in running software by providing visibility when required.

Having said that, I have taken on the task of working on the logs in a distributed system I'm involved with. Since we push everything through the ELK stack, this is what I have inherited.

Unfiltered messy kibana logs

It looks like a mess, but that is because kibana does not have any sensible default configuration in our setup. After looking more closely, there is another issue, which I have marked in red.

Unfiltered messy kibana logs with tags marked with red

So it's time to look at how logstash is configured, as there is a problem with how the grok filter is set up. Before doing that, it is worth explaining how you can configure logstash, as it will help in understanding my configuration, and maybe briefly explaining what logstash is.

The easiest way to describe the purpose of this tool is by citing the introduction from its documentation.

Logstash is an open source data collection engine with real-time pipelining capabilities. Logstash can dynamically unify data from disparate sources and normalize the data into destinations of your choice.

How do you configure it? Simply by picking the plugins you need and putting them into your configuration file. The plugin types are:

  • input plugins, the sources of events fed into logstash. list
  • filter plugins, used for processing event data. list
  • output plugins, which send data to a particular destination. list
  • codec plugins, which can be part of input or output plugins and can change the data. list

In my case the configuration is a bit complex and, most importantly, grok is not configured correctly (so it seems, but there might be something else in play as well). Some of the plugins may be redundant too, but that is not my concern right now.

      input {
        gelf { }
        heartbeat { }
        syslog {
          port => 5145
        }
      }
      filter {
        ruby {
          code => "
            event.to_hash.keys.each { |k| event[ k.gsub('"'.'"','"'_'"') ] = 
            event.remove(k) if k.include?'"'.'"' }
          "
        }
        grok {
          match => {
            "message" => [
              # Logspout
              "%{SYSLOG5424PRI}%{NONNEGINT:ver} +(?:%{TIMESTAMP_ISO8601:ts}|-) 
              +(?:%{HOSTNAME:containerid}|-) 
              +(?:%{WORD:process}\.%{INT:processnumber}\.%{WORD:processsha}|%{NOTSPACE:process}) 
              +(?:%{NOTSPACE:proc}|-) +(?:%{WORD:msgid}|-) 
              +(?:%{SYSLOG5424SD:sd}|-|) +%{GREEDYDATA:msg}"
            ]
          }
        }
      }
      output {
        elasticsearch {
          hosts => ["elasticsearch:9200"]
        }
        stdout {
          codec => rubydebug
        }
      }

This is the state of things now. In the next parts I'll describe, as I go, my quest to make logs great again.