Categories
aws cdk

Serverless LLM on AWS

Quite recently I watched a short course created by DeepLearning.AI titled “Serverless LLM apps with Amazon Bedrock“. It is a very good course for beginners, well crafted, with good examples, and clear explanations by Mike Chambers. He goes and creates a small app using AWS Lambda, AWS S3, and Amazon Bedrock that does text summarisation of recorded conversations. I wanted to add something to the topic and provide a fully working example you can deploy easily using AWS CDK.

Architecture diagram
Categories
logging microservices

Effective logging strategies after running the production for 2 years

I had trouble picking an appropriate title. The guidelines you are about to read are based on a true story, sometimes a horror story. I can’t believe how much I have changed in this past 2 years. It was changed for the better. I grew a lot, I grew with the system I built, run, and supported.

Logging is hard, and it’s most of the time an afterthought. Read this guideline and learn from my mistakes. You may probably apply these recommendations to any language.

Categories
async python

asyncio: real world coroutines

This is a first part of a longer series. Making more real-world examples of asyncio. “Hello world” type of examples are often very misleading about the real effort it takes to use some tool. asyncio examples you may find on the web are guilty as well. Most of them focus on simple happy path scenarios. In this article, I’ll be writing about working with coroutines, handling errors, retrying them automatically, and tracking execution progress. The next articles will explore the usage of more high-level tools like asyncio.Task and asyncio.TaskGroup.

Categories
async python

asyncio’s event loop

Async is not new in Python but I was not familiar with the concept. I have used it but without fully grasping the idea, and that smells disaster. This article and a whole journey I went through has been sparked by one question. Why you shouldn’t run blocking code on the event loop? The answer is simple as it will block the whole thing. I kinda knew that as everyone says it, so you can’t miss it really. But why is it? How does it work? If you would like to know read on.

Categories
azure python

Azure Application Insights, FastAPI, and tracing

Debugging is difficult, what’s even more difficult, debugging production apps. Live production apps.

There are tools designed for this purpose. Azure has Application Insights, product that makes retracing history of events easier. When setup correctly you may go from a http request down to a Db call with all the query arguments. Pretty useful and definitely more convenient than sifting through log messages.

Application Insights console with end-to-end transaction details

Here you can see the exact query that had been executed on the database.

Application Insights console with SQL query details

You may also see every log related to a particular request in Log Analytics.

Application Insights logs with a query displaying only relevant logs

Improving your work life like this is pretty simple. Everything here is done using opencensus and it’s extensions. Opencensus integrates with Azure pretty nicely. First thing to do is to install required dependencies.

# pip 
pip install opencensus-ext-azure opencensus-ext-logging opencensus-ext-sqlalchemy opencensus-ext-requests

# pipenv

pipenv install opencensus-ext-azure opencensus-ext-logging opencensus-ext-sqlalchemy opencensus-ext-requests

# poetry

poetry add opencensus-ext-azure opencensus-ext-logging opencensus-ext-sqlalchemy opencensus-ext-requests

Next step is to activate them by including a couple of lines in your code. Here I activate 3 extensions, logging, requests, and sqlalchemy. Here is a list of other official extensions.

import logging

from opencensus.trace import config_integration
from opencensus.ext.azure.log_exporter import AzureLogHandler

logger = logging.getLogger(__name__)
config_integration.trace_integrations(["logging", "requests", "sqlalchemy"])

handler = AzureLogHandler(
    connection_string="InstrumentationKey=YOUR_KEY"
)
handler.setFormatter(logging.Formatter("%(traceId)s %(spanId)s %(message)s"))
logger.addHandler(handler)

One last thing is a middleware that will instrument every request. This code is taken from Microsoft’s documentation.

@app.middleware("http")
async def middlewareOpencensus(request: Request, call_next):
    tracer = Tracer(
        exporter=AzureExporter(
            connection_string="InstrumentationKey=YOUR_KEY"
        ),
        sampler=ProbabilitySampler(1.0),
    )
    with tracer.span("main") as span:
        span.span_kind = SpanKind.SERVER

        response = await call_next(request)

        tracer.add_attribute_to_current_span(
            attribute_key=HTTP_STATUS_CODE, attribute_value=response.status_code
        )
        tracer.add_attribute_to_current_span(
            attribute_key=HTTP_URL, attribute_value=str(request.url)
        )

    return response

You are done 🙂 you will not loose information on what is going on in the app. You will be quicker in finding problems and resolving them. Life’s good now.

Categories
async python

Background tasks on the cheap

