Quite recently I watched a short course created by DeepLearning.AI titled “Serverless LLM apps with Amazon Bedrock”. It is a very good course for beginners: well crafted, with good examples and clear explanations by Mike Chambers. He builds a small app using AWS Lambda, AWS S3, and Amazon Bedrock that does text summarisation of recorded conversations. I wanted to add something to the topic and provide a fully working example you can deploy easily using AWS CDK.
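To give a flavour of the core of such an app, here is a rough sketch of the summarisation call itself (not the code from the course or from my CDK example). It assumes the Amazon Titan Text Express model and its request/response shape, and uses a hardcoded transcript instead of one fetched from S3:
import json
import boto3

# Assumes AWS credentials and a region with Bedrock access are configured.
bedrock_runtime = boto3.client("bedrock-runtime")

transcript = "Agent: Hello, how can I help? Customer: My order arrived damaged..."
prompt = f"Summarise the following conversation:\n\n{transcript}"

response = bedrock_runtime.invoke_model(
    modelId="amazon.titan-text-express-v1",
    contentType="application/json",
    accept="application/json",
    body=json.dumps({
        "inputText": prompt,
        "textGenerationConfig": {"maxTokenCount": 512, "temperature": 0},
    }),
)

result = json.loads(response["body"].read())
print(result["results"][0]["outputText"])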
I had trouble picking an appropriate title. The guidelines you are about to read are based on a true story, sometimes a horror story. I can’t believe how much I have changed in these past 2 years. I changed for the better. I grew a lot, and I grew with the system I built, ran, and supported.
Logging is hard, and most of the time it’s an afterthought. Read this guideline and learn from my mistakes. You can probably apply these recommendations to any language.
This is the first part of a longer series on more real-world examples of asyncio. “Hello world” type examples are often very misleading about the real effort it takes to use some tool, and the asyncio examples you may find on the web are guilty of this as well. Most of them focus on simple happy-path scenarios. In this article, I’ll be writing about working with coroutines, handling errors, retrying them automatically, and tracking execution progress. The next articles will explore the usage of more high-level tools like asyncio.Task and asyncio.TaskGroup.
Async is not new in Python, but I was not familiar with the concept. I have used it without fully grasping the idea, and that smells like disaster. This article, and the whole journey I went through, was sparked by one question: why shouldn’t you run blocking code on the event loop? The answer is simple: it will block the whole thing. I kinda knew that, as everyone says it, so you can’t really miss it. But why is that? How does it work? If you would like to know, read on.
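To see the effect for yourself, here is a minimal sketch (mine, not from the series): a heartbeat task keeps ticking while one coroutine yields with asyncio.sleep, but goes silent while another blocks the loop with time.sleep:
import asyncio
import time


async def heartbeat():
    # Prints a tick every half second, as long as the loop is free to run it.
    while True:
        print("tick")
        await asyncio.sleep(0.5)


async def blocking_work():
    time.sleep(3)  # blocks the whole event loop: no ticks for 3 seconds


async def async_work():
    await asyncio.sleep(3)  # yields to the loop: ticks keep coming


async def main():
    hb = asyncio.create_task(heartbeat())
    await blocking_work()
    await async_work()
    hb.cancel()


asyncio.run(main())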
Debugging is difficult; what’s even more difficult is debugging production apps. Live production apps.
There are tools designed for this purpose. Azure has Application Insights, a product that makes retracing the history of events easier. When set up correctly you can go from an HTTP request down to a DB call with all the query arguments. Pretty useful and definitely more convenient than sifting through log messages.
Here you can see the exact query that was executed on the database.
You may also see every log related to a particular request in Log Analytics.
Improving your work life like this is pretty simple. Everything here is done using opencensus and its extensions. Opencensus integrates with Azure pretty nicely. The first thing to do is to install the required dependencies.
# pip
pip install opencensus-ext-azure opencensus-ext-logging opencensus-ext-sqlalchemy opencensus-ext-requests
# pipenv
pipenv install opencensus-ext-azure opencensus-ext-logging opencensus-ext-sqlalchemy opencensus-ext-requests
# poetry
poetry add opencensus-ext-azure opencensus-ext-logging opencensus-ext-sqlalchemy opencensus-ext-requests
The next step is to activate them by including a couple of lines in your code. Here I activate three extensions: logging, requests, and sqlalchemy. Here is a list of other official extensions.
import logging

from opencensus.trace import config_integration
from opencensus.ext.azure.log_exporter import AzureLogHandler

logger = logging.getLogger(__name__)

config_integration.trace_integrations(["logging", "requests", "sqlalchemy"])

handler = AzureLogHandler(
    connection_string="InstrumentationKey=YOUR_KEY"
)
handler.setFormatter(logging.Formatter("%(traceId)s %(spanId)s %(message)s"))
logger.addHandler(handler)
One last thing is a middleware that will instrument every request. This code is taken from Microsoft’s documentation.
from fastapi import Request
from opencensus.ext.azure.trace_exporter import AzureExporter
from opencensus.trace.attributes_helper import COMMON_ATTRIBUTES
from opencensus.trace.samplers import ProbabilitySampler
from opencensus.trace.span import SpanKind
from opencensus.trace.tracer import Tracer

HTTP_STATUS_CODE = COMMON_ATTRIBUTES["HTTP_STATUS_CODE"]
HTTP_URL = COMMON_ATTRIBUTES["HTTP_URL"]


@app.middleware("http")
async def middlewareOpencensus(request: Request, call_next):
    tracer = Tracer(
        exporter=AzureExporter(connection_string="InstrumentationKey=YOUR_KEY"),
        sampler=ProbabilitySampler(1.0),
    )
    with tracer.span("main") as span:
        span.span_kind = SpanKind.SERVER
        response = await call_next(request)
        tracer.add_attribute_to_current_span(
            attribute_key=HTTP_STATUS_CODE, attribute_value=response.status_code
        )
        tracer.add_attribute_to_current_span(
            attribute_key=HTTP_URL, attribute_value=str(request.url)
        )
    return response
You are done 🙂 You will no longer lose information on what is going on in the app. You will be quicker in finding problems and resolving them. Life’s good now.
When integrating with third-party APIs you need to make sure that your requests reach the third party. In case of issues on their end you want to retry, and preferably not interrupt the flow of your application, or even worse, pass the information about such issues to the end user (like leaking 503 errors).
The most popular solution is to use a background task, and there are tools to help with that: celery, python-rq, or dramatiq. They do the job of executing code in the background, but they require some extra infrastructure to make them work, plus all the dependencies they bring in. I have used them all in the past with great success, but most recently I decided to write a basic background task myself. Why? As I mentioned, all of them require extra infrastructure in the form of a broker, most of the time redis. This implies changes to deployment, requires additional resources, and makes the stack more complex. The scope of what I had to do just did not justify bringing in this whole baggage.
I needed to retry calls to the AWS Glue service in case we maxed out capacity. Since the Glue job we are executing can take a couple of minutes, our calls to AWS Glue had to be pushed into the background.
I’ll give you the code and summarize what it does. By no means is this code perfect, but it works 🙂
# background.py
import threading
from queue import Queue

task_queue = Queue()
worker_thread = None


def enqueue_task(task):
    task_queue.put_nowait(task)

    global worker_thread
    if not worker_thread:
        worker_thread = _run_worker_thread()


def _process_tasks(task_queue):
    while task_queue.qsize():
        task = task_queue.get()
        try:
            print(f"Do stuff with task: {task}")
        except Exception:
            # Processing failed, put the task back on the queue for a retry.
            task_queue.put(task)

    global worker_thread
    worker_thread = None


def _run_worker_thread():
    t = threading.Thread(target=_process_tasks, args=(task_queue,))
    t.start()
    return t
The public interface of this small background module is one function: enqueue_task. When called, the task is put on the queue and the worker thread is started. Each subsequent call will enqueue a task, and the thread will shut down after it has processed all of them.
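For context, here is a hypothetical sketch of how this could be wired to the Glue scenario. It assumes _process_tasks is adjusted to execute the task (calling task() where the print above is), so a failed call gets put back on the queue; the Glue job name is made up:
import functools

import boto3

from background import enqueue_task

glue = boto3.client("glue")


def start_glue_job(job_name):
    # Raises ConcurrentRunsExceededException when Glue is at max capacity;
    # the worker catches the exception and re-queues the task for a retry.
    glue.start_job_run(JobName=job_name)


enqueue_task(functools.partial(start_glue_job, "nightly-export-job"))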
I find this simple and flexible enough to handle communication with flaky services or services with usage caps. Since it cannot be scaled, it has limited usage, but for HTTP calls it is just fine. This code was inspired by one of Raymond Hettinger’s talks on concurrency and the queue module.
There is one thing that had bothered me for a couple of months. It felt wrong when I saw it in the codebase, but I could not tell why. It was just a hunch that something was not right, but not enough to make me look for a reason.
For the last couple of days I have been struggling to sort out my bot configuration on Azure and decided I needed a break from that. Python, being something I know best, is a good candidate to feel comfortable and in control again.
I have decided to finally answer the question that was bugging me. Why does using f-strings in logger calls make me uneasy? Why does this feel wrong?
hero = "Mumen Rider"
logger.error(f"Class C, rank 1: {hero}")
f-strings
Most pythonistas would know by now what f-strings are. They are a convenient way of constructing strings: values can be included directly in the string, which makes it much more readable. Here is an example from Python 3’s f-Strings: An Improved String Formatting Syntax (Guide), which is worth at least skimming through even if you already know f-strings.
>>> name = "Eric"
>>> age = 74
>>> f"Hello, {name}. You are {age}."
'Hello, Eric. You are 74.'
They have their benefits, and my team has been using them ever since. That’s fine, as they are awesome; however, I feel that they should not be used when we talk about logging.
logging
I’m not talking about poor man’s logging, which is print. This is an example of logging in Python:
logger.info("This is an example of a log message, and a value of %s", 42)
When the code includes such a line and it is executed, it outputs a string according to the log configuration. Of course your log level needs to match, but I’m skipping this as it is not relevant here; I’ll get back to it later.
The %s identifier in the log message means that anything passed into logger.info will replace the identifier. So the message will look like this:
INFO:MyLogger:This is an example of a log message, and a value of 42
logging + f-strings
Since logging accepts strings, and f-strings are so nice, they could be used together. Yes, it is possible of course, but I would not use f-strings for this purpose. The best way to illustrate why is an example followed by an explanation.
import logging

logging.basicConfig(level=logging.ERROR)
logger = logging.getLogger('klich.dev')


class MyClass:
    def __str__(self):
        print('Calling __str__')
        return "Hiya"


c = MyClass()

print("F style")
logger.debug(f'{c}')
logger.info(f'{c}')
logger.warning(f'{c}')
logger.error(f'{c}')

print()

print("Regular style")
logger.debug('%s', c)
logger.info('%s', c)
logger.warning('%s', c)
logger.error('%s', c)
This short example creates a logger and sets the logging level to ERROR, meaning that only logger.error calls will produce output. The __str__ method of the object used in the log messages prints a line when it is called. So each level-matching logger call will print the Calling __str__ message and Hiya. Since there are two logger.error calls, we should get four lines in total. This is what actually gets printed out.
% python3 logg.py
F style
Calling __str__
Calling __str__
Calling __str__
Calling __str__
ERROR:klich.dev:Hiya
Regular style
Calling __str__
ERROR:klich.dev:Hiya
We can see that the logger lines using f-strings call __str__ even if the log message is not printed out. This is not a big penalty, but it may compound into something significant if you have many log calls with f-strings.
what is going on
According to the documentation on logging: “Formatting of message arguments is deferred until it cannot be avoided.” The logger is smart enough not to format a message if it is not needed. It will refrain from calling __str__ until it is required, that is, when the message is actually written to stdout, to a file, or to another destination supported by the logger.
To dig a little deeper we can use the dis module from the Python standard library. After feeding our code to the dis.dis function, we get a list of the operations that happen under the hood. For a detailed explanation of what the exact operations do, have a look at ceval.c in Python’s sources.
>>> import logging
>>> logger = logging.getLogger()
>>> def f1():
...     logger.info("This is an example of a log message, and a value of %s", 42)
...
>>> def f2():
...     logger.info(f"This is an example of a log message, and a value of {42}")
...
>>> import dis
>>> dis.dis(f1)
0 LOAD_GLOBAL 0 (logger)
2 LOAD_METHOD 1 (info)
4 LOAD_CONST 1 ('This is an example of a log message, and a value of %s')
6 LOAD_CONST 2 (42)
8 CALL_METHOD 2
10 POP_TOP
12 LOAD_CONST 0 (None)
14 RETURN_VALUE
>>> dis.dis(f2)
0 LOAD_GLOBAL 0 (logger)
2 LOAD_METHOD 1 (info)
4 LOAD_CONST 1 ('This is an example of a log message, and a value of ')
6 LOAD_CONST 2 (42)
8 FORMAT_VALUE 0
10 BUILD_STRING 2
12 CALL_METHOD 1
14 POP_TOP
16 LOAD_CONST 0 (None)
18 RETURN_VALUE
In this case we won’t go into much detail; it is enough to see that f-strings add two additional operations: FORMAT_VALUE (handles f-string value formatting) and BUILD_STRING.
After this small research I can explain why we should not be using f-strings in this specific place, which is logging. I can also put my uneasiness to rest.
Redis cache
I enjoy listening to podcasts, as they sometimes give me inspiration to create something. In one of the episodes of the Python Bytes podcast the guys mentioned the https://github.com/bwasti/cache.py tool. cache.py allows you to cache function calls across runs by using a cache file. Simple and really useful.
This inspired me to write a similar thing, but for distributed apps, based on redis as the cache storage. I called it rcache and you can find it on PyPI.
In order to use it simply decorate a function like this:
import rcache

@rcache.rcache()
def expensive_func(arg, kwarg=None):
    # Expensive stuff here
    return arg
The default redis address is http://localhost:6379, but you can change it by passing a URL into the decorator: @rcache.cache(url="http://your_redis:6379").
I hope you find it useful, and if you wish to comment or report something please go to https://gitlab.com/the_speedball/redis.cache.py/issues.
Have fun
Collaboration of OAuth and LDAP
The goal
Almost everyone knows OAuth; it is widely used online and has a good reputation. In the company where I work we decided to integrate OAuth2 into our platform (based on a microservice architecture; I’ll be saying “platform” from now on). The trick was that it had to be integrated with an existing LDAP server storing user data. A user requesting a token had to enter their LDAP username and password in order to receive it. Looking for existing solutions was fruitless, so new code had to be written. At that point I didn’t know how this could be achieved. In addition to LDAP, I had to use Django as the base for providing the REST API endpoints. The limitations were clear at this point; the risks were unknown. This is not an unusual thing when working as a programmer.
The stack
As I mentioned earlier, Django was chosen as the framework, which narrowed down the number of libraries to use. Fortunately there was already a library adding OAuth to Django: Django OAuth Toolkit. DOT integrates nicely with Django REST framework and allows you to write your own plugins for validating token requests. It supports much more, but those two features were the main selling points. Talking to LDAP required a library to do the heavy lifting. There is not much choice here, to be honest, and I stuck with python-ldap. It is different from what you would expect of a Python library: the error messages are not very useful and the docs are not very clear, but it works and is reliable.
The solution
At the beginning the task seemed really difficult to me. I had only played with OAuth without understanding how it works, and it was similar with LDAP. After diving into the details it stopped looking that hairy, as it turned out I only had to plug into the process of authorising the user request. Putting it simply, the process of issuing the token would not start until the user provided credentials that were valid in LDAP.
Django OAuth Toolkit
DOT (Django OAuth Toolkit) is pretty flexible: it provides a setting, OAUTH2_VALIDATOR_CLASS, where you can define your own validator. This allows you to control each step of the OAuth2 process. Fortunately I was only concerned with user validation, and in order to achieve it I had to write my own validator. The easiest way was to read the default class provided by DOT, namely oauth2_provider.oauth2_validators.OAuth2Validator. It is nicely written; each step has its own method that can be replaced. I just had to find the proper one, namely validate_user.
def validate_user(self, username, password, client, request, *args, **kwargs):
The signature of the method pictures exactly what needs to be done and lists all the required ingredients. We were connecting to LDAP, so this method had to do everything required to validate the user and return a bool depending on the result of the validation.
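For illustration, here is a minimal sketch of what such a validator could look like. It is not the exact code from our platform: the LDAP URL, the DN template, and the module path in the settings comment are made up, and python-ldap’s simple bind stands in for the real credential check.
import ldap
from oauth2_provider.oauth2_validators import OAuth2Validator

# settings.py (illustrative):
# OAUTH2_PROVIDER = {"OAUTH2_VALIDATOR_CLASS": "myapp.validators.LDAPValidator"}


class LDAPValidator(OAuth2Validator):
    def validate_user(self, username, password, client, request, *args, **kwargs):
        # Bind against LDAP with the credentials from the token request;
        # a successful bind means the credentials are valid.
        conn = ldap.initialize("ldap://ldap.example.com")
        try:
            conn.simple_bind_s(f"uid={username},ou=people,dc=example,dc=com", password)
        except ldap.INVALID_CREDENTIALS:
            return False
        finally:
            conn.unbind_s()
        # A full implementation would also attach a Django user to request.user here.
        return True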
Django
Having all the parts connected together, the only thing left was to replace the validator class so our new one is used. After doing this, all requests coming in to our OAuth2 server had to conform to our rules: provide a login and password stored in LDAP. It took me longer than expected to grasp the concept and design the solution. I created a few prototypes, each of them with fewer lines of code, until this simple solution came to my mind.
Still don’t fancy LDAP.
Consumer with cache
Micro. Services. They are popular, and it is a pretty useful pattern if applied correctly. Using such a pattern forces you to think a bit more when designing a solution to a problem. Not that it is more difficult, but rather it is different. One of the main differences is communication, or maybe data flow.
Regular applications tend to talk mostly by internal calls, or callbacks, which makes communication simpler. You do not have to care if the function you are calling is available: if you pass correct arguments, you will get a response. It is not that simple with microservices, which in fact are like third-party applications. They can go down, they can throttle your calls, they can respond with a delay, etc.
My current project tries to mitigate this by using a messaging pattern in vulnerable spots, where communication may be unstable (connecting to a third party, like Facebook) or prone to delays and timeouts (database writes, connecting to a third party). The database we are using, Elasticsearch, has a thread pool of workers and may throttle access under heavy load. Our application may generate a lot of writes, exhausting the pool of ES workers. An "easy" way of increasing the number of written documents is to write them in batches using the bulk operation. As we are using the AMQP protocol for communication with the DB, we are not able to process more than one message at a time, as this is not supported by the protocol.
The solution to this is the Aggregator pattern, from the Enterprise Integration Patterns book. You will find the full description of it if you buy the book 🙂 This link, however, gives enough information to understand what it does.
The Aggregator is a special Filter that receives a stream of messages and identifies messages that are correlated. Once a complete set of messages has been received (more on how to decide when a set is ‘complete’ below), the Aggregator collects information from each correlated message and publishes a single, aggregated message to the output channel for further processing.
The Aggregator will pull messages, and when some condition is reached it will create a bulk write. Here the condition that triggers the insert is message-count based (do we have enough messages), but it may be time based, it could be triggered by a special message, or any other condition.
I usually try to find an existing solution that I could integrate or maybe modify a bit to our needs, but this time there was nothing. It was time to write it myself, with plenty of elbow grease.
The solution below is not ideal, as it does not package messages into same-size batches. A batch may be bigger or smaller than specified, but it will be a batch.
First goes the queuing consumer code, creating 5 consumers listening on the default RabbitMQ address.
#!/usr/bin/env python
import queue
from threading import Thread

from kombu.mixins import ConsumerMixin
from kombu import Connection, Queue

connection = Connection('amqp://guest:guest@localhost:5672//')
q = queue.Queue()


class C(ConsumerMixin):
    def __init__(self, name, connection, q):
        self.name = name
        self.connection = connection
        self.q = q

    def get_consumers(self, consumer, channel):
        return [
            consumer(
                Queue('task_queue'),
                callbacks=[self.on_message],
                accept=['json']),
        ]

    def on_message(self, body, message):
        self.q.put(body)
        message.ack()
        if self.q.qsize() > 10:
            # Drain the local queue into a batch and "push" it in one go.
            batch = []
            while not self.q.empty():
                item = self.q.get()
                print('%s : Compress: %s' % (self.name, item))
                batch.append(item)
                self.q.task_done()
            print('%s : Push batch: %s' % (self.name, batch))


threads = []
for i in range(5):
    w = C('worker %s' % i, connection, q)
    t = Thread(target=w.run)
    t.start()
    threads.append(t)
Here is the test producer code that generates messages, so you can see how the consumers behave.
#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"
for x in range(40):
    channel.basic_publish(
        exchange='',
        routing_key='task_queue',
        body=message,
        properties=pika.BasicProperties(
            delivery_mode=2,  # make message persistent
        ))
    print(" [%s] Sent %r" % (x, message))
connection.close()
In order to test it, pika needs to be installed; then simply run the consumer in one terminal and trigger the producer in another. The queue.Queue size checks are non-blocking and only approximate, which is most probably the reason for the different batch sizes. I guess fixing that will have to wait for the next blog post.