
Lifespan Events

Did you know that you can define special code that runs before your Kafka application starts up and after it shuts down? This code is executed just once per run, but it covers the whole lifespan of your app! 🚀

Let's break it down:

You can define logic (code) that should be executed before the application starts up. This is like a warm-up for your app, getting it ready to consume and produce messages.

Similarly, you can define logic (code) that should be executed when the application is shutting down. This is like a cool-down for your app, making sure everything is properly closed and cleaned up.

By executing code before consuming starts and after producing stops, you cover the entire lifecycle of your application 🎉

This is super handy for setting up shared resources that are needed across consumers and producers, like a database connection pool or a machine learning model. And the best part? You can clean up these resources when the app is shutting down!
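A minimal sketch of the idea, assuming a hypothetical open_database_pool() helper as a stand-in for any shared resource (the real Iris example follows below):

from contextlib import asynccontextmanager

@asynccontextmanager
async def lifespan(app):
    # Runs once, before the app starts consuming and producing messages
    pool = await open_database_pool()  # hypothetical shared resource
    yield
    # Runs once, after the app has shut down
    await pool.close()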

So let's give it a try and see how it can make your Kafka app even more awesome! 💪

Lifespan example - Iris prediction model​

Let’s dive into an example to see how you can leverage the lifecycle handler to solve a common use case. Imagine that you have some machine learning models that need to consume incoming messages and produce response/prediction messages. These models are shared among consumers and producers, which means you don’t want to load them for every message.

Here’s where the lifecycle handler comes to the rescue! By loading the model right before the application starts receiving messages, rather than for every message, you can ensure that the model is ready to use without slowing down message processing. In the upcoming sections, we’ll walk you through how to initialize an Iris species prediction model and use it in your application.

Lifespan​

You can define this startup and shutdown logic using the lifespan parameter of the FastKafka app, and an async context manager.

Let’s start with an example and then see it in detail.

We create an async function lifespan() with yield like this:

from sklearn.datasets import load_iris
from sklearn.linear_model import LogisticRegression
from contextlib import asynccontextmanager

from fastkafka import FastKafka

ml_models = {}

@asynccontextmanager
async def lifespan(app: FastKafka):
    # Load the ML model
    print("Loading the model!")
    X, y = load_iris(return_X_y=True)
    ml_models["iris_predictor"] = LogisticRegression(random_state=0, max_iter=500).fit(X, y)
    yield
    # Clean up the ML models and release the resources
    print("Exiting, clearing model dict!")
    ml_models.clear()

The first thing to notice is that we are defining an async function with yield. This is very similar to Dependencies with yield.

The first part of the function, before the yield, will be executed before the application starts. And the part after the yield will be executed after the application has finished.

This lifespan will create an Iris prediction model on application startup and clean up the references after the app has shut down.

The lifespan will be passed a reference to the FastKafka app on startup of your application, which you can use to access your application from within the handler.

For demonstration's sake, we also added prints so that, when running the app, we can see that our lifespan was called.

Async context manager​

Context managers can be used in with blocks; our lifespan, for example, could be used like this:

ml_models = {}

async with lifespan(None):
    print(ml_models)

When you create a context manager or an async context manager, the code before the yield is executed when the with block is entered, and the code after the yield is executed when the with block is exited.

If you want to learn more about context managers and contextlib decorators, please visit the official Python docs.
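To see this behaviour outside of FastKafka, you could drive the lifespan manually with asyncio. This is just a sketch that reuses the lifespan and ml_models defined above and passes None because the app reference is not used inside the handler:

import asyncio

async def main() -> None:
    # Entering the block runs the code before the yield (the model is loaded),
    # exiting it runs the code after the yield (the model dict is cleared).
    async with lifespan(None):
        print(ml_models)

asyncio.run(main())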

App demo​

FastKafka app​

Let's now create our application using the lifespan handler we just defined.

Notice how we pass our lifespan handler to the app when constructing it, through the lifespan argument.

from fastkafka import FastKafka

kafka_brokers = {
    "localhost": {
        "url": "<url_of_your_kafka_bootstrap_server>",
        "description": "local development kafka broker",
        "port": "<port_of_your_kafka_bootstrap_server>",
    },
}

kafka_app = FastKafka(
    title="Iris predictions",
    kafka_brokers=kafka_brokers,
    lifespan=lifespan,
)
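For a local broker on the default port, the kafka_brokers placeholders above might be filled in like this (the address and port are assumptions for a local setup, matching the bootstrap server shown in the run output below):

kafka_brokers = {
    "localhost": {
        "url": "127.0.0.1",
        "description": "local development kafka broker",
        "port": 9092,
    },
}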

Data modeling​

Let's model the Iris data for our app:

from pydantic import BaseModel, Field, NonNegativeFloat

class IrisInputData(BaseModel):
    sepal_length: NonNegativeFloat = Field(
        ..., example=0.5, description="Sepal length in cm"
    )
    sepal_width: NonNegativeFloat = Field(
        ..., example=0.5, description="Sepal width in cm"
    )
    petal_length: NonNegativeFloat = Field(
        ..., example=0.5, description="Petal length in cm"
    )
    petal_width: NonNegativeFloat = Field(
        ..., example=0.5, description="Petal width in cm"
    )


class IrisPrediction(BaseModel):
    species: str = Field(..., example="setosa", description="Predicted species")
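As a quick sanity check, an IrisInputData instance serializes to the JSON that will travel over the input_data topic. This sketch assumes pydantic v1, which the Field(..., example=...) syntax above implies; the values are illustrative:

# Build a sample message and inspect its JSON representation
sample = IrisInputData(
    sepal_length=5.1, sepal_width=3.5, petal_length=1.4, petal_width=0.2
)
print(sample.json())
# {"sepal_length": 5.1, "sepal_width": 3.5, "petal_length": 1.4, "petal_width": 0.2}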

Consumers and producers​

Let's create a consumer and a producer for our app that will generate predictions from the input Iris data.

@kafka_app.consumes(topic="input_data", auto_offset_reset="latest")
async def on_input_data(msg: IrisInputData):
    species_class = ml_models["iris_predictor"].predict(
        [[msg.sepal_length, msg.sepal_width, msg.petal_length, msg.petal_width]]
    )[0]

    await to_predictions(species_class)


@kafka_app.produces(topic="predictions")
async def to_predictions(species_class: int) -> IrisPrediction:
    iris_species = ["setosa", "versicolor", "virginica"]

    prediction = IrisPrediction(species=iris_species[species_class])
    return prediction
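If you want to exercise this consumer/producer pair without a running broker, FastKafka provides a Tester in fastkafka.testing. The sketch below assumes the Tester mirrors the app's topics with generated to_input_data and awaited_mocks.on_predictions helpers, as in the library's README; the exact helper names depend on your FastKafka version:

from fastkafka.testing import Tester

async def test_iris_prediction() -> None:
    msg = IrisInputData(
        sepal_length=0.1, sepal_width=0.2, petal_length=0.3, petal_width=0.4
    )

    # Starts the app (including our lifespan) against an in-memory broker
    async with Tester(kafka_app) as tester:
        # Send a message to the "input_data" topic
        await tester.to_input_data(msg)
        # Assert that a prediction was produced to the "predictions" topic
        await tester.awaited_mocks.on_predictions.assert_awaited(timeout=2)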

Final app​

The final app looks like this:

from sklearn.datasets import load_iris
from sklearn.linear_model import LogisticRegression
from contextlib import asynccontextmanager

from pydantic import BaseModel, Field, NonNegativeFloat

from fastkafka import FastKafka

class IrisInputData(BaseModel):
    sepal_length: NonNegativeFloat = Field(
        ..., example=0.5, description="Sepal length in cm"
    )
    sepal_width: NonNegativeFloat = Field(
        ..., example=0.5, description="Sepal width in cm"
    )
    petal_length: NonNegativeFloat = Field(
        ..., example=0.5, description="Petal length in cm"
    )
    petal_width: NonNegativeFloat = Field(
        ..., example=0.5, description="Petal width in cm"
    )


class IrisPrediction(BaseModel):
    species: str = Field(..., example="setosa", description="Predicted species")


ml_models = {}

@asynccontextmanager
async def lifespan(app: FastKafka):
    # Load the ML model
    print("Loading the model!")
    X, y = load_iris(return_X_y=True)
    ml_models["iris_predictor"] = LogisticRegression(random_state=0, max_iter=500).fit(X, y)
    yield
    # Clean up the ML models and release the resources
    print("Exiting, clearing model dict!")
    ml_models.clear()

kafka_brokers = {
    "localhost": {
        "url": "<url_of_your_kafka_bootstrap_server>",
        "description": "local development kafka broker",
        "port": "<port_of_your_kafka_bootstrap_server>",
    },
}

kafka_app = FastKafka(
    title="Iris predictions",
    kafka_brokers=kafka_brokers,
    lifespan=lifespan,
)

@kafka_app.consumes(topic="input_data", auto_offset_reset="latest")
async def on_input_data(msg: IrisInputData):
    species_class = ml_models["iris_predictor"].predict(
        [[msg.sepal_length, msg.sepal_width, msg.petal_length, msg.petal_width]]
    )[0]

    await to_predictions(species_class)


@kafka_app.produces(topic="predictions")
async def to_predictions(species_class: int) -> IrisPrediction:
    iris_species = ["setosa", "versicolor", "virginica"]

    prediction = IrisPrediction(species=iris_species[species_class])
    return prediction

Running the app​

Now we can run the app with our custom lifespan handler. Copy the code above into lifespan_example.py and run it with:

fastkafka run --num-workers=1 --kafka-broker=localhost lifespan_example:kafka_app

When you run the app, you should see output similar to the one below:

[262292]: [INFO] fastkafka._application.app: set_kafka_broker() : Setting bootstrap_servers value to '127.0.0.1:9092'
[262292]: Loading the model!
[262292]: [INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': '127.0.0.1:9092'}'
[262292]: [INFO] fastkafka._components.aiokafka_producer_manager: AIOKafkaProducerManager.start(): Entering...
[262292]: [INFO] fastkafka._components.aiokafka_producer_manager: _aiokafka_producer_manager(): Starting...
[262292]: [INFO] fastkafka._components.aiokafka_producer_manager: _aiokafka_producer_manager(): Starting send_stream
[262292]: [INFO] fastkafka._components.aiokafka_producer_manager: AIOKafkaProducerManager.start(): Finished.
[262292]: [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...
[262292]: [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'bootstrap_servers': '127.0.0.1:9092', 'auto_offset_reset': 'latest', 'max_poll_records': 100}
[262292]: [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.
[262292]: [INFO] aiokafka.consumer.subscription_state: Updating subscribed topics to: frozenset({'input_data'})
[262292]: [INFO] aiokafka.consumer.consumer: Subscribed to topic(s): {'input_data'}
[262292]: [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.
[262292]: [WARNING] aiokafka.cluster: Topic input_data is not available during auto-create initialization
[262292]: [INFO] aiokafka.consumer.group_coordinator: Metadata for topic has changed from {} to {'input_data': 0}.
Starting process cleanup, this may take a few seconds...
[INFO] fastkafka._server: terminate_asyncio_process(): Terminating the process 262292...
[262292]: [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer stopped.
[262292]: [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.
[262292]: [INFO] fastkafka._components.aiokafka_producer_manager: AIOKafkaProducerManager.stop(): Entering...
[262292]: [INFO] fastkafka._components.aiokafka_producer_manager: _aiokafka_producer_manager(): Exiting send_stream
[262292]: [INFO] fastkafka._components.aiokafka_producer_manager: _aiokafka_producer_manager(): Finished.
[262292]: [INFO] fastkafka._components.aiokafka_producer_manager: AIOKafkaProducerManager.stop(): Stoping producer...
[262292]: [INFO] fastkafka._components.aiokafka_producer_manager: AIOKafkaProducerManager.stop(): Finished
[262292]: Exiting, clearing model dict!
[INFO] fastkafka._server: terminate_asyncio_process(): Process 262292 terminated.
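With the app running, you could trigger a prediction by publishing a JSON-encoded IrisInputData message to the input_data topic. This sketch uses aiokafka (which FastKafka builds on) directly; the broker address and sample values are assumptions for a local setup:

import asyncio
from aiokafka import AIOKafkaProducer

async def send_test_message() -> None:
    producer = AIOKafkaProducer(bootstrap_servers="127.0.0.1:9092")
    await producer.start()
    try:
        # JSON payload matching the IrisInputData model
        payload = b'{"sepal_length": 5.1, "sepal_width": 3.5, "petal_length": 1.4, "petal_width": 0.2}'
        await producer.send_and_wait("input_data", payload)
    finally:
        await producer.stop()

asyncio.run(send_test_message())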

Recap​

In this guide we have defined a lifespan handler and passed it to our FastKafka app.

Some important points are:

  1. The lifespan handler is implemented as an async context manager
  2. Code before the yield in lifespan will be executed before application startup
  3. Code after the yield in lifespan will be executed after application shutdown
  4. You can pass your lifespan handler to the FastKafka app on initialisation through the lifespan argument