When integrating with third party API’s you need to make sure that your requests reach the third party. In case of issues on their end you want to retry and best not to interrupt the flow of your application or even worse pass the information about such issues to the end user (like leaking 503 errors).

Most popular solution is to use a background task and there are tools for helping with that: celery, python-rq, or dramatiq. They do the job of executing the code in the background but they require some extra infrastructure to make it work, plus all the dependencies they are bringing in. I have used them all in the past with great success but most recently decided to write a basic background task myself.

Why? As I mentioned earlier all of them require extra infrastructure in a form of a broker that most of the time is redis, this implies changes to deployment, requires additional resources, makes the stack more complex. The scope of what I had to do just did not justify bringing in this whole baggage. I needed to retry calls to AWS Glue service in case we maxed out capacity. Since the Glue job we are executing can take a couple minutes our calls to AWS Glue had to be pushed into the background. I’ll give you the code and summarize what it does. By no means this code is perfect but it works 🙂

# background.py
import threading
from queue import Queue

task_queue = Queue()
worker_thread = None


def enqueue_task(task):
    task_queue.put_nowait(task)

    global worker_thread
    if not worker_thread:
        worker_thread = _run_worker_thread()


def _process_tasks(task_queue):
    while task_queue.qsize():
        task = task_queue.get()
        try:
            print(f"Do stuff with task: {task}")
        except Exception as e:
            task_queue.put(task)

    global worker_thread
    worker_thread = None


def _run_worker_thread():
    t = threading.Thread(target=_process_tasks, args=(task_queue,))
    t.start()
    return t

Public interface of this small background module is one function enqueue_task. When called task is put on the queue and thread is started. Each subsequent call will enqueue task and thread will be closed after it processed all of them.

I find this simple and flexible enough to handle communication with flaky services or services with usage caps. Since this can not be scaled it has limited usage, but HTTP calls are just fine. This code had been inspired by one of the talks of Raymond Hettinger regarding concurrency and queue module.

Categories
f-strings logging python

f-strings and python logging

There is one thing that has bothered me for a couple of months. It felt wrong when I saw it in the codebase but I could not tell why it is wrong. It was just a hunch that something is not right, but not enough to make me look for a reason.

For last couple of days I have been struggling to sort out my bot configuration on Azure and decided I need a break from that. Python being something I know best is a good candidate to feel comfortable and in control again.

I have decided to finally answer the question that was buggin me. Why using f-strings in logger calls makes me uneasy? Why this feels wrong?

hero = "Mumen Rider"
logger.error(f"Class C, rank 1: {hero}")

f-strings

Most of the pythonistas would know by now what f-strings are. They are convenient way of constructing strings. Values can be included directly in the string what makes the string much more readable. Here is an example from Python 3’s f-Strings: An Improved String Formatting Syntax (Guide), which is worth at least skimming through if you know f-strings.

>>> name = "Eric"
>>> age = 74
>>> f"Hello, {name}. You are {age}."
'Hello, Eric. You are 74'

They have benefits and my team have been using them since. It’s fine as they are awesome however I feel that they should not be used when we talk about logging.

logging

I’m not talking about poor man’s logging which is print. This is an example of logging in Python

logger.info("This is an example of a log message, and a value of %s", 42)

When the code includes such line and when it is executed it outputs string according to log configuration. Of course your log level needs to match but I’m skipping this as it is not relevant here, I’ll get back to this later. The %s identifier in log messages means that anything passed into logger.info will replace the identifier. So the message will look like this.

INFO:MyLogger:This is an example of a log message, and a value of 42

logging + f-strings

Since logging accept strings and f-strings are so nice they could be used together. Yes, it is possible of course but I’d not use f-strings for such purpose. Best to illustrate why is an example followed with explanation.

    import logging
    logging.basicConfig(level=logging.ERROR)
    logger = logging.getLogger('klich.dev')

    class MyClass:
        def __str__(self):
            print('Calling __str__')
            return "Hiya"

    c = MyClass()
    print("F style")
    logger.debug(f'{c}')
    logger.info(f'{c}')
    logger.warning(f'{c}')
    logger.error(f'{c}')

    print()
    print("Regular style")
    logger.debug('%s', c)
    logger.info('%s', c)
    logger.warning('%s', c)
    logger.error('%s', c)

This short example creates logger and sets logging level to ERROR. This means that only calls of logger.error will produce output. __str__ method of object used in log messages prints information when it is called. So each level matching logger call will print Calling __str__ message and Hiya. Since there are two logger.error calls we should get four lines total. This is what actually is printed out.

    % python3 logg.py
    F style
    Calling __str__
    Calling __str__
    Calling __str__
    Calling __str__
    ERROR:klich.dev:Hiya

    Regular style
    Calling __str__
    ERROR:klich.dev:Hiya

