Categories
azure extensions vscode

I wrote an extension for VS Code

I feel comfortable using Spacemacs; it has almost everything I need while I write in Python. Sometimes I use other languages, and this forces me to look into Spacemacs’ configuration. Spacemacs does its job, but some things are difficult to configure.

So I decided to try VS Code for a while instead. It has really good support for Python and other languages. I used it while working on a React Native app and when writing in TypeScript. One thing that really interests me is the Live Share feature. You can share a coding session with anyone by simply sharing a link. Very useful when pair programming, unlike Hangouts or Slack.

A couple of posts back I mentioned my Slack journal, which I use to make my work more transparent to others. So I had an idea to use Live Share to show anyone what I’m currently working on. The feature is a killer, but it has one flaw: the URL under which the session is available changes each time you start a new session. This is not convenient if you want to share it with a couple of people, as you have to send the new URL to each of them every time you start a session, even if the session or connection failed through no fault of your own.

To fix this I wrote a VS Code extension and called it Oversharing. The extension lets you share your session just as Live Share would, but it keeps your link the same across all future sessions. The link that Live Share creates is sent to an Azure Function and saved, and in return you get a new, stable link. This new link redirects to the current Live Share link each time it is opened and is preserved across all future Live Share sessions.

Hopefully someone finds it as useful as I do.

If you are interested in writing your own extension, here you can learn how to write your first one.

Categories
azure typescript

Guide on Developing Azure Functions locally

Why

I have spent way too much time figuring out how to create a local development environment for working on Azure Functions. Most of that time was spent reading Microsoft’s documentation on the topic. Hopefully this guide will save you some time 🙂

The guide was prepared and verified on Ubuntu. Additionally, my coworker verified that it works on macOS High Sierra. If you are here to create your first Azure Function locally, please read along.

What are Azure Functions

Azure Functions are small pieces of code that run on Azure infrastructure in a serverless fashion. The infrastructure, provisioning, and scaling are Azure’s responsibility. If you wish to read more, below are links to detailed explanations by the Azure team. I’d recommend at least skimming the first one.

Software required for local development

There are a couple of tools required to be able to develop locally.

  • npm, tooling is mostly in JS/TS
  • .NET Core SDK, required for function extensions
  • Azure Functions Core Tools (later referred to as AFCT)
  • Azurite legacy, for storage emulation; it works with Blob, Table, and Queue storage. The legacy version is required as the current one supports only Blob storage.
  • Docker

Handling HTTP requests

This part explains how to create a function that is triggered by an HTTP request and responds with an HTTP response.

Create Azure Function project

Start by calling func init HTTPFunction in a directory of your choosing. You will be asked a couple of questions regarding your project.

majki@enchilada ~/projects/azure-tracker-local
% func init HTTPFunction
Select a worker runtime:
1. dotnet
2. node
3. python (preview)
4. powershell (preview)
Choose option: 2
node
Select a Language:
1. javascript
2. typescript
Choose option: 2
typescript
Writing .funcignore
Writing package.json
Writing tsconfig.json
Writing .gitignore
Writing host.json
Writing local.settings.json
Writing /home/majki/projects/azure-tracker-local/HTTPFunction/.vscode/extensions.json

Create Azure Function

Create the function next. As in the previous step, you will be asked questions regarding the function you wish to create. I selected 8. HTTP trigger, as this function should respond to HTTP requests.

majki@enchilada ~/projects/azure-tracker-local
% cd HTTPFunction
majki@enchilada ~/projects/azure-tracker-local/HTTPFunction
% func new
Select a template:
1. Azure Blob Storage trigger
2. Azure Cosmos DB trigger
3. Durable Functions activity
4. Durable Functions HTTP starter
5. Durable Functions orchestrator
6. Azure Event Grid trigger
7. Azure Event Hub trigger
8. HTTP trigger
9. IoT Hub (Event Hub)
10. Azure Queue Storage trigger
11. SendGrid
12. Azure Service Bus Queue trigger
13. Azure Service Bus Topic trigger
14. Timer trigger
Choose option: 8
HTTP trigger
Function name: [HttpTrigger]
Writing /home/majki/projects/azure-tracker-local/HTTPFunction/HttpTrigger/index.ts
Writing /home/majki/projects/azure-tracker-local/HTTPFunction/HttpTrigger/function.json
The function "HttpTrigger" was created successfully from the "HTTP trigger" template.

Install dependencies

npm install

Run function

npm run start

Verify

majki@enchilada ~
% curl http://localhost:7071/api/HttpTrigger
Please pass a name on the query string or in the request body%
majki@enchilada ~
% curl "http://localhost:7071/api/HttpTrigger\?name\=Mike"
Hello Mike

Handling Queue Storage

This part explains how to create a function that is triggered by an HTTP request and sends a message to Queue Storage.

Install storage emulation

On Windows, you may go with the emulator provided by Microsoft. On Ubuntu, there is the community-sourced Azurite. Unfortunately, the most recent version does not support Queue Storage, so for queues install version 2.7.0 by following its instructions.

Create Azure Function project

Start by calling func init HTTPFunction in a directory of your choosing. You will be asked a couple of questions regarding your project.

majki@enchilada ~/projects/azure-tracker-local
% func init HTTPFunction
Select a worker runtime:
1. dotnet
2. node
3. python (preview)
4. powershell (preview)
Choose option: 2
node
Select a Language:
1. javascript
2. typescript
Choose option: 2
typescript
Writing .funcignore
Writing package.json
Writing tsconfig.json
Writing .gitignore
Writing host.json
Writing local.settings.json
Writing /home/majki/projects/azure-tracker-local/HTTPFunction/.vscode/extensions.json

Create function

Create the function next. As in the previous step, you will be asked questions regarding the function you wish to create. I selected 8. HTTP trigger, as this function should respond to HTTP requests and, in response, send a message to Queue Storage.

majki@enchilada ~/projects/azure-tracker-local
% cd HTTPFunction
majki@enchilada ~/projects/azure-tracker-local/HTTPFunction
% func new
Select a template:
1. Azure Blob Storage trigger
2. Azure Cosmos DB trigger
3. Durable Functions activity
4. Durable Functions HTTP starter
5. Durable Functions orchestrator
6. Azure Event Grid trigger
7. Azure Event Hub trigger
8. HTTP trigger
9. IoT Hub (Event Hub)
10. Azure Queue Storage trigger
11. SendGrid
12. Azure Service Bus Queue trigger
13. Azure Service Bus Topic trigger
14. Timer trigger
Choose option: 8
HTTP trigger
Function name: [HttpTrigger] HTTP2Queue
Writing /home/majki/projects/azure-tracker-local/HTTPFunction/HTTP2Queue/index.ts
Writing /home/majki/projects/azure-tracker-local/HTTPFunction/HTTP2Queue/function.json
The function "HTTP2Queue" was created successfully from the "HTTP trigger" template.

Configure function output

In the editor, open the function.json file located in the directory named after the function. Remove the section related to the HTTP output and replace it with the queue output as shown below:

{
  "type": "queue",
  "direction": "out",
  "name": "msg",
  "queueName": "outgoingqueue",
  "connection": "MyQueueConnectionString"
}

The file is supposed to look like this:

{
    "bindings": [
        {
            "authLevel": "function",
            "type": "httpTrigger",
            "direction": "in",
            "name": "req",
            "methods": [
                "get",
                "post"
            ]
        },
        {
            "type": "queue",
            "direction": "out",
            "name": "msg",
            "queueName": "outgoingqueue",
            "connection": "MyQueueConnectionString"
        }
    ],
    "scriptFile": "../dist/HTTP2Queue/index.js"
}

Configure queue connection

Some bindings defined in function.json require a connection string. After adding the outgoing queue binding, you need to add its connection string as well, by defining "MyQueueConnectionString" in local.settings.json. The value of this string is taken from Azurite’s documentation.

{
    "IsEncrypted": false,
    "Values": {
        "FUNCTIONS_WORKER_RUNTIME": "node",
        "AzureWebJobsStorage": "{AzureWebJobsStorage}",
        "MyQueueConnectionString": "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;QueueEndpoint=http://127.0.0.1:10001/devstoreaccount1;"
    }
}

Install dependencies

npm install

Install extensions

Since we are using an output other than HTTP, we have to install the storage extensions to make the queue work.

func extensions install

Run Azurite

Run Azurite using Docker:

docker run -e executable=queue -d -t -p 10001:10001 -v queue_Storage:/opt/azurite/folder arafato/azurite

More information on how to run it is here.

Run function

In order to put any message on the queue, the function needs to be modified a bit. Please change it accordingly.

import { AzureFunction, Context, HttpRequest } from "@azure/functions";

const httpTrigger: AzureFunction = async function (context: Context, req: HttpRequest): Promise<void> {
    context.log('HTTP trigger function processed a request.');
    const name = (req.query.name || (req.body && req.body.name));
    // assigning to the "msg" output binding defined in function.json enqueues the message
    context.bindings.msg = name;
};

export default httpTrigger;

And then

npm run start

Verify

Now we can see how this works. We are going to call our function a couple of times and then see what messages have been put on the queue.

majki@enchilada ~
% curl "http://127.0.0.1:7071/api/HTTP2Queue?name=Mike"
HTTP/1.1 204 No Content
Content-Length: 0
Date: Tue, 14 May 2019 06:39:25 GMT
Server: Kestrel
majki@enchilada ~
% curl "http://127.0.0.1:7071/api/HTTP2Queue?name=Mickey"
HTTP/1.1 204 No Content
Content-Length: 0
Date: Tue, 14 May 2019 06:39:28 GMT
Server: Kestrel

There should be two messages on the queue at this moment. We can query the queue by running:

majki@enchilada ~
% curl "http://127.0.0.1:10001/devstoreaccount1/outgoingqueue/messages?peekonly=true&numofmessages=10"
HTTP/1.1 200 OK
Connection: keep-alive
Content-Length: 885
Content-Type: application/xml; charset=utf-8
ETag: W/"375-pdRmS0dxI0dhZlfRbwGMunhL+WM"
X-Powered-By: Express
date: Tue, 14 May 2019 06:42:39 GMT
x-ms-request-id: 77856d80-7613-11e9-afc4-b5f560b4039e
x-ms-version: 2016-05-31

<?xml version='1.0'?><QueueMessagesList><QueueMessage><MessageId>cae264ef-74bd-4bde-b03e-01be8821968a</MessageId><InsertionTime>Tue, 14 May 2019 06:29:04 GMT</InsertionTime><ExpirationTime>Tue, 21 May 2019 06:29:04 GMT</ExpirationTime><DequeueCount>0</DequeueCount><MessageText>TWlrZQ==</MessageText></QueueMessage><QueueMessage><MessageId>13732448-a498-4fc5-85cf-8c4afafe082b</MessageId><InsertionTime>Tue, 14 May 2019 06:39:25 GMT</InsertionTime><ExpirationTime>Tue, 21 May 2019 06:39:25 GMT</ExpirationTime><DequeueCount>0</DequeueCount><MessageText>TWlrZQ==</MessageText></QueueMessage></QueueMessagesList>

Azurite supports the Queue Storage API, so if you need more operations, read the linked docs.

Categories
remote work slack

Slack journal is my office blog

I use Slack at work, which is not a surprise, as many teams do. Our team is also remote; not fully remote, but every day some part of the team is not in the office. This changes how we work together, and in my opinion it affects communication the most. To tackle problems with remote teams it is best to look for advice from companies that are fully remote, as they have most probably solved them already. While looking for such advice I found a great guide to remote teams. It has 4 chapters, on hiring, communication, management, and culture. I highly recommend reading the whole guide.

What I have focused on is communication, as this was the issue that, in my opinion, caused the most problems. Too little exchange causes misunderstandings and may give the false impression that no one is working and little is going on. Decisions made without notifying all team members cause misunderstandings as well. Such decisions are sometimes made in a channel, in a private talk, or during a meeting.

To improve visibility into what I’m working on, I started keeping a journal on Slack, as described in the chapter on communication. For more than a year now I have been documenting what I do, what I read, and my thoughts and opinions on different tasks and projects.

My Slack journal

The journal allows my colleagues to chime in on my channel as well. Numerous times someone has jumped in with great advice regarding a problem I had been struggling with, a tool that would solve it, or a new library I should be looking at. I find it really helpful, as I can track my work during the week, so when filling in time sheets I can review what I’ve been doing. I’m really bad at keeping track, so this solves one of my issues. While writing my journal I have received a lot of new information back, hopefully helped someone, and, most importantly, provided visibility into what I do when you can’t see me.

Another thing we are trying is documenting every important decision for others to read. Not everyone can attend a meeting, video call, or small face-to-face discussion, and it is vital to pass this information to the other members of the team. That way there is no urgency to attend everything, and there is no FOMO, as everything will be documented.

Categories
elixir patterns

Actor model

Motivation

One of my resolutions this year is to review my notes from each conference I visit and read/learn/build something. Quite recently I was, for the second time, at Lambda Days in Kraków, where I got interested in Elixir. It is functional, which is dear to me, but more importantly it is built for concurrent computation. This is achieved by a shared-nothing architecture and the actor model. It means that each actor, or process in Elixir land, is independent and shares no memory or disk storage. Communication between processes happens by sending messages. This allows building concurrent systems with less effort. Since it is so helpful, I’ll try to understand what an actor is and hopefully explain it here.

This is not a full explanation by any means, but rather a primer before I implement my latest project using this computation model. It is possible that my knowledge will be revised in a follow-up post.

What is the Actor Model

The actor model is a model of concurrent computation. An actor has the following properties, or axioms (I have shuffled them a bit to emphasise messaging, which is IMHO an important part of this model):

  • Can designate how to handle the next received message
  • Can create actors
  • Can send messages to actors

Let’s unpack those properties to make them clearer. "Can designate how to handle the next received message": actors communicate with messages. Each actor has an address as well, where messages can be sent. And it is up to the actor how it will respond, if at all.

"Can create actors" is pretty simple, each actor can spawn other actors if required by performed task.

"Can send messages to actors" as mentioned while describing first axiom communication is done via messages. Actors send messages to each other.

One actor is not really an actor model; as mentioned in one of the articles, actors come in systems.
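To make these axioms a bit more tangible, here is a minimal sketch, in Python rather than Elixir, that fakes an actor with a thread and a queue standing in for a process and its mailbox. All names here are made up for illustration and none of this is how Elixir implements processes.

import queue
import threading
import time


class Actor:
    """A toy actor: private state, an address (the mailbox), and behaviour per message."""

    def __init__(self):
        self.mailbox = queue.Queue()  # the actor's address; the only way in
        threading.Thread(target=self._loop, daemon=True).start()

    def send(self, message):
        self.mailbox.put(message)  # "can send messages to actors"

    def _loop(self):
        while True:
            message = self.mailbox.get()
            self.receive(message)  # "can designate how to handle the next received message"

    def receive(self, message):
        raise NotImplementedError


class Greeter(Actor):
    def receive(self, message):
        print('Hello, %s!' % message)


greeter = Greeter()  # "can create actors"
greeter.send('world')
time.sleep(0.1)  # give the daemon thread a moment to process the message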

This is short and simple; it is the gist of it, with a focus on the most important parts of the actor model. What I find valuable is the fact that this model brings software failures to the front and forces the solution designer to appreciate and expect them.

I find it similar to OOP as conceived by Alan Kay and described here:

OOP to me means only messaging, local retention and protection and hiding of state-process, and extreme late-binding of all things. It can be done in Smalltalk and in LISP. There are possibly other systems in which this is possible, but I’m not aware of them.

When to use it?

If you have a task that can be split into stages, whether linked or independent, I find actors more palatable than locks and threading. This is the kind of thing the Broadway library for Elixir is trying to solve. The actor model may also be useful when thinking about OOP; it might not be possible to implement actors in such a way that they are independent to the degree this model expects, but thinking in such terms may improve the resilience of the project.

Resources

I know I have only skimmed this topic, so if you are interested, please have a look at the resources I used to grasp the idea of the actor model.

Categories
python

Redis cache

I enjoy listening to podcasts, as they sometimes give me inspiration to create something. In one of the episodes of the Python Bytes podcast the hosts mentioned the https://github.com/bwasti/cache.py tool. cache.py allows you to cache function calls across runs by using a cache file. Simple and really useful.

This inspired me to write a similar thing, but for distributed apps, based on Redis as the cache storage. I called it rcache and you can find it on PyPI.

In order to use it simply decorate a function like this:

import rcache

@rcache.rcache()
def expensive_func(arg, kwarg=None):
  # Expensive stuff here
  return arg

The default Redis address is http://localhost:6379, but you can change it by passing a url into the decorator: @rcache.rcache(url="http://your_redis:6379").
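For illustration, here is a rough usage sketch, assuming results are keyed by the function arguments (as in the cache.py tool that inspired it):

import rcache


@rcache.rcache()
def expensive_func(arg, kwarg=None):
    print("computing...")  # printed only when the result is not in the cache yet
    return arg


print(expensive_func(42))  # first call computes the result and stores it in Redis
print(expensive_func(42))  # subsequent calls, even from another process, read it back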

I hope you find it useful and if you wish to comment or report something please go to https://gitlab.com/the_speedball/redis.cache.py/issues.

Have fun

Categories
javascript testing

Property based testing using jsverify

I wanted to write about property based testing for a long time. Since I usually write about things I have built, in order to write a post I had to build something. So I started building a runner’s calculator using React Native. The calculator helps with estimating the pace (min/km) a runner needs to keep in order to finish within a given duration.

What is property based testing, you might ask?

It is simply an approach to testing where you verify your code’s properties. According to the QuickCheck article on Wikipedia:

… the programmer writes assertions about logical properties that a function should fulfil.

A good example of this are the properties of the multiplication operation:

  • Every integer multiplied by 0 gives 0.
  • Every integer multiplied by 1 gives the same number.

These are properties of a multiplication function that can be verified. They are quite obvious to anyone who had maths in primary school. Your function is probably more complex, and testing each of the cases may be cumbersome. This is where property based testing comes into play. Given the definition of a property, you are able to test your code against a large number of cases that might not have been obvious to you in the first place.
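To make this concrete before getting to jsverify, here is a tiny sketch of checking those two properties against many random inputs (written in Python just for brevity; any language would do):

import random


def check_multiplication_properties(trials=1000):
    for _ in range(trials):
        x = random.randint(-10**9, 10**9)
        assert x * 0 == 0  # every integer multiplied by 0 gives 0
        assert x * 1 == x  # every integer multiplied by 1 gives the same number
    return True


check_multiplication_properties()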

Why use it?

It helps with proving your solution. Tools which support property based testing can really save you time on writing test cases, as they generate a lot of them. It is worth watching Hillel Wayne’s talk Beyond Unit Tests: Taking Your Testing to the Next Level. This talk inspired me to give this a go again. So you may try it yourself if you are not convinced.

How to design for property based testing?

To be honest, applying this technique in my work was difficult at first. The problems I’m solving are most of the time not suited for this kind of testing, and I thought it was a rather obscure tool for specific cases. Then, after watching a talk by Gary Bernhardt from SCNA 2012 titled Boundaries, I had an "it’s not you, it’s me" moment. I realised that the software I’m writing may not be designed in a way that is testable in such a manner.

The talk is worth watching; however, the spark that lit the fire was one specific thought of his: "creating functional cores". When you have a core of your software (a class or function) that is functional, it is more likely that you can use property based testing. I also think that using primitive types helps as well. The tools I have found were limited to such types, and constructing more elaborate types could be costly. I’m going to explore contract testing, which may have better support for more complex types and objects, as well as static type checking.

What I have used in my small project?

Let’s have a look at some code. I’m using React Native and JavaScript, so I settled on jsverify. Since I had created a small functional core, I could test it pretty easily with regular tests and property tests as well.

// checkForall, nat, and bless come from jsverify (used in both snippets below);
// toTime and toSeconds are the functions under test, their import is omitted here
const { checkForall, nat, bless } = require('jsverify');

test('given one input expect string of 8 chars', () => {
    checkForall(nat, (a) => {
        return toTime(a).length === 8;
    });
});

This test verifies that my toTime function creates a string out of an int, like this: toTime(65) => "00:01:05". I’m testing that no matter the input, my function will create an 8-character-long string.

const timeGenerator = bless({
    generator: nat.generator.map(n => `${n.toString().padStart(2, '0')}`)
});
test('toTime and toSeconds should be commutative', () => {
    checkForall(timeGenerator, timeGenerator, timeGenerator, (t, x, y) => {
        const time = [t, x, y].join(':');
        return toTime(toSeconds(time)) === time;
    });
});

Another test verifies that toTime and toSeconds round-trip: given a time like "01:49:56", converting it to seconds and then back to a time gives the same result. In my opinion, this is a good place to test the property with a tool such as jsverify. You get hundreds of test cases for free, based on the definition of your property. You will also receive the failing test case, if there is one.

● Console

console.info node_modules/jsverify/lib/jsverify.js:334
  OK, passed 100 tests
console.error node_modules/jsverify/lib/jsverify.js:325
  Failed after 2 tests and 0 shrinks. rngState: 036f5d837c9e042b1a; Counterexample: "00"; "30"; "12";  [ '00', '30', '12' ]

I wrote a dozen or more unit tests verifying my solution; however, my input arguments were too simple and obvious. Next, I tried jsverify and it found a bug on the first run, simply because I had not anticipated the kind of input that this library generated.

This to me is the best incentive to use property based testing even more 🙂

Categories
logging microservices

Cleaning up messy distributed logs, part 3

If you have been reading along, this is the final part of this short story on cleaning up messy logs. I might write one on Kibana itself some day, as it is a difficult tool for beginners to use. Previous parts of this story can be found here: part 1 and part 2.

These last couple of days at work I could spend more time on fixing logs, as the new project estimates were finished. The last thing I sorted out was syslog parsing; with that done, I could start figuring out how to log tracebacks raised in our services.

Tracebacks are spread over many lines, and when they arrive at Logstash they are parsed as separate entries and sent to Elasticsearch likewise. So Kibana displays them as separate entries as well, making them really difficult to read. On top of being separate, they are in reverse order.

My idea is to serialize each log message into JSON and then parse it in Logstash. As I have probably mentioned, I am working on an app using the microservices pattern, and there are a few services to be done. The first candidate is the gateway service, as it is the easiest to test and relatively simple, with no additional components besides the service itself. It is a Flask app running on gunicorn, so to configure logging there is no need to change the code; you only change the entrypoint by adding --log-config.

#!/bin/bash
exec gunicorn -b 0.0.0.0:80 --log-config /code/log-config.ini --access-logfile - --reload "gate.app:create_app()"

The config file should be written in ini style.

[loggers]
keys=root

[handlers]
keys=console

[formatters]
keys=default

[logger_root]
level=DEBUG
qualname=root
handlers=console

[handler_console]
class=StreamHandler
formatter=default
args=(sys.stdout, )

[formatter_default]
class=logstash_formatter.LogstashFormatter

This alone handles almost the whole application configuration; logs are nicely serialized into JSON with the whole traceback.
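For services that are not started through gunicorn, the same configuration can be expressed directly in Python. Below is a rough equivalent of the ini file above, using the same logstash_formatter package; treat it as a sketch rather than the exact code running in the gateway service.

import logging
import sys

from logstash_formatter import LogstashFormatter

# mirror the ini file: one stdout handler on the root logger with the Logstash formatter
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(LogstashFormatter())

root = logging.getLogger()
root.setLevel(logging.DEBUG)
root.addHandler(handler)

try:
    1 / 0
except ZeroDivisionError:
    # the whole traceback ends up inside a single JSON document
    logging.exception("something went wrong")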

Categories
python

Collaboration of OAuth and LDAP

The goal

Almost everyone knows OAuth: it is widely used online and has a good reputation. In the company where I work we decided to integrate OAuth2 into our platform (based on a microservice architecture; I’ll say platform from now on). The trick was that it had to be integrated with an existing LDAP server storing user data. A user requesting a token had to provide their LDAP username and password in order to receive it. Looking for existing solutions was fruitless, so new code had to be written, and at that point I didn’t know how this could be achieved. In addition to LDAP, I had to use Django as the base for providing the REST API endpoints. The limitations were clear at this point; the risks were unknown. This is not an unusual situation when working as a programmer.

The stack

As I mentioned earlier, Django was chosen as the framework, which narrowed down the number of libraries to choose from. Fortunately there was already a library adding OAuth to Django: Django OAuth Toolkit. DOT integrates nicely with Django REST framework and allows you to write your own plugins for validating token requests. It supports much more, but those two features were the main selling points. Talking to LDAP required a library to do the heavy lifting. There is not much choice here, to be honest, and I stuck with python-ldap. It is different from what you would expect of a Python library: the messages are not very useful and the docs are not very clear, but it works and is reliable.

The solution

At the beginning the task seemed really difficult to me. I had only played with OAuth without understanding how it works, and it was similar with LDAP. After diving into the details it stopped looking that hairy, as it turned out I only had to plug into the process of authorising the user request. Put simply, the process of issuing the token would not start until the user provided credentials that are valid in LDAP.

Django OAuth Toolkit

DOT (Django OAuth Toolkit) is pretty flexible: it provides a setting, OAUTH2_VALIDATOR_CLASS, where you can define your own validator. This allows you to control each step of the OAuth2 process. Fortunately I was only concerned with user validation, and in order to achieve it I had to write my own validator. The easiest way was to read the default class provided by DOT, namely oauth2_provider.oauth2_validators.OAuth2Validator. It is nicely written: each step has its own method that can be replaced. I just had to find the proper one. Proper like validate_user.

def validate_user(self, username, password, client, request, *args, **kwargs):

The signature of the method pictures exactly what needs to be done and lists all the required ingredients. We were connecting to LDAP, so this method had to do everything required to validate the user and return a bool depending on the result of that validation.
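To give an idea of what this looks like, here is a minimal sketch using python-ldap; the server address and DN template are made up, and a real implementation would also have to map the LDAP entry to a Django user and set it on the request, which I am leaving out here.

import ldap
from oauth2_provider.oauth2_validators import OAuth2Validator


class LDAPOAuth2Validator(OAuth2Validator):
    # hypothetical values, for illustration only
    LDAP_URI = "ldap://ldap.example.com"
    USER_DN_TEMPLATE = "uid=%s,ou=people,dc=example,dc=com"

    def validate_user(self, username, password, client, request, *args, **kwargs):
        connection = ldap.initialize(self.LDAP_URI)
        try:
            # a successful bind means LDAP accepted the credentials
            connection.simple_bind_s(self.USER_DN_TEMPLATE % username, password)
        except ldap.LDAPError:
            # covers INVALID_CREDENTIALS as well as connection problems
            return False
        connection.unbind_s()
        return True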

Django

Having all the parts connected together, the only thing left was to replace the validator class so that our new one is used. After doing this, all the requests coming in to our OAuth2 server had to conform to our rule: provide a login and password stored in LDAP. It took me longer than expected to grasp the concept and design the solution. I created a few prototypes, each with fewer lines of code, until this simple solution came to mind.
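For reference, pointing DOT at the custom validator is a single settings entry; the module path below is made up:

# settings.py
OAUTH2_PROVIDER = {
    "OAUTH2_VALIDATOR_CLASS": "myapp.oauth_validators.LDAPOAuth2Validator",
}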

Still don’t fancy LDAP.

Categories
python

Consumer with cache

Micro. Services. They are popular, and it is a pretty useful pattern if applied correctly. Using such a pattern forces one to think a bit more when designing a solution to a problem. Not that it is more difficult; rather, it is different. One of the main differences is communication, or maybe data flow.

Regular applications tend to talk mostly via internal calls, or callbacks, which makes communication simpler. You do not have to care whether the function you are calling is available: if you pass correct arguments, you will get a response. It is not that simple with microservices, which in fact are like third-party applications. They can go down, they can throttle your calls, they can respond with a delay, etc.

My current project tries to mitigate this by using a messaging pattern in vulnerable spots, where communication may be unstable (connecting to a third party, like Facebook) or prone to delays/timeouts (database writes, connecting to a third party). The database we are using, Elasticsearch, has a thread pool of workers and, if under heavy load, may throttle access. Our application may generate a lot of writes, exhausting the pool of ES workers. An "easy" way of increasing the number of written documents is to write them in batches using the bulk operation. As we are using the AMQP protocol for communication with the DB, we are not able to process more than one message at a time, as this is not supported by the protocol.

The solution to this is the Aggregator pattern from the Enterprise Integration Patterns book. You will find a full description of it if you buy the book 🙂 This link, however, gives enough information to understand what it does.

The Aggregator is a special Filter that receives a stream of messages and identifies messages that are correlated. Once a complete set of messages has been received (more on how to decide when a set is ‘complete’ below), the Aggregator collects information from each correlated message and publishes a single, aggregated message to the output channel for further processing.

The Aggregator will pull messages, and when some condition is reached it will create a bulk write. Here the condition that triggers the insert is message-count based (do we have enough messages?), but it could be time based, triggered by a special message, or any other condition.

I usually try to find an existing solution that I can integrate, or maybe modify a bit to our needs, but this time there was nothing. It was time to write it myself, with plenty of elbow grease.

The solution below is not ideal, as it does not package messages into same-size bulks. A batch may be larger or smaller than specified, but it will be a batch.

First goes the queuing consumer code, creating 5 consumers listening on the default RabbitMQ address.

#!/usr/bin/env python
import queue

from kombu.mixins import ConsumerMixin
from kombu import Connection, Queue
connection = Connection('amqp://guest:guest@localhost:5672//')


q = queue.Queue()


class C(ConsumerMixin):
    def __init__(self, name, connection, q):
        self.name = name
        self.connection = connection
        self.q = q

    def get_consumers(self, consumer, channel):
        return [
            consumer(
                Queue('task_queue'),
                callbacks=[self.on_message],
                accept=['json']),
        ]

    def on_message(self, body, message):
        self.q.put(body)
        message.ack()

        if self.q.qsize() > 10:
            batch = []
            while True:
                # append the item before checking for emptiness so the last
                # message fetched is not dropped from the batch
                item = self.q.get()
                print('%s : Compress: %s' % (self.name, item))
                batch.append(item)
                self.q.task_done()
                # other consumers drain the same queue, hence uneven batch sizes
                if self.q.empty():
                    break

            print('%s : Push batch: %s' % (self.name, batch))


from threading import Thread


threads = []
for i in range(5):
    w = C('worker %s' % i, connection, q)
    t = Thread(target=w.run)
    t.start()
    threads.append(t)

Here is the test producer code that generates messages, so one can see how the consumers behave.

#!/usr/bin/env python

import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"
for x in range(40):
    channel.basic_publish(
        exchange='',
        routing_key='task_queue',
        body=message,
        properties=pika.BasicProperties(
            delivery_mode=2,  # make message persistent
        ))
    print(" [%s] Sent %r" % (x, message))
connection.close()

In order to test it, kombu and pika need to be installed; then simply run the consumer in one terminal and trigger the producer in another. The shared queue is drained concurrently by several consumer threads, which is most probably the reason for the different batch sizes. I guess this attempt has to wait for the next blog post.

Categories
python

Celery tasks, states and results

The subject of Celery task results comes back every now and then. It would make a really good post, with nice examples. So here we go!

If you don’t know what Celery is:

Celery is a simple, flexible, and reliable distributed system to process vast amounts of messages, while providing operations with the tools required to maintain such a system. It’s a task queue with focus on real-time processing, while also supporting task scheduling.

You can read more by going to their documentation.

States

Celery comes with a few states to start with. These states tell you what is happening to a task and are selected from this list most of the time (unless you have custom states, but I’ll cover this later):

  • PENDING
  • STARTED
  • SUCCESS
  • FAILURE
  • RETRY
  • REVOKED

Such defaults allow you to go pretty far with tracking your tasks. You can even deduce transitions based on the current state of a task: a FAILURE state means that the task went through the PENDING and STARTED states. Here’s an example of how that works. Let’s take a basic task from Celery’s own tutorial. I’m using RabbitMQ from this docker image as my broker.

from celery import Celery

app = Celery('task', broker='amqp://guest:guest@localhost:5672//')

# STARTED state is not enabled by default so we flip it on
app.conf.update(task_track_started=True)


@app.task(bind=True)
def add(self, x, y):
    # we need to sleep to show STARTED state
    import time
    time.sleep(10)
    return x + y

With that we can go through some of the states.

majki@snakepit ~/projects/blog/pow/celery-states
% pipenv run python
Python 3.4.8 (default, Mar 19 2018, 21:12:05)
[GCC 5.4.0 20160609] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from task import add
>>> r = add.delay(2, 3)  # worker is disabled so we can have PENDING state
>>> r.status
'PENDING'  # worker is enabled after this
>>> r.status
'STARTED'
>>> r.status
'SUCCESS'
>>> r = add.delay(2, 'a')
>>> r.status
'STARTED'
>>> r.state
'FAILURE'

It is pretty useful and probably covers a lot of use cases, but there’s even more to discover about Celery’s states.

Custom states

You can also define your own states if you need to. Let’s modify our example a bit to include a new state called ‘GOING_TO_SLEEP’.

from celery import Celery

app = Celery('task', broker='amqp://guest:guest@localhost:5672//')

# STARTED state is not enabled by default so we flip it on
app.conf.update(task_track_started=True)


@app.task(bind=True)
def add(self, x, y):
    self.update_state(state='GOING_TO_SLEEP')
    import time
    time.sleep(10)
    return x + y

Now let’s see how this works.

majki@snakepit ~/projects/blog/pow/celery-states
% pipenv run python
Python 3.4.8 (default, Mar 19 2018, 21:12:05)
[GCC 5.4.0 20160609] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from task import add
>>> r = add.delay(2, 3)
>>> r.status
'GOING_TO_SLEEP'
>>> r.status
'SUCCESS'

You can go bananas with such a feature and be really pedantic about your tasks’ state flow. It may be beneficial for some really complex workflows, so if you would like to monitor them that closely, do it.

Results storage

Now let’s tackle the other concern: how do you save a task result? All my examples up to this point were in a REPL, so all results are gone as soon as you close it. Thankfully the authors of Celery thought about it and provided us with this functionality. Celery supports multiple types of result storage, making almost everyone happy. I’m not done with my task example; it just needs a little tweaking. To be honest, not much is needed to have task results persisted.

from celery import Celery

app = Celery('task', broker='amqp://guest:guest@localhost:5672//')
app.conf.update(task_track_started=True,
                result_backend='file:///var/celery/results')


@app.task(bind=True)
def add(self, x, y):
    import time
    time.sleep(10)
    return x + y

According to my configuration, all the task results will be saved under the /var/celery/results directory. I have picked the file-system backend as it is the easiest to show.

majki@snakepit ~/projects/blog/pow/celery-states
% pipenv run python
Python 3.4.8 (default, Mar 19 2018, 21:12:05)
[GCC 5.4.0 20160609] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from task import add
>>> add.delay(1,1)
<AsyncResult: be19e446-64a6-4fa9-b1a4-05feeb4fbec2>
>>> add.delay(1,1)
<AsyncResult: 1dc82626-9731-43ea-b4ea-6f237143dd42>
>>> add.delay(1,1)
<AsyncResult: badbe577-9314-4a92-91b8-80d071a5f8b5>
>>> add.delay(1,1)
<AsyncResult: c2da2e98-7a05-411f-acd5-7ead99c12c4e>

Now we can have a look at our results backend and see whether the results are stored. I had been keeping the results of all my tasks while writing this post, so there are a lot of files. The last four are the ones you can see in the REPL output above.

majki@snakepit ~/projects/blog
% ls -thor  # thanks firemark!
total 236K
drwxr-xr-x 6 majki 4,0K kwi 10 20:46 cryogen
drwxr-xr-x 3 majki 4,0K kwi 11 20:28 pow
-rw-rw-r-- 1 majki  120 kwi 11 21:10 celery-task-meta-f730b12b-25ee-46d2-a7d5-eabda0ab14ef
-rw-rw-r-- 1 majki  120 kwi 11 21:11 celery-task-meta-6c7a3fc1-882b-4ddd-be9e-bd1431099eae
-rw-rw-r-- 1 majki  120 kwi 11 21:11 celery-task-meta-f5a73a65-6bda-47b8-a04e-34d1e5b7b81d
-rw-rw-r-- 1 majki  120 kwi 11 21:12 celery-task-meta-c012b4a7-362f-4d05-ac65-9dc398ac514e
-rw-rw-r-- 1 majki  120 kwi 11 21:14 celery-task-meta-c0654b79-65b0-4365-9a45-6b3d248d559d
-rw-rw-r-- 1 majki  120 kwi 11 21:14 celery-task-meta-f2d51878-86db-4d21-bc36-c651e63bdda3
-rw-rw-r-- 1 majki  120 kwi 11 21:14 celery-task-meta-ff9071d1-5ac1-4f81-9da2-29b4f706f5d8
-rw-rw-r-- 1 majki  120 kwi 11 21:15 celery-task-meta-8691b6f0-fa8f-401b-9133-0ce8308e34c9
-rw-rw-r-- 1 majki  120 kwi 11 21:18 celery-task-meta-9d312364-ed4c-49f2-a6eb-7e3b6a9eeaef
-rw-rw-r-- 1 majki  120 kwi 11 21:19 celery-task-meta-31ae7d06-c292-48c8-b526-eebeb021bb90
-rw-rw-r-- 1 majki  120 kwi 11 21:22 celery-task-meta-39d6126d-57f9-4a2b-a09b-68475076ec08
-rw-rw-r-- 1 majki  120 kwi 11 21:39 celery-task-meta-264975a3-b8d0-4855-a19a-fc64e1384bfe
-rw-rw-r-- 1 majki  760 kwi 11 21:46 celery-task-meta-b7d429d2-9aaa-4af5-914a-aba1321676cb
-rw-rw-r-- 1 majki  120 kwi 11 21:57 celery-task-meta-fa4f6d6a-aeb2-416b-9efc-9e7947ef9550
-rw-rw-r-- 1 majki  120 kwi 11 22:03 celery-task-meta-f5774f4f-0d51-4861-af95-a1a8ff94d1b1
-rw-rw-r-- 1 majki  120 kwi 11 22:18 celery-task-meta-be19e446-64a6-4fa9-b1a4-05feeb4fbec2
-rw-rw-r-- 1 majki  120 kwi 11 22:26 celery-task-meta-1dc82626-9731-43ea-b4ea-6f237143dd42
-rw-rw-r-- 1 majki  120 kwi 11 22:26 celery-task-meta-badbe577-9314-4a92-91b8-80d071a5f8b5
-rw-rw-r-- 1 majki  120 kwi 11 22:26 celery-task-meta-c2da2e98-7a05-411f-acd5-7ead99c12c4e

majki@snakepit ~/projects/blog
% cat celery-task-meta-c2da2e98-7a05-411f-acd5-7ead99c12c4e| python -m json.tool
{
    "children": [],
    "result": 2,
    "status": "SUCCESS",
    "task_id": "c2da2e98-7a05-411f-acd5-7ead99c12c4e",
    "traceback": null
}

There you have it, all is stored "safely" (it’s my laptop 🙂 ) and can be viewed if required.
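As a side note, with the result backend configured you can also read a stored result back from code; a quick sketch using one of the task ids above:

from task import app

# look up a finished task by its id; state and return value come from the result backend
result = app.AsyncResult('c2da2e98-7a05-411f-acd5-7ead99c12c4e')
print(result.status)  # 'SUCCESS'
print(result.result)  # 2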

Hopefully this puts aside all concerns regarding task states and task results, and sheds a bit of light on Celery’s extensive API.