Categories
aws cdk

Serverless LLM on AWS

Quite recently I watched a short course created by DeepLearning.AI titled “Serverless LLM apps with Amazon Bedrock”. It is a very good course for beginners: well crafted, with good examples and clear explanations by Mike Chambers. He builds a small app using AWS Lambda, Amazon S3, and Amazon Bedrock that summarises recorded conversations. I wanted to add something to the topic and provide a fully working example you can deploy easily using AWS CDK.

Architecture diagram
Categories
logging microservices

Effective logging strategies after running the production for 2 years

I had trouble picking an appropriate title. The guidelines you are about to read are based on a true story, sometimes a horror story. I can’t believe how much I have changed in the past 2 years. I changed for the better. I grew a lot, and I grew with the system I built, ran, and supported.

Logging is hard, and most of the time it is an afterthought. Read these guidelines and learn from my mistakes. You can probably apply these recommendations to any language.

Categories
async python

asyncio: real world coroutines

This is the first part of a longer series with more real-world examples of asyncio. “Hello world” type examples are often very misleading about the real effort it takes to use a tool, and the asyncio examples you may find on the web are guilty of this as well. Most of them focus on simple happy-path scenarios. In this article, I’ll be writing about working with coroutines, handling errors, retrying them automatically, and tracking execution progress. The next articles will explore the usage of more high-level tools like asyncio.Task and asyncio.TaskGroup.
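To give a flavour of what retrying a coroutine automatically can look like, here is a minimal sketch. The helper name `retry` and its parameters are my own illustration, not code from the series. The one detail worth noticing is that a coroutine object can be awaited only once, so the helper takes a function that creates a fresh coroutine for every attempt.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def retry(
    make_coro: Callable[[], Awaitable[T]],
    attempts: int = 3,
    delay: float = 0.0,
) -> T:
    """Await a fresh coroutine up to `attempts` times (hypothetical helper)."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            # make_coro() builds a new coroutine object for each attempt.
            return await make_coro()
        except Exception:
            if attempt == attempts:
                raise  # out of attempts, let the caller see the error
            await asyncio.sleep(delay)
    raise AssertionError("unreachable")
```

With a coroutine function `fetch`, the call would look like `await retry(fetch, attempts=5, delay=0.5)`. Catching every `Exception` is a simplification; real code would usually retry only on errors known to be transient.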

Categories
async python

asyncio’s event loop

Async is not new in Python but I was not familiar with the concept. I have used it without fully grasping the idea, and that smells like disaster. This article, and the whole journey I went through, was sparked by one question: why shouldn’t you run blocking code on the event loop? The answer is simple: it will block the whole thing. I kind of knew that, since everyone says it and you can’t really miss it. But why is that? How does it work? If you would like to know, read on.
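A small experiment makes the problem visible. This is a sketch of my own, with made-up names (`ticker`, `longest_gap`): a background coroutine records a timestamp every 10 ms, while the main coroutine pauses for 0.2 s either with `time.sleep` (blocking) or with `asyncio.sleep` (cooperative). The longest gap between two timestamps shows whether the loop was able to run anything else during the pause.

```python
import asyncio
import time


async def ticker(ticks: list, interval: float = 0.01) -> None:
    """Append a timestamp to `ticks` every `interval` seconds until cancelled."""
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(interval)


async def longest_gap(blocking: bool, pause: float = 0.2) -> float:
    """Run the ticker next to a pause and return the longest gap between ticks."""
    ticks: list = []
    task = asyncio.create_task(ticker(ticks))
    await asyncio.sleep(0.05)  # give the ticker time to start
    if blocking:
        time.sleep(pause)  # holds the thread, the loop cannot switch tasks
    else:
        await asyncio.sleep(pause)  # hands control back to the loop
    await asyncio.sleep(0.05)  # let the ticker run again before stopping it
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return max(later - earlier for earlier, later in zip(ticks, ticks[1:]))
```

Running `asyncio.run(longest_gap(blocking=True))` should report a gap of at least the full pause, because the ticker is frozen for the whole `time.sleep` call. With `blocking=False` the gap stays close to the ticker interval.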

Categories
microservices

Bridge to kubernetes

When you are using kubernetes you have a cluster, and clusters are rarely small. Your gripes may come in different flavours though. The cluster won’t fit on your local box any more. You most probably have people in your team that need the cluster as well. You need the feedback faster than deploying your changes to the cluster. What to do with all those problems?

Run a bridge that will route traffic to your local version of one of the microservices. Easy. Your service will run locally and interact with the kubernetes cluster without interrupting anyone. Sounds amazing, doesn’t it?

To illustrate this let’s run a sample app in a cluster and then do some “development” and “debugging”. This will not affect the “development” environment for other people that could use it. The code will be isolated and your team will be happy.

Categories
microservices patterns

Cloud agnostic apps with DAPR

DAPR is cool. As stated on the website, it provides “APIs for building portable and reliable microservices”, and it works with many clouds and external services. As a result you only need to configure the services and then use the DAPR APIs. It is true and I’ll show you. You will find the code for this article here. It is a must-have tool when working with microservices.

Applications can use DAPR as a sidecar container or as a separate process. I’ll show you a local version of the app, where DAPR is configured to use Redis running in a container. The repo will have Azure, AWS, and GCP configuration as well.

Before we can start the adventure you have to install DAPR.

Running app locally

Configuration

You have to start off on the right foot. Because of that, we have to configure a secret store where you can keep passwords and such. I could skip this step, but then there is a risk someone will never find out how to do it properly and will ship passwords in plain text. Here we go.

# save under ./components/secrets.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: local-secret-store
  namespace: default
spec:
  type: secretstores.local.file
  version: v1
  metadata:
  - name: secretsFile
    value: ../secrets.json

The file secrets.json should have all your secrets, like connection strings, user and pass pairs, etc. Don’t commit this file.

{
    "redisPass": "just a password"
}

The next file is the publish/subscribe configuration. It is dead simple, but I’d recommend going through the docs as there is much more to pub/sub. Here you can reference your secrets as shown.

# save under ./components/pubsub.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: order_pub_sub
spec:
  type: pubsub.redis
  version: v1
  metadata:
  - name: redisHost
    value: localhost:6379
  - name: redisPass
    secretKeyRef:
      name: redisPass
      key: redisPass
auth:
  secretStore: local-secret-store

Publisher and subscriber

With the config out of the way, the only things left are the publisher and the subscriber. As mentioned before, the app you write talks to the DAPR API. This means you may use HTTP calls or the Dapr client. The best part is that no matter what is on the other end, be it Redis or PostgreSQL, your code will not change even when you change your external services.
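For the plain HTTP route, the sidecar exposes a publish endpoint at `/v1.0/publish/<pubsub name>/<topic>`. Below is a minimal sketch using only the standard library. The helper names are mine, and I am assuming the sidecar port is available in the `DAPR_HTTP_PORT` environment variable, falling back to 3500 when it is not set.

```python
import json
import os
import urllib.request


def build_publish_request(pubsub_name, topic, payload, dapr_port=None):
    """Build a POST request for the sidecar's publish endpoint (hypothetical helper)."""
    # Assumption: the sidecar port comes from DAPR_HTTP_PORT, else 3500.
    port = dapr_port or int(os.environ.get("DAPR_HTTP_PORT", "3500"))
    url = f"http://localhost:{port}/v1.0/publish/{pubsub_name}/{topic}"
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def publish(pubsub_name, topic, payload):
    """Send the event to the sidecar and return the HTTP status code."""
    request = build_publish_request(pubsub_name, topic, payload)
    with urllib.request.urlopen(request) as response:
        return response.status
```

Calling `publish("order_pub_sub", "orders", {"product": "falafel"})` only works while a sidecar is running next to the app. The rest of this post uses the Dapr client instead, which hides the URL and the port.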

Here is the publisher that will send events to a topic. The topic can be hosted anywhere; here is a list of supported brokers. The list is long, however only 3 are stable. I really like how DAPR approaches component certification though. There are well-defined requirements a component must pass to advance from Alpha to Beta and finally to Stable.

# save under ./publisher/app.py

import logging

from dapr.clients import DaprClient
from fastapi import FastAPI
from pydantic import BaseModel

logging.basicConfig(level=logging.INFO)


app = FastAPI()


class Order(BaseModel):
    product: str


@app.post("/orders")
def orders(order: Order):
    logging.info("Received order")
    with DaprClient() as dapr_client:
        dapr_client.publish_event(
            pubsub_name="order_pub_sub",
            topic_name="orders",
            data=order.json(),
            data_content_type="application/json",
        )
    return order

Here is a consumer.

# save under ./consumer/app.py

from dapr.ext.fastapi import DaprApp
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()
dapr_app = DaprApp(app)


class CloudEvent(BaseModel):
    datacontenttype: str
    source: str
    topic: str
    pubsubname: str
    data: dict
    id: str
    specversion: str
    tracestate: str
    type: str
    traceid: str


@dapr_app.subscribe(pubsub="order_pub_sub", topic="orders")
def orders_subscriber(event: CloudEvent):
    print("Subscriber received : %s" % event.data["product"], flush=True)
    return {"success": True}

Running the apps

Now you can run both apps in separate terminal windows and see how they talk to each other using the configured broker. For this example we are using Redis as the broker. You will see how easy it is to run them on different platforms.

In the first terminal run the consumer.

$ dapr run --app-id order-processor --components-path ../components/ --app-port 8000 -- uvicorn app:app

In the other terminal run the producer.

$ dapr run --app-id order-publisher --components-path ../components/ --app-port 8001 -- uvicorn app:app --port 8001

After you make an HTTP call to the producer you should see both of them producing log messages as follows.

$ http :8001/orders product=falafel

# producer
== APP == INFO:root:Received order
== APP == INFO:     127.0.0.1:49698 - "POST /orders HTTP/1.1" 200 OK

# subscriber
== APP == Subscriber received : falafel
== APP == INFO:     127.0.0.1:49701 - "POST /events/order_pub_sub/orders HTTP/1.1" 200 OK

Running app in the cloud

It took us a bit to reach the crux of this post. We had to build something and run it locally so that we can now run it in the cloud. The example above will run in the cloud with a simple change of configuration.

The simplest configuration is for Azure. Change your pubsub.yaml so it looks as follows, and update your secrets.json as well.

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: order_pub_sub
spec:
  type: pubsub.azure.servicebus
  version: v1
  metadata:
  - name: connectionString
    secretKeyRef:
      name: connectionStrings:azure
      key: connectionStrings:azure
auth:
  secretStore: local-secret-store

Your secrets.json should now look like this:

{
  "connectionStrings": {
    "azure": "YOUR CONNECTION STRING"
  }
}
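The key `connectionStrings:azure` used in pubsub.yaml may look odd next to the nested JSON. The local file secret store flattens nested objects and joins the key names with a separator, which is `:` by default. The function below is my own illustration of that flattening, not Dapr’s code.

```python
def flatten_secrets(secrets: dict, separator: str = ":", prefix: str = "") -> dict:
    """Flatten nested secrets into single-level keys joined by `separator`."""
    flat = {}
    for key, value in secrets.items():
        name = f"{prefix}{separator}{key}" if prefix else key
        if isinstance(value, dict):
            # Descend into nested objects, carrying the joined key as prefix.
            flat.update(flatten_secrets(value, separator, name))
        else:
            flat[name] = value
    return flat
```

Applied to the file above, it yields a single key, `connectionStrings:azure`, which is the name referenced in the component. A top-level entry such as `redisPass` keeps its name unchanged.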

Rerun both commands in the terminal and the output will look the same as with the local env, but the messages will now go through Azure Service Bus.

Bloody magic if you ask me. You can mix and match your dependencies without changing your application. In some cases you may even use features not available in a particular cloud, like message routing based on body in Azure Service Bus. This will be another post though.

Here is the repo for this post, it includes all the providers listed below:

  • Azure
  • Google Cloud
  • AWS

Please remember to update your secrets.json.

Have fun 🙂

Categories
azure python

Azure Application Insights, FastAPI, and tracing

Debugging is difficult. What’s even more difficult is debugging production apps. Live production apps.

There are tools designed for this purpose. Azure has Application Insights, a product that makes retracing the history of events easier. When set up correctly, it lets you go from an HTTP request down to a DB call with all the query arguments. Pretty useful and definitely more convenient than sifting through log messages.

Application Insights console with end-to-end transaction details

Here you can see the exact query that had been executed on the database.

Application Insights console with SQL query details

You may also see every log related to a particular request in Log Analytics.

Application Insights logs with a query displaying only relevant logs

Improving your work life like this is pretty simple. Everything here is done using opencensus and its extensions. Opencensus integrates with Azure pretty nicely. The first thing to do is to install the required dependencies.

# pip 
pip install opencensus-ext-azure opencensus-ext-logging opencensus-ext-sqlalchemy opencensus-ext-requests

# pipenv

pipenv install opencensus-ext-azure opencensus-ext-logging opencensus-ext-sqlalchemy opencensus-ext-requests

# poetry

poetry add opencensus-ext-azure opencensus-ext-logging opencensus-ext-sqlalchemy opencensus-ext-requests

The next step is to activate them by including a couple of lines in your code. Here I activate 3 extensions: logging, requests, and sqlalchemy. Here is a list of other official extensions.

import logging

from opencensus.trace import config_integration
from opencensus.ext.azure.log_exporter import AzureLogHandler

logger = logging.getLogger(__name__)
config_integration.trace_integrations(["logging", "requests", "sqlalchemy"])

handler = AzureLogHandler(
    connection_string="InstrumentationKey=YOUR_KEY"
)
handler.setFormatter(logging.Formatter("%(traceId)s %(spanId)s %(message)s"))
logger.addHandler(handler)

One last thing is a middleware that will instrument every request. This code is taken from Microsoft’s documentation.

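from fastapi import Request
from opencensus.ext.azure.trace_exporter import AzureExporter
from opencensus.trace.attributes_helper import COMMON_ATTRIBUTES
from opencensus.trace.samplers import ProbabilitySampler
from opencensus.trace.span import SpanKind
from opencensus.trace.tracer import Tracer

HTTP_URL = COMMON_ATTRIBUTES["HTTP_URL"]
HTTP_STATUS_CODE = COMMON_ATTRIBUTES["HTTP_STATUS_CODE"]


# `app` is your existing FastAPI instance.
@app.middleware("http")
async def middlewareOpencensus(request: Request, call_next):
    tracer = Tracer(
        exporter=AzureExporter(
            connection_string="InstrumentationKey=YOUR_KEY"
        ),
        sampler=ProbabilitySampler(1.0),  # trace every request
    )
    with tracer.span("main") as span:
        span.span_kind = SpanKind.SERVER

        response = await call_next(request)

        tracer.add_attribute_to_current_span(
            attribute_key=HTTP_STATUS_CODE, attribute_value=response.status_code
        )
        tracer.add_attribute_to_current_span(
            attribute_key=HTTP_URL, attribute_value=str(request.url)
        )

    return response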

You are done 🙂 You will not lose information on what is going on in the app. You will be quicker at finding problems and resolving them. Life’s good now.

Categories
aws cdk devops python

Deploy Sagemaker Endpoint using AWS CDK

I was looking for a way to deploy a custom model to Sagemaker. Unfortunately, my online searches failed to find anything that was not using Jupyter notebooks. I like them, but this way of deploying models is neither reproducible nor scalable.

After a couple of hours of looking, I decided to do it myself. Here comes a recipe for deploying a custom model to Sagemaker using AWS CDK.

The following steps assume you have knowledge of CDK and Sagemaker. I’ll try to explain as much as I can but if anything is unclear please refer to the docs.

Steps

  1. Prepare containerised application serving your model.
  2. Create Sagemaker model.
  3. Create Sagemaker Endpoint configuration.
  4. Deploy Sagemaker Endpoint.

Unfortunately, AWS CDK does not support higher-level constructs for Sagemaker. You have to use CloudFormation constructs which start with the prefix Cfn. Higher-level constructs for Sagemaker are not on the roadmap as of March 2021.

Dockerfile to serve model

First thing is to have your app in a container form, so it can be deployed in a predictable way. It’s difficult to help with this step as each model may require different dependencies or actions. What I can recommend is to go over https://docs.aws.amazon.com/sagemaker/latest/dg/build-multi-model-build-container.html. This page explains the steps required to prepare a container that can serve a model on Sagemaker. It may also be helpful to read this part https://docs.aws.amazon.com/sagemaker/latest/dg/your-algorithms-inference-code.html on how your docker image will be used.

Define Sagemaker model

Once you have your model in a container form it is time to create a Sagemaker model. There are 3 elements to a Sagemaker model:

  • Container definition
  • VPC configuration for a model
  • Model definition

Adding container definition to your app is simple (the hard part of creating a docker image is already done). The container definition will be used by the Sagemaker model.

asset = DockerImageAsset(
    self,
    "MLInferenceImage",
    directory="../image",
)

primary_container_definition = sagemaker.CfnModel.ContainerDefinitionProperty(
    image=asset.image_uri,
)

Creating a VPC is pretty straightforward; just remember to create both public and private subnets.

vpc = ec2.Vpc(
    self,
    "VPC",
    subnet_configuration=[
        ec2.SubnetConfiguration(
            name="public-model-subnet", subnet_type=ec2.SubnetType.PUBLIC
        ),
        ec2.SubnetConfiguration(
            name="private-model-subnet", subnet_type=ec2.SubnetType.PRIVATE
        ),
    ],
)

model_vpc_config = sagemaker.CfnModel.VpcConfigProperty(
    security_group_ids=[vpc.vpc_default_security_group],
    subnets=[s.subnet_id for s in vpc.private_subnets],
)

Creating a model is putting all created things together.

model = sagemaker.CfnModel(
    self,
    "MLInference",
    execution_role_arn=role.role_arn,
    model_name="my-model",
    primary_container=primary_container_definition,
    vpc_config=model_vpc_config,
)

At this point, cdk deploy would create a Sagemaker model with an ML model of your choice.

Define endpoint configuration

We are not done yet as the model has to be exposed. Sagemaker Endpoint is perfect for this and in the next step we create endpoint configuration.

Endpoint configuration describes resources that will serve your model.

model_endpoint_config = sagemaker.CfnEndpointConfig(
    self,
    "model-endpoint-config",
    production_variants=[
        sagemaker.CfnEndpointConfig.ProductionVariantProperty(
            initial_instance_count=1,
            initial_variant_weight=1.0,
            instance_type="ml.t2.medium",
            model_name=model.model_name,
            variant_name="production-medium",
        ),
    ],
)

Create Sagemaker Endpoint

The last step is extremely simple. We take the configuration created earlier and create an endpoint.

model_endpoint = sagemaker.CfnEndpoint(
    self,
    "model-endpoint",
    endpoint_config_name=model_endpoint_config.attr_endpoint_config_name,
)

Congrats

Now you may call cdk deploy and the model is up and running on AWS Sagemaker 🙂

Categories
microservices patterns

GitOps workflow

I have been using GitOps in my last project and I like the way it changed my workflow. It had to change, as in the world of microservices the old ways have to go. This is not a post about whether that is good or bad; I may write one someday. This post is about GitOps. If you do not know what GitOps is, read here. TLDR version: in practice, each commit deploys a new version of a service to one’s cluster.

Going back to the subject of the workflow. I’ll focus on the microservices workflow as here in my opinion GitOps is extremely useful. One of the main pain points of microservices architecture is deployment. When deployments are mentioned you instinctively think about deploying the application. It may be difficult but it is not as difficult as creating developer environments and QA environments.

Here comes GitOps. When applied to your project, you immediately get a new version of a service for each commit. This applies to each service you have. With this at your disposal you can set up your application in numerous combinations of versions. You can also easily replace one of the services in your stack. Sounds good, but nothing beats a demo, so here we go.

Demo

Let’s say I have the task of verifying that our changes to one of the microservices are correct. I’m using Rio to manage my Kubernetes cluster as it makes things smoother. A change in one service affects another service, and I have to verify it using the UI. This adds up to 3 services deployed in one namespace and configured so they talk to each other. After I add a commit in the service repository, there is a namespace already created on the cluster. Each commit creates a new version of the service.

% rio -n bugfix ps -q
bugfix:app

Now I need to add missing services, and I can do it by branching off from master. The name of the branch must match in all services involved.

% cd other_services && git checkout -b bugfix 
% git push

After pushing the changes Rio adds them to the same namespace.

% rio -n bugfix ps -q
bugfix:app
bugfix:web
bugfix:other_app

One thing left is to wire them up so the services talk to each other. As I’m following the recommendations from https://12factor.net/config, it is dead easy, and I can use Rio to do it. The edit command allows me to modify the environment variables of each service.

% rio -n bugfix edit web

This opens up your favourite text editor where you can edit the variables and set up where the web app can find the other services. You can make the same changes in other services if necessary.

I have wired up the services, they are talking to each other, and I can proceed with my work. This is my workflow using GitOps, Rio, and microservices.

Categories
async python

Background tasks on the cheap

When integrating with third-party APIs you need to make sure that your requests reach the third party. In case of issues on their end you want to retry, and preferably without interrupting the flow of your application or, even worse, passing information about such issues to the end user (like leaking 503 errors).

The most popular solution is to use a background task, and there are tools that help with that: celery, python-rq, or dramatiq. They do the job of executing the code in the background, but they require some extra infrastructure to make it work, plus all the dependencies they bring in. I have used them all in the past with great success, but most recently I decided to write a basic background task myself.

Why? As I mentioned earlier, all of them require extra infrastructure in the form of a broker, which most of the time is Redis. This implies changes to the deployment, requires additional resources, and makes the stack more complex. The scope of what I had to do just did not justify bringing in all this baggage. I needed to retry calls to the AWS Glue service in case we maxed out capacity. Since the Glue job we are executing can take a couple of minutes, our calls to AWS Glue had to be pushed into the background. I’ll give you the code and summarize what it does. This code is by no means perfect, but it works 🙂

# background.py
import threading
from queue import Queue

task_queue = Queue()
worker_thread = None


def enqueue_task(task):
    task_queue.put_nowait(task)

    global worker_thread
    if not worker_thread:
        worker_thread = _run_worker_thread()


def _process_tasks(task_queue):
    while task_queue.qsize():
        task = task_queue.get()
        try:
            print(f"Do stuff with task: {task}")
        except Exception as e:
            task_queue.put(task)

    global worker_thread
    worker_thread = None


def _run_worker_thread():
    t = threading.Thread(target=_process_tasks, args=(task_queue,))
    t.start()
    return t

The public interface of this small background module is a single function, enqueue_task. When it is called, the task is put on the queue and a worker thread is started if one is not already running. Each subsequent call enqueues another task, and the thread exits once it has processed all of them.

I find this simple and flexible enough to handle communication with flaky services or services with usage caps. Since it cannot be scaled beyond a single thread its usage is limited, but for HTTP calls it is just fine. This code was inspired by one of Raymond Hettinger’s talks on concurrency and the queue module.
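If you want to take the idea one step further, here is a variation sketched under my own assumptions; it is not the code I run in production. Tasks are plain callables, a failed task is retried up to `MAX_ATTEMPTS` times (a made-up limit) instead of forever, and a lock guards the decision to start or stop the worker, so a task enqueued just as the thread is finishing is not left waiting on the queue.

```python
import threading
from queue import Empty, Queue

MAX_ATTEMPTS = 3  # hypothetical cap on attempts per task

_tasks = Queue()
_lock = threading.Lock()
_worker = None


def enqueue_task(task):
    """Queue a zero-argument callable and make sure a worker thread is running."""
    global _worker
    with _lock:
        _tasks.put((task, 1))  # (callable, attempt number)
        if _worker is None:
            _worker = threading.Thread(target=_process_tasks, daemon=True)
            _worker.start()


def _process_tasks():
    """Run queued tasks until the queue is empty, then let the thread exit."""
    global _worker
    while True:
        with _lock:
            try:
                task, attempt = _tasks.get_nowait()
            except Empty:
                # Cleared under the lock, so enqueue_task either sees a live
                # worker that will pick up its task or starts a new one.
                _worker = None
                return
        try:
            task()
        except Exception:
            if attempt < MAX_ATTEMPTS:
                _tasks.put((task, attempt + 1))
```

A call such as `enqueue_task(lambda: start_glue_job(job_name))` would retry a failing call a few times and then drop it; `start_glue_job` is a placeholder for whatever function makes the request. The worker is a daemon thread here, so tasks still queued when the process exits are lost, which may or may not be acceptable for your use case.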