We can see that logger lines using f-strings are calling __str__ even if the log message is not printed out. This is not a big penalty but it may compound to something significant if you have many log calls with f-strings.

what is going on

According to documentation on logging

Formatting of message arguments is deferred until it cannot be avoided.

Logger is smart enough to actually not format messages if it is not needed. It will refrain from calling __str__ until it is required, when it is passed to std out or to a file, or other with options supported by logger.

To dig a little bit more we can use dis module from python standard library. After feeding our code to dis.dis method we will get a list of operations that happend under the hood. For detailed explanation of what exact operations do have a look at ceval.c from Python’s sources.

    >>> import logging
    >>> logger = logging.getLogger()
    >>> def f1():
            logger.info("This is an example of a log message, and a value of %s", 42)

    >>> def f2():
            logger.info(f"This is an example of a log message, and a value of {42}")

    >>> import dis
    >>> dis.dis(f1)
                0 LOAD_GLOBAL              0 (logger)
                2 LOAD_METHOD              1 (info)
                4 LOAD_CONST               1 ('This is an example of a log message, and a value of %s')
                6 LOAD_CONST               2 (42)
                8 CALL_METHOD              2
                10 POP_TOP
                12 LOAD_CONST               0 (None)
                14 RETURN_VALUE
    >>> dis.dis(f2)
                0 LOAD_GLOBAL              0 (logger)
                2 LOAD_METHOD              1 (info)
                4 LOAD_CONST               1 ('This is an example of a log message, and a value of ')
                6 LOAD_CONST               2 (42)
                8 FORMAT_VALUE             0
                10 BUILD_STRING             2
                12 CALL_METHOD              1
                14 POP_TOP
                16 LOAD_CONST               0 (None)
                18 RETURN_VALUE

In this case we won’t get into much details, it is enough to see that f-strings add two additional operations of FORMAT_VALUE (Handles f-string value formatting.) and BUILD_STRING.

After this small research I can explain why we should not be using f-strings in this specific place which is logging. I also can put my uneasiness to rest.

Categories
python

Redis cache

I enjoy listening to podcasts, as they sometimes give me inspiration to create something. In one of the episodes of Python Bytes podcast guys mentioned https://github.com/bwasti/cache.py tool. cache.py allows to cache function calls across runs by using cache file. Simple and really useful.

This inspired me to write a similar thing but for distributed apps, based on redis as a cache storage. I called it rcache and you can find it on PyPI.

In order to use it simply decorate a function like this:

import rcache

@rcache.rcache()
def expensive_func(arg, kwarg=None):
  # Expensive stuff here
  return arg

Default redis address is http://localhost:6379, but you can change it by passing an url into the decorator @rcache.cache(url="http://your_redis:6379").

I hope you find it useful and if you wish to comment or report something please go to https://gitlab.com/the_speedball/redis.cache.py/issues.

Have fun

Categories
python

Collaboration of OAuth and LDAP

The goal

Almost everyone knows OAuth, it is widely used online and has a good reputation. In company where I work we have decided to integrate OAuth2 into our platform (based on microservice architecture, I’ll be saying platform from now on). The trick was that it had to be integrated with existing LDAP server storing user data. User requesting the token had to input his LDAP username and password in order to receive the token. Looking for existing solutions was fruitless and new code had to be written. At that point I didn’t know how this could be achieved. In addition to LDAP I had to use Django as a base of this for providing REST API endpoints. The limitations were clear at this point, risks were unknown. This is not an unusual thing when working as a programmer.

The stack

As I mentioned earlier Django was chosen as the framework. Having decided on the framework it narrowed down number of libraries to use. Fortunately there was already an library adding OAuth to the Django, Django OAuth Toolkit. DOT integrates nicely with Django REST framework and allows you to write your plugins for validating token requests. It supports much more but those two features were the main selling points. Talking to LDAP required a library to do the heavy lifting. There is not much choice here to be honest and I stuck with python-ldap. It is different from what you would expect in terms of python library. Messages are not very useful, docs not very clear but it works and is reliable.

The solution

At the beginning the task seemed really difficult to me. I have only played with OAuth without understanding how it works. It was similar with LDAP. After diving into details it stopped looking that hairy as it turned out I only had to plug into process of authorising the user request. Putting this simply the process of issuing the token would not be started until the user provides credentials that are valid to LDAP.

Django OAuth Toolkit

