Categories
logging microservices

Cleaning up messy distributed logs, part 3

If you have been reading along, this is the final part of this short story on cleaning up messy logs. I might write one on Kibana itself some day, as it is a difficult tool for beginners to use. Previous parts of this story can be found here: part 1 and part 2.

Over the last couple of days at work I could spend more time on fixing logs, as the new project estimates were finished. The last thing I sorted out was syslog parsing; with that done I could move on to figuring out how to log tracebacks raised in our services.

Tracebacks are spread over many lines, so when they arrive at logstash they are parsed as separate entries and sent to elasticsearch that way. Kibana then displays them as separate entries as well, making them really difficult to read. On top of being split up, they are in reverse order.

My idea is to serialize each log message into JSON and then parse it in logstash. As I have probably mentioned, I am working on an app built with the microservices pattern and there are a few services to cover. The first candidate is the gateway service, as it is the easiest to test and relatively simple, with no additional components beside the service itself. It is a Flask app running on gunicorn, so to configure logging there is no need to change the code; it is enough to modify the entrypoint by adding --log-config.

#!/bin/bash
exec gunicorn -b 0.0.0.0:80 --log-config /code/log-config.ini --access-logfile - --reload "gate.app:create_app()"

The config file should be written in ini style.

[loggers]
keys=root

[handlers]
keys=console

[formatters]
keys=default

[logger_root]
level=DEBUG
qualname=root
handlers=console

[handler_console]
class=StreamHandler
formatter=default
args=(sys.stdout, )

[formatter_default]
class=logstash_formatter.LogstashFormatter

This alone handles almost the whole application configuration; logs are nicely serialized into JSON with the whole traceback kept in a single entry.
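If you ever need the same formatter outside of gunicorn's ini config, an equivalent plain Python setup would look roughly like this (a sketch, assuming the logstash_formatter package referenced above is installed):

import logging
import sys

from logstash_formatter import LogstashFormatter

handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(LogstashFormatter())

root = logging.getLogger()
root.addHandler(handler)
root.setLevel(logging.DEBUG)

try:
    1 / 0
except ZeroDivisionError:
    # the whole traceback lands inside a single JSON document
    root.exception('division went wrong')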

Categories
python

Collaboration of OAuth and LDAP

The goal

Almost everyone knows OAuth; it is widely used online and has a good reputation. In the company where I work we decided to integrate OAuth2 into our platform (based on a microservice architecture; I'll be saying platform from now on). The trick was that it had to be integrated with an existing LDAP server storing user data. A user requesting a token had to provide their LDAP username and password in order to receive it. Looking for existing solutions was fruitless, so new code had to be written. At that point I didn't know how this could be achieved. In addition to LDAP I had to use Django as the base for providing the REST API endpoints. The limitations were clear at this point, the risks were unknown. This is not an unusual situation when working as a programmer.

The stack

As I mentioned earlier, Django was chosen as the framework, which narrowed down the number of libraries to choose from. Fortunately there was already a library adding OAuth to Django: Django OAuth Toolkit. DOT integrates nicely with Django REST framework and allows you to write your own plugins for validating token requests. It supports much more, but those two features were the main selling points. Talking to LDAP required a library to do the heavy lifting. There is not much choice here, to be honest, and I stuck with python-ldap. It is different from what you would expect of a typical Python library: the error messages are not very useful and the docs are not very clear, but it works and it is reliable.

The solution

At the beginning the task seemed really difficult to me. I had only played with OAuth without understanding how it works, and it was similar with LDAP. After diving into the details it stopped looking that hairy, as it turned out I only had to plug into the process of authorising the user request. Putting it simply, the process of issuing the token would not start until the user provided credentials that are valid in LDAP.

Django OAuth Toolkit

DOT (Django OAuth Toolkit) is pretty flexible; it provides a setting, OAUTH2_VALIDATOR_CLASS, where you can define your own validator. This allows you to control each step of the OAuth2 process. Fortunately I was only concerned with user validation, so I had to write my own validator. The easiest way was to read the default class provided by DOT, namely oauth2_provider.oauth2_validators.OAuth2Validator. It is nicely written; each step has its own method that can be replaced. I just had to find the proper one, and the proper one is validate_user.

def validate_user(self, username, password, client, request, *args, **kwargs):

The signature of the method shows exactly what needs to be done and lists all the required ingredients. We were connecting to LDAP, so this method had to do everything required to validate the user and return a bool depending on the result of the validation.
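To give an idea of the shape of the final class, here is a minimal sketch of such a validator. The server address, the DN template and the user lookup are made up for illustration; a real implementation needs proper error handling:

import ldap  # python-ldap

from django.contrib.auth import get_user_model
from oauth2_provider.oauth2_validators import OAuth2Validator

LDAP_URL = 'ldap://ldap.example.com'  # hypothetical server
DN_TEMPLATE = 'uid=%s,ou=people,dc=example,dc=com'  # hypothetical DN layout


class LDAPValidator(OAuth2Validator):

    def validate_user(self, username, password, client, request, *args, **kwargs):
        conn = ldap.initialize(LDAP_URL)
        try:
            # bind with the credentials provided in the token request
            conn.simple_bind_s(DN_TEMPLATE % username, password)
        except ldap.INVALID_CREDENTIALS:
            return False
        finally:
            conn.unbind_s()
        # DOT expects request.user to point at a Django user on success;
        # how that user is looked up or created is application specific
        user, _ = get_user_model().objects.get_or_create(username=username)
        request.user = user
        return True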

Django

Having all the parts connected together, the only thing left was to replace the validator class so that our new one is used. After doing this, all the requests coming in to our OAuth2 server had to conform to our rule: provide a login and password stored in LDAP. It took me longer than expected to grasp the concept and to design the solution. I created a few prototypes, each with fewer lines of code, until this simple solution came to my mind.

Still don’t fancy LDAP.

Categories
python

Consumer with cache

Micro. Services. They are popular, and it is a pretty useful pattern if applied correctly. Using such a pattern forces you to think a bit more when designing a solution to a problem. Not that it is more difficult, it is just different. One of the main differences is communication, or maybe rather data flow.

Regular applications tend to talk mostly through internal calls, or callbacks, which makes communication simpler. You do not have to care whether the function you are calling is available; if you pass correct arguments you will get a response. It is not that simple with microservices, which in fact behave like third party applications. They can go down, they can throttle your calls, they can respond with a delay, and so on.

My current project tries to mitigate this by using a messaging pattern in vulnerable spots, where communication may be unstable (connecting to a third party, like Facebook) or prone to delays and timeouts (database writes, connecting to a third party). The database we are using, Elasticsearch, has a thread pool of workers and may throttle access when under heavy load. Our application can generate a lot of writes, exhausting the pool of ES workers. An "easy" way of increasing the number of written documents is to write them in batches using the bulk operation. Since we are using the AMQP protocol for communication with the DB, we receive messages one at a time; the protocol itself will not hand us ready-made batches.
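For context, the bulk write that a batch eventually ends up in could look roughly like this with the official elasticsearch Python client (the index name and document shape are made up):

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

es = Elasticsearch()  # defaults to localhost:9200


def push_batch(docs):
    # one bulk request instead of len(docs) separate index calls;
    # older ES versions may also want a '_type' entry in each action
    actions = [{'_index': 'messages', '_source': doc} for doc in docs]
    bulk(es, actions)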

The solution to this is the Aggregator pattern, from the Enterprise Integration Patterns book. You will find the full description of it if you buy the book 🙂 This link, however, gives enough information to understand what it does.

The Aggregator is a special Filter that receives a stream of messages and identifies messages that are correlated. Once a complete set of messages has been received (more on how to decide when a set is ‘complete’ below), the Aggregator collects information from each correlated message and publishes a single, aggregated message to the output channel for further processing.

The Aggregator will pull messages, and when some condition is met it will create a bulk write. The condition that triggers the insert here is message-count based (do we have enough messages?), but it could be time based, triggered by a special message, or any other condition.

I usually try to find an existing solution that I could integrate, or maybe modify a bit to our needs, but this time there was nothing. It was time to write it myself, with plenty of elbow grease.

The solution below is not ideal, as it does not package messages into equally sized bulks. A batch may be bigger or smaller than specified, but it will be a batch.

First goes the queuing consumer code, creating 5 consumers listening on the default RabbitMQ address.

#!/usr/bin/env python
import queue

from kombu.mixins import ConsumerMixin
from kombu import Connection, Queue
connection = Connection('amqp://guest:guest@localhost:5672//')


q = queue.Queue()


class C(ConsumerMixin):
    def __init__(self, name, connection, q):
        self.name = name
        self.connection = connection
        self.q = q

    def get_consumers(self, consumer, channel):
        return [
            consumer(
                Queue('task_queue'),
                callbacks=[self.on_message],
                accept=['json']),
        ]

    def on_message(self, body, message):
        self.q.put(body)
        message.ack()

        if self.q.qsize() > 10:
            batch = []
            while True:
                try:
                    # non-blocking get; other consumers drain the same queue
                    item = self.q.get_nowait()
                except queue.Empty:
                    break

                print('%s : Compress: %s' % (self.name, item))
                batch.append(item)
                self.q.task_done()

            print('%s : Push batch: %s' % (self.name, batch))


from threading import Thread


threads = []
for i in range(5):
    w = C('worker %s' % i, connection, q)
    t = Thread(target=w.run)
    t.start()
    threads.append(t)

Here is the test producer code that generates messages, so one can see how the consumers behave.

#!/usr/bin/env python

import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"
for x in range(40):
    channel.basic_publish(
        exchange='',
        routing_key='task_queue',
        body=message,
        properties=pika.BasicProperties(
            delivery_mode=2,  # make message persistent
        ))
    print(" [%s] Sent %r" % (x, message))
connection.close()

In order to test it, pika and kombu need to be installed; then simply run the consumer in one terminal and trigger the producer in another. All the calls to queue.Queue are non-blocking and several consumers drain the same queue, which is most probably the reason for the different batch sizes. I guess that fix has to wait for the next blog post.

Categories
python

Celery tasks, states and results

The subject of Celery task results comes back every now and then. It would make a really good post, with nice examples. So here we go!

If you don't know what Celery is:

Celery is a simple, flexible, and reliable distributed system to process vast amounts of messages, while providing operations with the tools required to maintain such a system. It's a task queue with focus on real-time processing, while also supporting task scheduling.

You can read more by going to their documentation.

States

Celery comes with a few states to start with. These states tell you what is happening to a task and are selected from this list most of the time (unless you have custom states, but I'll cover this later):

  • PENDING
  • STARTED
  • SUCCESS
  • FAILURE
  • RETRY
  • REVOKED

Such defaults allow you to go pretty far with tracking your tasks. You can even deduce transitions based on the current state of a task: a FAILURE state means that a task went through the PENDING and STARTED states. Here's an example of how that works. Let's take a basic task from Celery's own tutorial. I'm using RabbitMQ from this docker image as my broker.

from celery import Celery

app = Celery('task', broker='amqp://guest:guest@localhost:5672//')

# STARTED state is not enabled by default so we flip it on
app.conf.update(task_track_started=True)


@app.task(bind=True)
def add(self, x, y):
    # we need to sleep to show STARTED state
    import time
    time.sleep(10)
    return x + y

With that we can go through some of the states.

majki@snakepit ~/projects/blog/pow/celery-states
% pipenv run python
Python 3.4.8 (default, Mar 19 2018, 21:12:05)
[GCC 5.4.0 20160609] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from task import add
>>> r = add.delay(2, 3)  # worker is disabled so we can have PENDING state
>>> r.status
'PENDING'  # worker is enabled after this
>>> r.status
'STARTED'
>>> r.status
'SUCCESS'
>>> r = add.delay(2, 'a')
>>> r.status
'STARTED'
>>> r.state
'FAILURE'

It is pretty useful and probably covers a lot of use cases, but there's even more to discover about Celery's states.

Custom states

You can also define your own states if you need to. Let's modify our example a bit to include a new state called 'GOING_TO_SLEEP'.

from celery import Celery

app = Celery('task', broker='amqp://guest:guest@localhost:5672//')

# STARTED state is not enabled by default so we flip it on
app.conf.update(task_track_started=True)


@app.task(bind=True)
def add(self, x, y):
    self.update_state(state='GOING_TO_SLEEP')
    import time
    time.sleep(10)
    return x + y

Now let's see how this works.

majki@snakepit ~/projects/blog/pow/celery-states
% pipenv run python
Python 3.4.8 (default, Mar 19 2018, 21:12:05)
[GCC 5.4.0 20160609] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from task import add
>>> r = add.delay(2, 3)
>>> r.status
'GOING_TO_SLEEP'
>>> r.status
'SUCCESS'

You can go bananas with such a feature and be really pedantic about your task's state flow. It may be beneficial for some really complex workflows, so if you would like to monitor them that closely, go for it.

Results storage

Now let's tackle the other concern: how do you save a task result? All my examples up to this point were in a REPL, so all the results are gone as soon as you close it. Thankfully the authors of Celery thought about it and provided us with such functionality. Celery supports multiple types of result storage, making almost everyone happy. I'm not done with my task example yet, it just needs a little tweaking. To be honest, not much is needed to have task results persisted.

from celery import Celery

app = Celery('task', broker='amqp://guest:guest@localhost:5672//')
app.conf.update(task_track_started=True,
                result_backend='file:///var/celery/results')


@app.task(bind=True)
def add(self, x, y):
    import time
    time.sleep(10)
    return x + y

According to my configuration, all the task results will be saved under the /var/celery/results directory. I have picked the file-system backend as it is the easiest to demonstrate.

majki@snakepit ~/projects/blog/pow/celery-states
% pipenv run python
Python 3.4.8 (default, Mar 19 2018, 21:12:05)
[GCC 5.4.0 20160609] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from task import add
>>> add.delay(1,1)
<AsyncResult: be19e446-64a6-4fa9-b1a4-05feeb4fbec2>
>>> add.delay(1,1)
<AsyncResult: 1dc82626-9731-43ea-b4ea-6f237143dd42>
>>> add.delay(1,1)
<AsyncResult: badbe577-9314-4a92-91b8-80d071a5f8b5>
>>> add.delay(1,1)
<AsyncResult: c2da2e98-7a05-411f-acd5-7ead99c12c4e>

Now we can have a look at our results backend and see if the results are stored. I had been keeping results of all my tasks while writing this post, so there are a lot of files. The last four are the ones that you can see in the output of the REPL above.

majki@snakepit ~/projects/blog
% ls -thor  # thanks firemark!
total 236K
drwxr-xr-x 6 majki 4,0K kwi 10 20:46 cryogen
drwxr-xr-x 3 majki 4,0K kwi 11 20:28 pow
-rw-rw-r-- 1 majki  120 kwi 11 21:10 celery-task-meta-f730b12b-25ee-46d2-a7d5-eabda0ab14ef
-rw-rw-r-- 1 majki  120 kwi 11 21:11 celery-task-meta-6c7a3fc1-882b-4ddd-be9e-bd1431099eae
-rw-rw-r-- 1 majki  120 kwi 11 21:11 celery-task-meta-f5a73a65-6bda-47b8-a04e-34d1e5b7b81d
-rw-rw-r-- 1 majki  120 kwi 11 21:12 celery-task-meta-c012b4a7-362f-4d05-ac65-9dc398ac514e
-rw-rw-r-- 1 majki  120 kwi 11 21:14 celery-task-meta-c0654b79-65b0-4365-9a45-6b3d248d559d
-rw-rw-r-- 1 majki  120 kwi 11 21:14 celery-task-meta-f2d51878-86db-4d21-bc36-c651e63bdda3
-rw-rw-r-- 1 majki  120 kwi 11 21:14 celery-task-meta-ff9071d1-5ac1-4f81-9da2-29b4f706f5d8
-rw-rw-r-- 1 majki  120 kwi 11 21:15 celery-task-meta-8691b6f0-fa8f-401b-9133-0ce8308e34c9
-rw-rw-r-- 1 majki  120 kwi 11 21:18 celery-task-meta-9d312364-ed4c-49f2-a6eb-7e3b6a9eeaef
-rw-rw-r-- 1 majki  120 kwi 11 21:19 celery-task-meta-31ae7d06-c292-48c8-b526-eebeb021bb90
-rw-rw-r-- 1 majki  120 kwi 11 21:22 celery-task-meta-39d6126d-57f9-4a2b-a09b-68475076ec08
-rw-rw-r-- 1 majki  120 kwi 11 21:39 celery-task-meta-264975a3-b8d0-4855-a19a-fc64e1384bfe
-rw-rw-r-- 1 majki  760 kwi 11 21:46 celery-task-meta-b7d429d2-9aaa-4af5-914a-aba1321676cb
-rw-rw-r-- 1 majki  120 kwi 11 21:57 celery-task-meta-fa4f6d6a-aeb2-416b-9efc-9e7947ef9550
-rw-rw-r-- 1 majki  120 kwi 11 22:03 celery-task-meta-f5774f4f-0d51-4861-af95-a1a8ff94d1b1
-rw-rw-r-- 1 majki  120 kwi 11 22:18 celery-task-meta-be19e446-64a6-4fa9-b1a4-05feeb4fbec2
-rw-rw-r-- 1 majki  120 kwi 11 22:26 celery-task-meta-1dc82626-9731-43ea-b4ea-6f237143dd42
-rw-rw-r-- 1 majki  120 kwi 11 22:26 celery-task-meta-badbe577-9314-4a92-91b8-80d071a5f8b5
-rw-rw-r-- 1 majki  120 kwi 11 22:26 celery-task-meta-c2da2e98-7a05-411f-acd5-7ead99c12c4e

majki@snakepit ~/projects/blog
% cat celery-task-meta-c2da2e98-7a05-411f-acd5-7ead99c12c4e| python -m json.tool
{
    "children": [],
    "result": 2,
    "status": "SUCCESS",
    "task_id": "c2da2e98-7a05-411f-acd5-7ead99c12c4e",
    "traceback": null
}

There you have it, all is stored "safely" (it's my laptop 🙂) and can be viewed if required.
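Besides poking at the files directly, a stored result can also be read back through Celery itself. A quick sketch, reusing the app object and one of the task ids from the listing above:

from celery.result import AsyncResult

from task import app

r = AsyncResult('c2da2e98-7a05-411f-acd5-7ead99c12c4e', app=app)
print(r.status)  # 'SUCCESS'
print(r.result)  # 2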

Hopefully this puts aside all concerns regarding task states and task results, and sheds a bit of light on Celery's extensive API.

Categories
microservices

Cleaning up messy distributed logs, part 2

This is a story on sorting out logs. The first part can be found here; it's where you can get all the nitty gritty. If you don't feel like reading it, here is a short elevator pitch: our kibana logs are a mess, each of the services seems to have a different log format, and tracebacks are split across many entries. My goal is to fix it.

The second part is much, much shorter, as it covers just one of the steps I took on the path to sorting out this pet peeve of mine. Unfortunately, I cannot spend all my time at work on fixing it, as it is not a crucial part of our application. At least not at this moment.

My first idea was to tackle the "easy" issue of the grok parsing failure. Our configuration is rather bloated IMO, so I trimmed it (I had mentioned that it is not my concern, but it is easier to reason about simple code). From this:

      input {
        gelf { }
        heartbeat { }
        syslog {
          port => 5145
        }
      }
      filter {
        ruby {
          code => "
            event.to_hash.keys.each { |k| event[ k.gsub('.','_') ] =
            event.remove(k) if k.include? '.' }
          "
        }
        grok {
          match => {
            "message" => [
              # Logspout
              "%{SYSLOG5424PRI}%{NONNEGINT:ver} +(?:%{TIMESTAMP_ISO8601:ts}|-) 
              +(?:%{HOSTNAME:containerid}|-) 
              +(?:%{WORD:process}\.%{INT:processnumber}\.%{WORD:processsha}|%{NOTSPACE:process}) 
              +(?:%{NOTSPACE:proc}|-) +(?:%{WORD:msgid}|-) 
              +(?:%{SYSLOG5424SD:sd}|-|) +%{GREEDYDATA:msg}"
            ]
          }
        }
      }
      output {
        elasticsearch {
          hosts => ["elasticsearch:9200"]
        }
        stdout {
          codec => rubydebug
        }
      }

I trimmed it down to this:

      input {
        syslog { }
      }
      filter {
        grok {
          match => {
            "message" => [
              # Logspout
              "%{SYSLOG5424PRI}%{NONNEGINT:ver} +(?:%{TIMESTAMP_ISO8601:ts}|-) 
              +(?:%{HOSTNAME:containerid}|-) 
              +(?:%{WORD:process}\.%{INT:processnumber}\.%{WORD:processsha}|%{NOTSPACE:process}) 
              +(?:%{NOTSPACE:proc}|-) +(?:%{WORD:msgid}|-) 
              +(?:%{SYSLOG5424SD:sd}|-|) +%{GREEDYDATA:msg}"
            ]
          }
        }
      }
      output {
        elasticsearch {
          hosts => ["elasticsearch:9200"]
        }
      }

We had a few plugins that were doing nothing, so they went under the cleaver. Since I can run the swarm locally, I could verify that everything is still logged and displayed correctly in kibana.

So then I did what everyone does: googled it hard. Actually I duckduckgo-ed it, but that just does not sound right. After a lot of reading I finally found something relevant to my case. The perpetrator is the syslog plugin. I mentioned before that our log format might not be consistent and possibly (for sure, TBH) not up to syslog's spec. As the author recommended, I replaced syslog { } with the tcp and udp plugins.

     input {
          tcp {
              port => 5145
              type => syslog
          }
          udp {
              port => 5145
              type => syslog
          }
      }

Logs now come through without _grokparsefailure tags. Still a bit messy, but at least correctly and efficiently parsed. See you in the next part.

Categories
microservices

Cleaning up messy distributed logs, part 1

Logging is an important part of the software lifecycle. It is used at every stage: development, testing, deployment, staging, production. I find neat logs soothing; they increase my confidence in running software by providing visibility when required.

Having said that, I have taken on a task to work on the logs in the distributed system I'm working on. Since we push everything through the ELK stack, this is what I have inherited.

Unfiltered messy kibana logs

It looks like a mess, but this is because kibana does not have any sensible default configuration in our setup. After looking more closely there is another issue, which I have marked in red.

Unfiltered messy kibana logs with tags marked with red

So it is time to look at how logstash is configured, as there is a problem with how the grok filter is set up. Before doing that, it is worth explaining how you can configure logstash, as it will help in understanding my configuration, and maybe briefly explaining what logstash is.

The easiest way to describe the purpose of this tool is by citing the introduction taken from the documentation.

Logstash is an open source data collection engine with real-time pipelining capabilities. Logstash can dynamically unify data from disparate sources and normalize the data into destinations of your choice.

How do you configure it? Simply by taking the plugins you need and putting them into your configuration file. The plugins are:

  • input plugins, which are the sources of events fed into logstash. list
  • filter plugins, used for processing event data. list
  • output plugins, which send data to a particular destination. list
  • codec plugins, which can be part of input or output plugins and can change the data. list

In my case the configuration is a bit complex and, what's most important, grok is not configured correctly (so it seems, but there might be something else in play as well). Some of the plugins may be redundant too, but this is not my concern now.

      input {
        gelf { }
        heartbeat { }
        syslog {
          port => 5145
        }
      }
      filter {
        ruby {
          code => "
            event.to_hash.keys.each { |k| event[ k.gsub('.','_') ] =
            event.remove(k) if k.include? '.' }
          "
        }
        grok {
          match => {
            "message" => [
              # Logspout
              "%{SYSLOG5424PRI}%{NONNEGINT:ver} +(?:%{TIMESTAMP_ISO8601:ts}|-) 
              +(?:%{HOSTNAME:containerid}|-) 
              +(?:%{WORD:process}\.%{INT:processnumber}\.%{WORD:processsha}|%{NOTSPACE:process}) 
              +(?:%{NOTSPACE:proc}|-) +(?:%{WORD:msgid}|-) 
              +(?:%{SYSLOG5424SD:sd}|-|) +%{GREEDYDATA:msg}"
            ]
          }
        }
      }
      output {
        elasticsearch {
          hosts => ["elasticsearch:9200"]
        }
        stdout {
          codec => rubydebug
        }
      }

This is the state now; further on I'll describe, as I go, my quest to make logs great again.

Categories
packaging python

Python packaging primer

There is a great initiative at the place where I currently work called Python Bunch. Every couple of weeks someone gives a talk related to Python. I have decided to give one about Python packaging, the most challenging thing in Python for me. Possibly more complex than metaprogramming and monads 😉 This post is a preparation for that talk, as I'm about to write down everything I would like to talk about. I'll start with the basics, describing what a package is, the basic elements of package configuration and how to install dependencies. Next comes how to create a proper setup.py and how to define dependencies.

Foundation

The basic question is what a Python package really is. In short, it is a bunch of files which are installed when you do a pip install, but in order for pip to make sense out of this bunch there must be a proper structure. A Python package is in fact an archive including required files, like setup.py, and optional ones like setup.cfg and MANIFEST. The directory structure depends on the format used; eggs are different from wheels (these are package formats). Both formats have different directory structures, but the starting point for both is the same and it consists of the files I have just mentioned, plus requirements.txt, which I will get to later.

setup.py describes the package. To my knowledge it is the only file that is strictly required to build a package. Details like name, version, repository url, description, dependencies, category, etc. are defined in this file. PyPI uses it to create a descriptive page, listing for example the supported versions.

setup.cfg is a set of default options for setup.py commands like bdist or sdist. It may also be used by other tools, like bamp or pytest, for the same purpose of keeping configuration.
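As a small illustration, a setup.cfg could carry defaults like these (the sections below assume you build wheels and test with pytest; drop what you do not use):

[bdist_wheel]
# build a single wheel for Python 2 and 3 by default
universal = 1

[tool:pytest]
addopts = --verbose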

MANIFEST is another list. If your package needs to ship any non-code files, maybe documentation or .csv files with data, they should be listed in this file.

Writing setup.py

In the old days writing a setup.py was a rather daunting task for me. It was either because I was pretty fresh to Python or because there was no tooling. Rather the former than the latter 😉 Today I would use the cookiecutter solution for a Python package; it may be a bit overwhelming, but it is a good place to start if you are already familiar with cookiecutter. This is, however, for people starting from scratch. If you already have a package and would like to upgrade your setup, use the pyroma tool to rate your packaging skills. pyroma will also tell you what is missing from your package definition, so it helps to iron out the kinks. Another approach is to have a look at a couple of widely used and popular packages, like flask, requests or django, and base your file on their approach. Be aware that they most probably include things not needed by your package, but it is worth looking at the arguments passed into setup() anyway.

As an example of pyroma usage, here is the grade of one of the packages I have written.

majki@enchilada ~/projects/priv
% pyroma bamp
------------------------------
Checking bamp
Found bamp
------------------------------
Your package does not have keywords data.
------------------------------
Final rating: 9/10
Cottage Cheese
------------------------------

It's not a bad rating, and here is the villain. It is also a good starting point, despite a bit of bloat you most probably won't need.

# -*- coding: utf-8 -*-
from setuptools import setup, find_packages

setup(
    name='bamp',
    version='0.2.2',
    install_requires=['Click', 'dulwich', 'six'],
    entry_points='''
      [console_scripts]
      bamp=bamp.main:bamp
      ''',
    packages=find_packages(),
    long_description='Bamp version of your packages according to semantic versioning. Automagically create commits and tags.',
    include_package_data=True,
    zip_safe=True,
    description='Bamp version according to semantic versioning',
    author='Michał Klich',
    author_email='michal@michalklich.com',
    url='https://github.com/inirudebwoy/bamp',
    setup_requires=['pytest-runner'],
    tests_require=['pytest'],
    license='MIT',
    classifiers=['Development Status :: 4 - Beta', 'Environment :: Console',
                 'License :: OSI Approved :: MIT License',
                 'Operating System :: OS Independent',
                 'Programming Language :: Python :: 2',
                 'Programming Language :: Python :: 2.7',
                 'Programming Language :: Python :: 3',
                 'Programming Language :: Python :: 3.4',
                 'Programming Language :: Python :: 3.5',
                 'Topic :: Software Development :: Build Tools'])


Defining dependencies

There is a good chance that you will need to define a couple of dependencies when building a package. You may do it in the requirements.txt file I mentioned a couple of paragraphs before, but if you have had a glimpse at online resources you may be convinced there is another place for dependencies. I'm referring to the install_requires argument passed into setup(), which accepts a list of strings defining package names and versions. I gotta say, treating them as interchangeable is incorrect, and the purpose of those two lists is quite different.

requirements.txt

In order to explain the difference I'll start with clarifying what requirements.txt is used for. Simply put, it is a line by line directory of packages. Below is a plain example of a really short file.

click==6.7
six==1.10.0
dulwich==0.16.3

So it is a list. A list of names and versions, and it may look similar to something you have seen already. You are right, the list is in the format of pip freeze output. It is an exact reflection of your current environment, and there is a good reason for it. Some call these concrete dependencies, as they will not change and, when installed, will give you the same setup each time. The key word a couple of sentences back is environment, and to me this is very close to a deployment definition. If you have written or even seen any deployment configuration, either docker or ansible, Python packages are installed by running pip install -r requirements.txt. Since a run of a deployment script should give you the exact same result each time, these dependencies are called 'concrete'. To illustrate this better, requirements.txt can specify the source from which packages are pulled, be it the public PyPI, a private on-premises version of PyPI, or your local directory.

# public PyPI
--index-url https://pypi.python.org/simple/

click==6.7
six==1.10.0
dulwich==0.16.3
# private PyPI
--index-url https://gizmo.biz/pypi/

click==6.7
six==1.10.0
dulwich==0.16.3
# local directory
--find-links=/local/dir/

click==6.7
six==1.10.0
dulwich==0.16.3

install_requires

install_requires consists of the other list, the abstract dependencies list. Items on this list should define the set of dependencies more loosely. What I mean by loosely is no versions, and definitely no links to package sources (a private PyPI, a directory, etc.). In its simplest form it is just a bunch of names.

install_requires=['Click', 'dulwich', 'six']

You may include a minimal working version if you know that your library will not work with anything below a certain version. In the case of SemVer you may also define an upper bound if you know of any incompatible changes.

install_requires=['Click', 
                  'dulwich>=0.16', 
                  'six>=1,<2']

It is a flexible system, but on the other hand it allows you to specify a set of rules to follow when installing the package. Having pinned versions or even using dependency_links is not advised. Such dependencies and requirements for package sources may not be possible to fulfil for the person fetching your code. It is a frustrating thing to work offline with a cache of PyPI and have to modify the setup() call just to start development.

Hopefully after reading this you realize, as I did at some point, that packaging is not that difficult. To be fair it could be better, and it is getting better. Now you have the foundation to make a package with a 10/10 grade given by pyroma. What cheese is your package?

Categories
patterns python

Iterator fun

PyCon US 2017 finished more than a month ago. By the miracle of technology, everyone who was not able to attend can watch all the talks conveniently in their own home. So I did, starting with a list of talks recommended in one of the recent episodes of the Talk Python To Me podcast.

At this point it is easy to guess that this post will be about PyCon, and most probably about one of the talks I enjoyed. The talk I'd like to focus on is the Instagram Keynote delivered by Lisa Guo and Hui Ding. Really good performance and really good slides; in my opinion a top notch delivery.

Both speakers talked about the migration of Instagram from Legacy Python (2.7.x) to Modern Python (>3.5). A massive user base and codebase, outdated third party packages without Python 3 support, outdated Django (super old), and Legacy Python. Sounds like a typical software endeavour, maybe except for the user base being massive. When speaking of challenges, Lisa mentioned something that really surprised me: an issue with iterators and the builtin function any. Here is the slide she used to illustrate the problem and below is the code in question.

CYTHON_SOURCES = ['a.pyx', 'b.pyx', 'c.pyx']
builds = map(BuildProcess, CYTHON_SOURCES)
while any(not build.done() for build in builds):
    pending = [build for build in builds if not build.started()]
    # <do some work>

This piece of code works fine under Legacy Python, but when run under Modern Python the builds list inside the while loop is missing an element; the first one is lost. The cause is the change of the return value of the map function: in Modern Python it returns an iterator instead of a list. This was something new to me and I really had to know why the first value is lost.

Why does it happen?

It would be much easier to reason about this with simpler code example.

Python 3.5.3

>>> m = map(str, [1, 2])
>>> any(m)
True
>>> print(list(m))
['2']

Puzzling. I was wondering if the same thing would happen with Legacy Python. In order to reproduce it there, the result of map has to be converted into an iterator.

Python 2.7.13

>>> m = iter(map(str, [1, 2]))
>>> any(m)
True
>>> print(list(m))
['2']

This behaviour is shared among Python versions and most likely is intended, although I guess not a documented one. But why does it happen? The answer, as usual, requires us to look into how Python is implemented; the implementation of the any function needs to be examined. So whip out the Python/bltinmodule.c file, it can be from any version of Python. Below is the mentioned function, taken from the source of Python 3.5.1 as it is the most recent source I have.

static PyObject *
builtin_any(PyModuleDef *module, PyObject *iterable)
{
    PyObject *it, *item;
    PyObject *(*iternext)(PyObject *);
    int cmp;

    it = PyObject_GetIter(iterable);
    if (it == NULL)
        return NULL;
    iternext = *Py_TYPE(it)->tp_iternext;

    for (;;) {
        item = iternext(it);
        if (item == NULL)
            break;
        cmp = PyObject_IsTrue(item);
        Py_DECREF(item);
        if (cmp < 0) {
            Py_DECREF(it);
            return NULL;
        }
        if (cmp == 1) {
            Py_DECREF(it);
            Py_RETURN_TRUE;
        }
    }
    Py_DECREF(it);
    if (PyErr_Occurred()) {
        if (PyErr_ExceptionMatches(PyExc_StopIteration))
            PyErr_Clear();
        else
            return NULL;
    }
    Py_RETURN_FALSE;
}

Looking at the body of any we can tell that there is no difference whether you call it with an iterator or an iterable. Both objects can be iterated on; the difference is in what PyObject_GetIter returns when either a list or an iterator is used.

As we know, iterators work tirelessly until exhausted, but a list is not an iterator. A list is an iterable. A list, when iterated, returns a fresh iterator each time, and thus is never exhausted. Each for loop operating on a list gets a fresh iterator with all the values. In our case the important lines are these:

    it = PyObject_GetIter(iterable);  // This retrieves an iterator
    ...
    
    item = iternext(it);  // This consumes an element
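To make the list versus iterator distinction concrete, here is a small REPL sketch (mine, not from the talk) contrasting the two over the same values:

>>> values = ['1', '2']
>>> any(values)        # the list hands any() a fresh iterator
True
>>> list(values)       # nothing is lost
['1', '2']
>>> it = iter(values)
>>> any(it)            # the iterator is partially consumed
True
>>> list(it)           # and stays consumed
['2']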

any returns True if any element is true. When running on Modern Python, any will consume each element up to the first one that is true. The cause is the loop and the iternext call, which keeps removing elements from the iterator. This simple example illustrates the behaviour.

Python 3.5.3

>>> m = map(bool, [False, False, True, False])
>>> any(m)
True
>>> print(list(m))
[False]

One other function came to my mind: how would all behave? Its mechanics seem to be the same as with any. The answer is pretty simple when we look at the source.

static PyObject *
builtin_all(PyModuleDef *module, PyObject *iterable)
{
    PyObject *it, *item;
    PyObject *(*iternext)(PyObject *);
    int cmp;

    it = PyObject_GetIter(iterable);
    if (it == NULL)
        return NULL;
    iternext = *Py_TYPE(it)->tp_iternext;

    for (;;) {
        item = iternext(it);
        if (item == NULL)
            break;
        cmp = PyObject_IsTrue(item);
        Py_DECREF(item);
        if (cmp < 0) {
            Py_DECREF(it);
            return NULL;
        }
        if (cmp == 0) {
            Py_DECREF(it);
            Py_RETURN_FALSE;
        }
    }
    Py_DECREF(it);
    if (PyErr_Occurred()) {
        if (PyErr_ExceptionMatches(PyExc_StopIteration))
            PyErr_Clear();
        else
            return NULL;
    }
    Py_RETURN_TRUE;
}

As we can see, this function also loops over the iterable using an iterator. The condition, however, is slightly different, as it needs to verify that all elements are true. This will cause all to consume each element up to the first false one. Again, a simple example illustrates this best.

>>> m = map(bool, [True, True, True, False, True])
>>> all(m)
False
>>> print(list(m))
[True]

This had been bugging me since I watched the video; now I have my closure. I learned a bit while investigating it and I'm happy to share it.

P.S. Yes, it is Legacy Python.

Categories
logging python

Logging and microservices

The context

As you might already know, logging is a rather important thing. If you don't know, you will probably find out, as (like with backups) there are people who log and ones who just have plenty of spare time. Good logging is hard. The line between spamming and logging is thin, the subject of useful logging is worth exploring, possibly in another post.

This post will be about logging information by tracking incoming requests within web applications based on a microservices architecture. Having a distributed system means that a request sent by the user will most probably hit more than one service. It is very useful to keep track of how the request is routed across all the services. When one service calls another, in order not to treat every call as separate but to have a bigger picture of what has been happening with the data along the way, the request has to be identified and the identifier must be passed along.

By using the REST approach each of the services becomes a consumer and provider of an API. They are third party APIs for each other.

The goal

Distributed systems require you to have a central place for storing and viewing logs, so that you remain sane. A good choice would be something like elk.

The fact is that apps are multi threaded but a log is not. It is single threaded and time-indexed. It doesn't matter if the log is kept in one place or partitioned onto several computers; reading it with no indicator how records are related does not make a lot of sense. I'd compare it to trying to read a document that went through a shredder.

Here's a real life example to illustrate this problem. Imagine a simple document search engine that allows users to search for a document that has been stored on the server. The user types in a name in the search box and the system comes up with a list of candidates. A pretty simple and common example of an app. From the user's point of view it looks pretty simple; however, it gets complicated when we look into the internals. Under the hood, querying the app might open a request to one or many servers. Servers might be kept behind load balancers that would introduce additional fragmentation of requests and logs. If you take into account different response times that may happen, depending on the server load etc., you will definitely not get a linear and chronological log of events. In addition to that, each service will treat an incoming request as a unique request, without the history of what may have happened up to this point. Debugging would be horrendous. So the goal is to provide a set of tools that would let you track how the request travels across services. Each request should be given an ID that would travel with it and would be stored with each log record being saved.

The solution

The idea is to have a header with a unique ID on each request that would identify it as it travels across the system. The name of the header is 'Correlation-Id'; you may pick whatever suits your case. The solution is based on a real world example with a pretty simple tech stack.

  • ubuntu
  • nginx
  • uwsgi
  • flask or django

I'd like to advise you that the code here is not complete and will probably not work if copied and pasted. Hopefully it is clear enough to guide you to your solution.

Nginx

Requests coming into the system are first handled by the nginx server. There are two cases it needs to handle: a request with a Correlation-Id header and one without it. If the request has the required Correlation-Id header, nginx passes it to uwsgi without modification. The second case is when the request does not include the Correlation-Id header and nginx has to inject it. It is possible to put a bit of logic into the nginx config file; the code has to be written in Lua (a very simple language), so it requires a module to be installed. Having the logic inside the config file makes it easy to deploy and maintain. I'd reconsider this for a more complicated case, but for this one it seems all right. When installing the nginx package with ansible, the only thing that had to be modified was the name of the package. Ubuntu has an nginx-extras package that adds Lua along with a bunch of other features. After you install nginx-extras you can write Lua in config files.

    # Generate random HTTP_CORRELATION_ID if request does not provide it
    set_by_lua $correlation_id '
        if(ngx.var.http_correlation_id == nil or ngx.var.http_correlation_id == "")
            then
               return string.format("%010d", math.random(0, 10000000000))
            else
               return ngx.var.http_correlation_id
        end
    ';

This snippet checks whether the request includes the Correlation-Id header and generates a new random value if it is missing. The returned value can now be attached to the request.

uwsgi_param HTTP_CORRELATION_ID $correlation_id;

How you inject the value depends on your setup. When running the app using uwsgi and sockets I could not make it work by calling proxy_set_header, but it worked with uwsgi_param instead. After switching to serving the app on a port it worked fine with proxy_set_header.

The next step is to handle the header in the app, as uwsgi simply passes it through.

The app

At this point all requests have a Correlation-Id header, which can be extracted and used for logging purposes. In order to simplify the life of a developer and automate all the work, an intermediary needs to do the heavy lifting. Processing of the header has to be done before the request is handed over to any views in a Django or Flask app. The logging effort also has to be minimal (as Homer Simpson said, "Can't someone else do it?"). Applications also contact other services, so such calls should include the header as well; thus passing the Correlation-Id along should be effortless. Finally, the response should be returned to the originator, as it may also log the result on its side. Every element of this puzzle is invisible to the developer, apart from the helpers designed to be used when a service contacts other services. It is not possible to enforce sending the required header unless the requests library (which almost everyone loves) becomes a standard in the project. I provide examples using requests as everyone in my team uses it; if you are using different tools you have to make your own helpers.

Flask

As mentioned earlier, here comes the bit that does the heavy lifting. Flask provides two methods allowing you to register functions that will run before and after each request. I'll use them to get the value of Correlation-Id from the header and to attach the value to the header on the way out.

import threading

from flask import request

tlocal = threading.local()

def log_incoming(*args, **kwargs):
    tlocal.corr_id = request.headers.get('Correlation-Id')
    logger.debug('corrID %s extracted', tlocal.corr_id)

Flask already uses thread-local objects, and if you are only using Flask as your framework, you might get away with using the request object instead of calling threading.local.

def cleanup_after_outgoing(response):
    response.headers['Correlation-Id'] = tlocal.corr_id
    logger.debug('Removing corrID %s from thread local storage',
                 tlocal.corr_id)
    del tlocal.corr_id
    return response

Making sure the header is returned and doing cleanup like a good scout. The middleware has to be registered next so Flask can use it.

app = Flask(__name__)

# register middleware
app.before_request(log_incoming)
app.after_request(cleanup_after_outgoing)

Django

Middleware for Django is quite similar. The place where it differs is registration of the middleware in the framework.

class ThreadStorageMiddleware:
    def process_request(self, request):
        tlocal.corr_id = request.META.get(UWSGI_CORRELATION_HEADER)
        logger.debug('corrID %s extracted', tlocal.corr_id)

    def process_response(self, request, response):
        try:
            response[CORRELATION_HEADER] = tlocal.corr_id
        except AttributeError:
            # corr_id has not been set
            return response
        logger.debug('Removing corrID %s from thread local storage',
                     tlocal.corr_id)
        del tlocal.corr_id
        return response

Using thread-local storage in Django is required, as there is no mechanism for storing information in a way that is accessible in every function during the lifetime of the response. Remember to register the middleware by adding it to the MIDDLEWARE_CLASSES list, as shown below.
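Registration itself is just a settings entry; a sketch assuming the class above lives in myproject/middleware.py (the dotted path is made up):

MIDDLEWARE_CLASSES = [
    'django.contrib.sessions.middleware.SessionMiddleware',
    'django.contrib.auth.middleware.AuthenticationMiddleware',
    'myproject.middleware.ThreadStorageMiddleware',
]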

Utils

The whole effort of adding the middleware is not useful if you are not passing the Correlation-Id value to the other services that you contact. The solution proposed in this post does not force anyone to use specific libraries; however, using one greatly simplifies passing the header. I gotta say, this is not ideal and may change in the future depending on how often people forget about using it.

def add_corr_id(headers=None):
    headers = headers or {}
    headers.update({'Correlation-Id': tlocal.corr_id})
    return headers
    
    
requests.get('https://the.cheddar.com', headers=add_corr_id())

headers = {'Required-Content': 'More beans'}
requests.get('https://monty.py', headers=add_corr_id(headers))

These helper functions create the header dictionary if one is not passed in, or add the Correlation-Id to an existing one.

Logging

Logging has not been mentioned yet. All the examples above were preparing the framework for feeding data to logging, so we could log something useful and make sense out of the log file. As with the examples above, logging was designed to be effortless for the developer to use.


class InjectingFilter(logging.Filter):
    def filter(self, record):
        try:
            record.corrId = tlocal.corr_id
        except AttributeError:
            record.corrId = 'Internal'
        return True

First a filter is defined that takes the value saved in thread-local storage and injects it into the LogRecord. There will be logs that originated within the service and thus do not have the Correlation-Id header; they are marked with an 'Internal' label.

DEFAULT_LOGGING = {
    ...
    'filters': {
        'correlation_filter': {
            '()': InjectingFilter
        }
    },
    'formatters': {
        'default': {
            'format': ('%(corrId)s - %(asctime)s - %(filename)s - '
                       '%(funcName)s - %(message)s')
        }
    },
    'handlers': {
        'console': {
            'level': 'DEBUG',
            'class': 'logging.StreamHandler',
            'formatter': 'default',
            'filters': ['correlation_filter']
        },
    }
    ...
}

This config includes all the needed details. A new filter is defined with the name 'correlation_filter'. The formatter configuration makes use of the header value injected into the LogRecord. Lastly, the handler configuration ties it together with a StreamHandler. Logging can now be used as seen in the examples, by calling all the regular functions from the logging module.

import logging
logger = logging.getLogger(__name__)

logger.debug('corrID %s extracted', tlocal.corr_id)
logger.debug('Removing corrID %s from thread local storage', tlocal.corr_id)
logger.info('User created')

In the first two examples the thread-local value is used explicitly, as it is passed into the log message. The third example is just a plain, simple log that you would see in every application. What is interesting is the output that goes into the log file, or in this case the terminal.

# Making request with lovely httpie
http https://the.web.app Correlation-Id:123456

123456 - 2016-05-16 21:26:31,448 - log.py - process_request - corrID 123456 extracted
123456 - 2016-05-16 21:26:34,309 - log.py - process_response - Removing corrID 123456 from thread local storage
123456 - 2016-05-16 21:26:35,042 - user.py - create - User created

As we can see, the Correlation-Id is silently added to our LogRecord objects and included in the log. In the case of many simultaneous requests the log can be filtered, and this does not need any special machinery; pure grep will do the trick if you don't have access to the ELK stack mentioned earlier.

The conclusion

There are no one-size-fits-all solutions, or at least I don't think there are. You could provide libraries to cover handling of the headers on the framework level, on the server level, in log configurations, etc. But there are many ways to approach this; for example, instead of having a middleware on the framework level you could create one for wsgi. That would save you time writing a framework specific solution, but it would tie you to wsgi (which could be beneficial for some users). This is why I believe there is no off the shelf solution, but I definitely recommend implementing the proposed one. I'll stress this again, it is a custom solution. The road to the goal will be a bit rocky and full of surprises, but it will definitely teach you something new.