
Consumer with cache

Micro. Services. They are popular, and the pattern is pretty useful if applied correctly. Using such a pattern forces one to think a bit more when designing a solution to a problem. Not that it is more difficult, but rather it is different. One of the main differences is communication, or maybe data flow.

Regular applications tend to talk mostly through internal calls or callbacks, which makes communication simpler. You do not have to care whether the function you are calling is available. If you pass the correct arguments, you will get a response. It is not that simple with micro services, which in fact are like third-party applications. They can go down, can throttle your calls, can respond with a delay, etc.

My current project tries to mitigate this by using a messaging pattern in vulnerable spots, where communication may be unstable (connecting to a third party, like Facebook) or prone to delays/timeouts (database writes, connecting to a third party). The database we are using, Elasticsearch, has a thread pool of workers and may throttle access under heavy load. Our application may generate a lot of writes, exhausting the pool of ES workers. The "easy" way of increasing the number of written documents is to write them in batches using the bulk operation. However, as we are using the AMQP protocol for communication with the DB, we are not able to process more than one message at a time, as this is not supported by the protocol.
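To make the bulk operation a bit more concrete, here is a minimal sketch of how a batch of documents can be serialized for the Elasticsearch `_bulk` endpoint, which takes newline-delimited JSON: an action line followed by the document itself. The function name `build_bulk_body` and the index name `events` are made up for illustration, and the exact action metadata may differ between Elasticsearch versions.

```python
import json


def build_bulk_body(index_name, documents):
    """Serialize documents into newline-delimited JSON for a bulk request."""
    lines = []
    for doc in documents:
        # Each document is preceded by an action line naming the target index.
        lines.append(json.dumps({"index": {"_index": index_name}}))
        lines.append(json.dumps(doc))
    # The bulk body has to end with a trailing newline.
    return "\n".join(lines) + "\n"


# "events" is a hypothetical index name.
body = build_bulk_body("events", [{"user": "a"}, {"user": "b"}])
print(body)
```

One request built this way carries many documents, so it should occupy far fewer ES workers than one write per message.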

The solution to this is the Aggregator pattern, from the Enterprise Integration Patterns book. You will find a full description of it if you buy the book 🙂 This link, however, gives enough information to understand what it does.

The Aggregator is a special Filter that receives a stream of messages and identifies messages that are correlated. Once a complete set of messages has been received (more on how to decide when a set is ‘complete’ below), the Aggregator collects information from each correlated message and publishes a single, aggregated message to the output channel for further processing.

The Aggregator will pull messages, and when some condition is reached it will create a bulk write. The condition that triggers the insert here is based on message count (do we have enough messages?), but it could also be time based, triggered by a special message, or any other condition.
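The completion conditions described above can be sketched roughly like this. It is only an illustration: the class name `Aggregator`, its parameters `max_count` and `max_age`, and the injectable `clock` are my own assumptions, not something taken from the book or from any library.

```python
import time


class Aggregator:
    """Collects messages and releases them as one batch when a condition is met."""

    def __init__(self, max_count=10, max_age=5.0, clock=time.monotonic):
        self.max_count = max_count  # count-based condition
        self.max_age = max_age      # time-based condition, in seconds
        self.clock = clock          # injectable so the timing can be tested
        self.messages = []
        self.first_seen = None

    def add(self, message):
        """Store a message; return a batch if one is complete, else None."""
        if not self.messages:
            self.first_seen = self.clock()
        self.messages.append(message)

        full = len(self.messages) >= self.max_count
        stale = self.clock() - self.first_seen >= self.max_age
        if full or stale:
            return self.flush()
        return None

    def flush(self):
        """Hand over whatever has been collected and start a new batch."""
        batch, self.messages = self.messages, []
        self.first_seen = None
        return batch


agg = Aggregator(max_count=3, max_age=60.0)
for n in range(7):
    batch = agg.add({"n": n})
    if batch is not None:
        print("bulk write of", len(batch), "documents")
print("left over:", agg.flush())
```

Note that the age is only checked when a new message arrives. A quiet queue would need a timer or a periodic flush so that old messages do not sit in the buffer forever.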

I usually try to find an existing solution that I can integrate, or maybe modify a bit to fit our needs, but this time there was nothing. It was time to write it myself, with plenty of elbow grease.

The solution below is not ideal, as it does not package messages into bulks of the same size. A batch may hold more or fewer messages than specified, but it will be a batch.

First comes the queuing consumer code, which creates 5 consumers listening on the default RabbitMQ address.

#!/usr/bin/env python
import queue

from kombu.mixins import ConsumerMixin
from kombu import Connection, Queue
connection = Connection('amqp://guest:guest@localhost:5672//')


q = queue.Queue()


class C(ConsumerMixin):
    def __init__(self, name, connection, q):
        self.name = name
        self.connection = connection
        self.q = q

    def get_consumers(self, consumer, channel):
        return [
            consumer(
                Queue('task_queue'),
                callbacks=[self.on_message],
                accept=['json']),
        ]

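    def on_message(self, body, message):
        # Buffer the payload in the queue shared by all workers, then ack.
        self.q.put(body)
        message.ack()

        # qsize() is only approximate when several threads share the queue.
        if self.q.qsize() > 10:
            batch = []
            while True:
                try:
                    # get_nowait() never blocks, so a worker cannot hang if
                    # another worker has already emptied the queue.
                    item = self.q.get_nowait()
                except queue.Empty:
                    break

                print('%s : Compress: %s' % (self.name, item))
                batch.append(item)
                self.q.task_done()

            if batch:
                print('%s : Push batch: %s' % (self.name, batch))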


from threading import Thread


threads = []
for i in range(5):
    w = C('worker %s' % i, connection, q)
    t = Thread(target=w.run)
    t.start()
    threads.append(t)

Here is the test producer code, which generates messages so one can see how the consumers behave.

#!/usr/bin/env python

import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"
for x in range(40):
    channel.basic_publish(
        exchange='',
        routing_key='task_queue',
        body=message,
        properties=pika.BasicProperties(
            delivery_mode=2,  # make message persistent
        ))
    print(" [%s] Sent %r" % (x, message))
connection.close()

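In order to test it, pika and kombu need to be installed and RabbitMQ has to be running locally. Then simply run the consumer in one terminal and trigger the producer in another. The size check and the draining of the shared queue.Queue are not done atomically, so several consumer threads can drain it at the same time, which is most probably the reason for the different batch sizes. I guess fixing this has to wait for the next blog post.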
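For the curious, one possible direction (a sketch only, not the solution this project ended up with) is to do the size check and the drain under a single lock, so that exactly one thread walks away with each full batch. The class name `LockedBatcher` and the worker function are hypothetical, and plain threads stand in for the RabbitMQ consumers.

```python
import threading


class LockedBatcher:
    """Buffer shared by several threads that hands out batches of exactly `size` items."""

    def __init__(self, size):
        self.size = size
        self.items = []
        self.lock = threading.Lock()

    def add(self, item):
        """Append an item; return a full batch to exactly one caller, else None."""
        with self.lock:
            self.items.append(item)
            if len(self.items) < self.size:
                return None
            # Check and drain happen under the same lock, so no other
            # thread can take part of this batch.
            batch, self.items = self.items, []
            return batch


batcher = LockedBatcher(size=10)
batches = []  # list.append is atomic in CPython, so sharing this list is fine


def worker(start):
    # Stand-in for a consumer callback: feeds 8 items into the shared batcher.
    for n in range(start, start + 8):
        batch = batcher.add(n)
        if batch is not None:
            batches.append(batch)


threads = [threading.Thread(target=worker, args=(i * 8,)) for i in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print([len(b) for b in batches])  # 40 items in total, so four batches of 10
```

A real consumer would also have to flush the partial batch left in the buffer on shutdown or after a timeout, and would probably want to ack messages only once their batch has been written.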