DOT (Django OAuth Toolkit) is pretty flexible, it provides a setting OAUTH2_VALIDATOR_CLASS where you can define your own validator. This allows to control each step of OAuth2 process. Fortunately I was only concerned with user validation. In order to achieve it I had to write my own validator. Easiest way was to read the default class which has been provided by DOT, namely oauth2_provider.oauth2_validators.OAuth2Validator. It is nicely written, each step has it’s own method that can be replaced. Just had to find a proper one. Proper like validate_user.

def validate_user(self, username, password, client, request, *args, **kwargs):

Signature of the method pictures exactly what needs to be done and lists all the required ingredients. We were connecting to LDAP so this method had to do everything required to validate the user and return bool depending on the result of validation.

Django

Having all the parts connected together the only thing left was to replace the validator class so our new one is used. After doing this all the requests coming in to our OAuth2 server had to conform to our rules which is provide login and password stored in LDAP. It took me longer than expected to grasp the concept and to design the solution. I have created few prototypes each of them with lesser number of CLOCs, until this simple solution came to my mind.

Still don’t fancy LDAP.

Categories
python

Consumer with cache

Micro. Services. They are popular, and it is a pretty useful pattern if applied correctly. Using such pattern forces one to think a bit more when designing a solution to a problem. Not that it is more difficult but rather it is different. One of the main differences is communication or maybe data flow.

Regular applications tend to talk mostly by internal calls, or callbacks, which makes communication simpler. You do not have to care if function you are calling is available. If you pass correct arguments you will get a response. It is not that simple with micro services which in fact are like third party applications. They can go down, can throttle your calls, they can response with a delay, etc.

My current project tries to mitigate this by using messaging pattern in vulnerable spots, where communication may be unstable (connecting to third party, like Facebook) or prone to delays/timeouts (database writes, connecting to third party). Database we are using, Elasticsearch, has a thread pool of workers and if under heavy load may throttle access. Our application may generate a lot of writes exhausting pool of ES workers. "Easy" way of increasing number of written documents is to write them in batches using bulk operation. As we are using AMQP protocol for communication with DB we are not able to process more than one message at time, as this is not supported by the protocol.

Solution to this is Aggregator pattern, from Enterprise Integration Patterns book. You will find full description of it if you buy a book 🙂 This link however gives enough information to understand what it does.

The Aggregator is a special Filter that receives a stream of messages and identifies messages that are correlated. Once a complete set of messages has been received (more on how to decide when a set is ‘complete’ below), the Aggregator collects information from each correlated message and publishes a single, aggregated message to the output channel for further processing.

Aggregator will pull messages and when some condition is reached it will create a bulk write. The condition that triggers insert would be message count based(do we have enough messages) but it may be time based, it could be triggered by a special message or any other condition.

I usually try to find existing solution so I could integrate it or maybe modify a bit to our needs but this time there was nothing. It was time to write it myself with a plenty of elbow grease.

Below solution is not ideal as it does not package messages in same size bulks. It may be more or less then specified, but it will be a batch.

First goes queuing consumer code creating 5 consumers listening on default RabbitMQ address.

#!/usr/bin/env python
import queue

from kombu.mixins import ConsumerMixin
from kombu import Connection, Queue
connection = Connection('amqp://guest:guest@localhost:5672//')


q = queue.Queue()


class C(ConsumerMixin):
    def __init__(self, name, connection, q):
        self.name = name
        self.connection = connection
        self.q = q

    def get_consumers(self, consumer, channel):
        return [
            consumer(
                Queue('task_queue'),
                callbacks=[self.on_message],
                accept=['json']),
        ]

    def on_message(self, body, message):
        self.q.put(body)
        message.ack()

        if self.q.qsize() > 10:
            batch = []
            while True:
                item = self.q.get()
                if self.q.empty():
                    break

                print('%s : Compress: %s' % (self.name, item))
                batch.append(item)
                self.q.task_done()

            print('%s : Push batch: %s' % (self.name, batch))


from threading import Thread


threads = []
for i in range(5):
    w = C('worker %s' % i, connection, q)
    t = Thread(target=w.run)
    t.start()
    threads.append(t)

Here is test producer code that generates messages so one can see how consumers behave.

#!/usr/bin/env python

import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"
for x in range(40):
    channel.basic_publish(
        exchange='',
        routing_key='task_queue',
        body=message,
        properties=pika.BasicProperties(
            delivery_mode=2,  # make message persistent
        ))
    print(" [%s] Sent %r" % (x, message))
connection.close()

In order to test it pika needs to be installed, then simply run consumer in one terminal and trigger producer in other. All the calls to queue.Queue are non-blocking which is most probably reason of different batch sizes. I guess this attempt has to wait for next blog post.