Lifespan Events
Did you know that you can define some special code that runs before and after your Kafka application? This code will be executed just once, but it covers the whole lifespan of your app!
Let's break it down:
You can define logic (code) that should be executed before the application starts up. This is like a warm-up for your app, getting it ready to consume and produce messages.
Similarly, you can define logic (code) that should be executed when the application is shutting down. This is like a cool-down for your app, making sure everything is properly closed and cleaned up.
By executing code before consuming and after producing, you cover the entire lifecycle of your application.
This is super handy for setting up shared resources that are needed across consumers and producers, like a database connection pool or a machine learning model. And the best part? You can clean up these resources when the app is shutting down!
So let's give it a try and see how it can make your Kafka app even more awesome!
Lifespan example - Iris prediction model
Let's dive into an example to see how you can leverage the lifecycle handler to solve a common use case. Imagine that you have some machine learning models that need to consume incoming messages and produce response/prediction messages. These models are shared among consumers and producers, which means you don't want to load them for every message.
Here's where the lifecycle handler comes to the rescue! By loading the model before the messages are consumed and produced, but only right before the application starts receiving messages, you can ensure that the model is ready to use without compromising the performance of your tests. In the upcoming sections, we'll walk you through how to initialize an Iris species prediction model and use it in your application.
Lifespan
You can define this startup and shutdown logic using the lifespan parameter of the FastKafka app and an async context manager.
Let's start with an example and then see it in detail.
We create an async function lifespan() with yield like this:
from sklearn.datasets import load_iris
from sklearn.linear_model import LogisticRegression
from contextlib import asynccontextmanager

from fastkafka import FastKafka

ml_models = {}


@asynccontextmanager
async def lifespan(app: FastKafka):
    # Load the ML model
    print("Loading the model!")
    X, y = load_iris(return_X_y=True)
    ml_models["iris_predictor"] = LogisticRegression(random_state=0, max_iter=500).fit(X, y)
    yield
    # Clean up the ML models and release the resources
    print("Exiting, clearing model dict!")
    ml_models.clear()
The first thing to notice is that we are defining an async function with yield. This is very similar to FastAPI's Dependencies with yield.
The first part of the function, before the yield, will be executed before the application starts. The part after the yield will be executed after the application has finished.
This lifespan creates an Iris prediction model (stored in ml_models["iris_predictor"]) on application startup and clears the references after the app shuts down.
On startup, the lifespan is passed a reference to the FastKafka app, which you can use to access your application while it starts.
For demonstration's sake, we also added prints so that we can see our lifespan being called when the app runs.
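To see the ordering without Kafka or scikit-learn, here is a minimal sketch that swaps the Iris model for a hypothetical stand-in (fake_model, a plain function) and records each phase in an events list. Unlike the example above, it wraps the cleanup in try/finally. With asynccontextmanager, an exception raised while the app runs is re-raised at the yield, so cleanup placed after a bare yield would be skipped.

```python
import asyncio
from contextlib import asynccontextmanager

ml_models = {}
events = []


def fake_model(features):
    # Hypothetical stand-in for a trained model: predicts class 0 for every row
    return [0 for _ in features]


@asynccontextmanager
async def lifespan(app):
    # Startup: runs once, before the app starts handling messages
    events.append("startup")
    ml_models["iris_predictor"] = fake_model
    try:
        yield
    finally:
        # Shutdown: runs once, even if the app stops because of an error
        events.append("shutdown")
        ml_models.clear()


async def main():
    async with lifespan(None):
        # Inside the block the "model" is loaded and usable
        events.append("running")
        assert ml_models["iris_predictor"]([[5.1, 3.5, 1.4, 0.2]]) == [0]


async def failing_main():
    async with lifespan(None):
        raise RuntimeError("simulated crash while running")


asyncio.run(main())
print(events)  # ['startup', 'running', 'shutdown']

events.clear()
try:
    asyncio.run(failing_main())
except RuntimeError:
    pass  # the error still propagates, but cleanup has already run
print(events)  # ['startup', 'shutdown']
print(ml_models)  # {}
```

Whether to add try/finally is a design choice. Without it, the lifespan above only cleans up on a normal shutdown.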
Async context manager
Context managers can be used in with blocks, and async context managers such as our lifespan are used in async with blocks. Our lifespan, for example, could be used like this:
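import asyncio
async def main():
    # Entering the block runs the startup code; leaving it runs the cleanup
    async with lifespan(None):
        print(ml_models)
asyncio.run(main())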
When you create a context manager or an async context manager, it executes the code before the yield when entering the with block, and the code after the yield when exiting the with block.
If you want to learn more about context managers and contextlib decorators, please see the official Python documentation.
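Conceptually, @asynccontextmanager turns the generator into an object with __aenter__ and __aexit__ methods. A hand-written equivalent of a simplified lifespan might look like the sketch below. The class name LifespanManager and the resources dict are illustrative only and are not part of FastKafka or contextlib.

```python
import asyncio

resources = {}


class LifespanManager:
    """Hand-written async context manager, similar in spirit to a
    generator decorated with @asynccontextmanager (illustrative only)."""

    def __init__(self, app):
        self.app = app

    async def __aenter__(self):
        # Plays the role of the code before `yield`
        resources["model"] = "loaded"
        return None  # value bound by `async with ... as x`

    async def __aexit__(self, exc_type, exc, tb):
        # Plays the role of the code after `yield`
        resources.clear()
        return False  # do not suppress exceptions


async def main():
    async with LifespanManager(app=None):
        snapshot = dict(resources)
    return snapshot


inside = asyncio.run(main())
print(inside)  # {'model': 'loaded'}
print(resources)  # {}
```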
App demo
FastKafka app
Let's now create our application using the lifespan handler we defined.
Notice how we pass our lifespan handler to the app when constructing it, through the lifespan argument.
from fastkafka import FastKafka

kafka_brokers = {
    "localhost": {
        "url": "<url_of_your_kafka_bootstrap_server>",
        "description": "local development kafka broker",
        "port": "<port_of_your_kafka_bootstrap_server>",
    },
}

kafka_app = FastKafka(
    title="Iris predictions",
    kafka_brokers=kafka_brokers,
    lifespan=lifespan,
)
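FastKafka enters and exits the lifespan for you. As a rough mental model only, the hypothetical MiniApp class below shows how a framework can hold a lifespan handler, enter it on startup (passing the app itself) and exit it on shutdown using contextlib.AsyncExitStack. It is not FastKafka's actual implementation, and its startup/shutdown methods are invented for illustration.

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager

log = []


@asynccontextmanager
async def lifespan(app):
    log.append(f"startup {app.title}")
    yield
    log.append(f"shutdown {app.title}")


class MiniApp:
    """Hypothetical app object: NOT FastKafka's real implementation."""

    def __init__(self, title, lifespan=None):
        self.title = title
        self._lifespan = lifespan
        self._stack = AsyncExitStack()

    async def startup(self):
        if self._lifespan is not None:
            # Runs the lifespan code before `yield`, passing the app itself
            await self._stack.enter_async_context(self._lifespan(self))
        log.append("consumers and producers started")

    async def shutdown(self):
        log.append("consumers and producers stopped")
        # Runs the lifespan code after `yield`
        await self._stack.aclose()


async def main():
    app = MiniApp(title="Iris predictions", lifespan=lifespan)
    await app.startup()
    await app.shutdown()


asyncio.run(main())
print(log)
```

The startup hook runs before any consumer starts and the cleanup runs after all of them stop. This matches the ordering the lifespan handler relies on.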
Data modeling
Let's model the Iris data for our app:
from pydantic import BaseModel, Field, NonNegativeFloat


class IrisInputData(BaseModel):
    sepal_length: NonNegativeFloat = Field(
        ..., example=0.5, description="Sepal length in cm"
    )
    sepal_width: NonNegativeFloat = Field(
        ..., example=0.5, description="Sepal width in cm"
    )
    petal_length: NonNegativeFloat = Field(
        ..., example=0.5, description="Petal length in cm"
    )
    petal_width: NonNegativeFloat = Field(
        ..., example=0.5, description="Petal width in cm"
    )


class IrisPrediction(BaseModel):
    species: str = Field(..., example="setosa", description="Predicted species")
Consumers and producers
Let's create a consumer and a producer for our app that will generate predictions from the input Iris data.
@kafka_app.consumes(topic="input_data", auto_offset_reset="latest")
async def on_input_data(msg: IrisInputData):
    # Classify the input with the model loaded by the lifespan handler
    species_class = ml_models["iris_predictor"].predict(
        [[msg.sepal_length, msg.sepal_width, msg.petal_length, msg.petal_width]]
    )[0]
    await to_predictions(species_class)


@kafka_app.produces(topic="predictions")
async def to_predictions(species_class: int) -> IrisPrediction:
    # Map the predicted class index to its species name; the returned
    # message is sent to the "predictions" topic
    iris_species = ["setosa", "versicolor", "virginica"]
    prediction = IrisPrediction(species=iris_species[species_class])
    return prediction
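The consume, predict, produce flow can be checked without a broker. The sketch below replaces the scikit-learn model with a hypothetical StubPredictor, uses plain dicts instead of IrisInputData, and collects "produced" messages in a list instead of sending them to Kafka. Only the index-to-species mapping comes from the code above.

```python
import asyncio

iris_species = ["setosa", "versicolor", "virginica"]
ml_models = {}
produced = []  # stands in for the "predictions" topic


class StubPredictor:
    """Hypothetical model that classifies by petal length only."""

    def predict(self, rows):
        # One class index per input row, like scikit-learn's predict
        return [0 if row[2] < 2.5 else 2 for row in rows]


async def to_predictions(species_class):
    # Map the class index to a species name and "produce" the message
    produced.append({"species": iris_species[species_class]})


async def on_input_data(msg):
    features = [[msg["sepal_length"], msg["sepal_width"],
                 msg["petal_length"], msg["petal_width"]]]
    species_class = ml_models["iris_predictor"].predict(features)[0]
    await to_predictions(species_class)


async def main():
    # In the real app, the lifespan handler loads the model
    ml_models["iris_predictor"] = StubPredictor()
    await on_input_data({"sepal_length": 5.1, "sepal_width": 3.5,
                         "petal_length": 1.4, "petal_width": 0.2})
    await on_input_data({"sepal_length": 6.3, "sepal_width": 3.3,
                         "petal_length": 6.0, "petal_width": 2.5})


asyncio.run(main())
print(produced)  # [{'species': 'setosa'}, {'species': 'virginica'}]
```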
Final app
The final app looks like this:
from sklearn.datasets import load_iris
from sklearn.linear_model import LogisticRegression
from contextlib import asynccontextmanager

from pydantic import BaseModel, Field, NonNegativeFloat

from fastkafka import FastKafka


class IrisInputData(BaseModel):
    sepal_length: NonNegativeFloat = Field(
        ..., example=0.5, description="Sepal length in cm"
    )
    sepal_width: NonNegativeFloat = Field(
        ..., example=0.5, description="Sepal width in cm"
    )
    petal_length: NonNegativeFloat = Field(
        ..., example=0.5, description="Petal length in cm"
    )
    petal_width: NonNegativeFloat = Field(
        ..., example=0.5, description="Petal width in cm"
    )


class IrisPrediction(BaseModel):
    species: str = Field(..., example="setosa", description="Predicted species")


ml_models = {}


@asynccontextmanager
async def lifespan(app: FastKafka):
    # Load the ML model
    print("Loading the model!")
    X, y = load_iris(return_X_y=True)
    ml_models["iris_predictor"] = LogisticRegression(random_state=0, max_iter=500).fit(X, y)
    yield
    # Clean up the ML models and release the resources
    print("Exiting, clearing model dict!")
    ml_models.clear()


kafka_brokers = {
    "localhost": {
        "url": "<url_of_your_kafka_bootstrap_server>",
        "description": "local development kafka broker",
        "port": "<port_of_your_kafka_bootstrap_server>",
    },
}

kafka_app = FastKafka(
    title="Iris predictions",
    kafka_brokers=kafka_brokers,
    lifespan=lifespan,
)


@kafka_app.consumes(topic="input_data", auto_offset_reset="latest")
async def on_input_data(msg: IrisInputData):
    species_class = ml_models["iris_predictor"].predict(
        [[msg.sepal_length, msg.sepal_width, msg.petal_length, msg.petal_width]]
    )[0]
    await to_predictions(species_class)


@kafka_app.produces(topic="predictions")
async def to_predictions(species_class: int) -> IrisPrediction:
    iris_species = ["setosa", "versicolor", "virginica"]
    prediction = IrisPrediction(species=iris_species[species_class])
    return prediction
Running the app
Now we can run the app with our custom lifespan handler. Save the code above as lifespan_example.py and run it with:
fastkafka run --num-workers=1 --kafka-broker=localhost lifespan_example:kafka_app
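When you run the app, the output should include the "Loading the model!" message printed by the lifespan on startup and, once the app shuts down, the "Exiting, clearing model dict!" message.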
Recap
In this guide we defined a lifespan handler and passed it to our FastKafka app.
Some important points are:
- The lifespan handler is implemented as an async context manager
- Code before the yield in lifespan will be executed before application startup
- Code after the yield in lifespan will be executed after application shutdown
- You can pass your lifespan handler to the FastKafka app on initialisation through the lifespan argument