FastKafka tutorial
FastKafka is a powerful and easy-to-use Python library for building asynchronous services that interact with Kafka topics. Built on top of Pydantic, AIOKafka and AsyncAPI, FastKafka simplifies the process of writing producers and consumers for Kafka topics, handling all the parsing, networking, task scheduling and data generation automatically. With FastKafka, you can quickly prototype and develop high-performance Kafka-based services with minimal code, making it an ideal choice for developers looking to streamline their workflow and accelerate their projects.
Install​
FastKafka works on macOS, Linux, and most Unix-style operating systems.
You can install it with pip
as usual:
pip install fastkafka
try:
import fastkafka
except:
! pip install fastkafka
Running in Colab​
You can start this interactive tutorial in Google Colab by clicking the button below:
Writing server code​
Here is an example python script using FastKafka that takes data from a Kafka topic, makes a prediction using a predictive model, and outputs the prediction to another Kafka topic.
Preparing the demo model​
First we will prepare our model using the Iris dataset so that we can demonstrate the preditions using FastKafka. The following call downloads the dataset and trains the model.
We will wrap the model creation into a lifespan of our app so that the model is created just before the app is started.
from contextlib import asynccontextmanager
from sklearn.datasets import load_iris
from sklearn.linear_model import LogisticRegression
from fastkafka import FastKafka
ml_models = {}
@asynccontextmanager
async def lifespan(app: FastKafka):
# Load the ML model
X, y = load_iris(return_X_y=True)
ml_models["iris_predictor"] = LogisticRegression(random_state=0, max_iter=500).fit(
X, y
)
yield
# Clean up the ML models and release the resources
ml_models.clear()
Messages​
FastKafka uses Pydantic to parse input
JSON-encoded data into Python objects, making it easy to work with
structured data in your Kafka-based applications. Pydantic’s
BaseModel
class allows you
to define messages using a declarative syntax, making it easy to specify
the fields and types of your messages.
This example defines two message classes for use in a FastKafka application:
The
IrisInputData
class is used to represent input data for a predictive model. It has four fields of typeNonNegativeFloat
, which is a subclass of float that only allows non-negative floating point values.The
IrisPrediction
class is used to represent the output of the predictive model. It has a single fieldspecies
of type string representing the predicted species.
These message classes will be used to parse and validate incoming data in Kafka consumers and producers.
from pydantic import BaseModel, Field, NonNegativeFloat
class IrisInputData(BaseModel):
sepal_length: NonNegativeFloat = Field(
..., example=0.5, description="Sepal length in cm"
)
sepal_width: NonNegativeFloat = Field(
..., example=0.5, description="Sepal width in cm"
)
petal_length: NonNegativeFloat = Field(
..., example=0.5, description="Petal length in cm"
)
petal_width: NonNegativeFloat = Field(
..., example=0.5, description="Petal width in cm"
)
class IrisPrediction(BaseModel):
species: str = Field(..., example="setosa", description="Predicted species")
Application​
This example shows how to initialize a FastKafka application.
It starts by defining a dictionary called kafka_brokers
, which
contains two entries: "localhost"
and "production"
, specifying local
development and production Kafka brokers. Each entry specifies the URL,
port, and other details of a Kafka broker. This dictionary is used for
generating the documentation only and it is not being checked by the
actual server.
Next, an object of the FastKafka
class is initialized with the minimum
set of arguments:
kafka_brokers
: a dictionary used for generation of documentation
from fastkafka import FastKafka
kafka_brokers = {
"localhost": {
"url": "localhost",
"description": "local development kafka broker",
"port": 9092,
},
"production": {
"url": "kafka.airt.ai",
"description": "production kafka broker",
"port": 9092,
"protocol": "kafka-secure",
"security": {"type": "plain"},
},
}
kafka_app = FastKafka(
title="Iris predictions",
kafka_brokers=kafka_brokers,
lifespan=lifespan,
)
Function decorators​
FastKafka provides convenient function decorators @kafka_app.consumes
and @kafka_app.produces
to allow you to delegate the actual process of
consuming and producing data to Kafka, and
decoding and encoding JSON encode messages
from user defined functions to the framework. The FastKafka framework delegates these jobs to AIOKafka and Pydantic libraries.
These decorators make it easy to specify the processing logic for your Kafka consumers and producers, allowing you to focus on the core business logic of your application without worrying about the underlying Kafka integration.
This following example shows how to use the @kafka_app.consumes
and
@kafka_app.produces
decorators in a FastKafka application:
The
@kafka_app.consumes
decorator is applied to theon_input_data
function, which specifies that this function should be called whenever a message is received on the “input_data" Kafka topic. Theon_input_data
function takes a single argument which is expected to be an instance of theIrisInputData
message class. Specifying the type of the single argument is instructing the Pydantic to useIrisInputData.parse_raw()
on the consumed message before passing it to the user defined functionon_input_data
.The
@produces
decorator is applied to theto_predictions
function, which specifies that this function should produce a message to the “predictions" Kafka topic whenever it is called. Theto_predictions
function takes a single integer argumentspecies_class
representing one of three possible strign values predicted by the mdoel. It creates a newIrisPrediction
message using this value and then returns it. The framework will call theIrisPrediction.json().encode("utf-8")
function on the returned value and produce it to the specified topic.
@kafka_app.consumes(topic="input_data", auto_offset_reset="latest")
async def on_input_data(msg: IrisInputData):
species_class = ml_models["iris_predictor"].predict(
[[msg.sepal_length, msg.sepal_width, msg.petal_length, msg.petal_width]]
)[0]
await to_predictions(species_class)
@kafka_app.produces(topic="predictions")
async def to_predictions(species_class: int) -> IrisPrediction:
iris_species = ["setosa", "versicolor", "virginica"]
prediction = IrisPrediction(species=iris_species[species_class])
return prediction
Testing the service​
The service can be tested using the Tester
instances which internally
starts Kafka broker and zookeeper.
Before running tests, we have to install Java runtime and Apache Kafka locally. To simplify the process, we provide the following convenience command:
fastkafka testing install_deps
[INFO] fastkafka._components.test_dependencies: Java is already installed.
[INFO] fastkafka._components.test_dependencies: But not exported to PATH, exporting...
[INFO] fastkafka._components.test_dependencies: Kafka is installed.
[INFO] fastkafka._components.test_dependencies: But not exported to PATH, exporting...
[INFO] fastkafka._components.test_dependencies: Java is already installed.
[INFO] fastkafka._components.test_dependencies: But not exported to PATH, exporting...
[INFO] fastkafka._components.test_dependencies: Kafka is installed.
[INFO] fastkafka._components.test_dependencies: But not exported to PATH, exporting...
from fastkafka.testing import Tester
msg = IrisInputData(
sepal_length=0.1,
sepal_width=0.2,
petal_length=0.3,
petal_width=0.4,
)
# Start Tester app and create local Kafka broker for testing
async with Tester(kafka_app) as tester:
# Send IrisInputData message to input_data topic
await tester.to_input_data(msg)
# Assert that the kafka_app responded with IrisPrediction in predictions topic
await tester.awaited_mocks.on_predictions.assert_awaited_with(
IrisPrediction(species="setosa"), timeout=2
)
[INFO] fastkafka._testing.in_memory_broker: InMemoryBroker._start() called
[INFO] fastkafka._testing.in_memory_broker: InMemoryBroker._patch_consumers_and_producers(): Patching consumers and producers!
[INFO] fastkafka._testing.in_memory_broker: InMemoryBroker starting
[INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': 'localhost:9092'}'
[INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched start() called()
[INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': 'localhost:9092'}'
[INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched start() called()
[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...
[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'bootstrap_servers': 'localhost:9092', 'auto_offset_reset': 'latest', 'max_poll_records': 100}
[INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched start() called()
[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.
[INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched subscribe() called
[INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer.subscribe(), subscribing to: ['input_data']
[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.
[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...
[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'bootstrap_servers': 'localhost:9092', 'auto_offset_reset': 'earliest', 'max_poll_records': 100}
[INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched start() called()
[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.
[INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched subscribe() called
[INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer.subscribe(), subscribing to: ['predictions']
[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.
[INFO] fastkafka._components.aiokafka_consumer_loop: _aiokafka_consumer_loop(): Consumer loop shutting down, waiting for send_stream to drain...
[INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched stop() called
[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer stopped.
[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.
[INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched stop() called
[INFO] fastkafka._components.aiokafka_consumer_loop: _aiokafka_consumer_loop(): Consumer loop shutting down, waiting for send_stream to drain...
[INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched stop() called
[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer stopped.
[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.
[INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched stop() called
[INFO] fastkafka._testing.in_memory_broker: InMemoryBroker._stop() called
[INFO] fastkafka._testing.in_memory_broker: InMemoryBroker stopping
Recap​
We have created a Iris classification model and encapulated it into our
fastkafka application. The app will consume the IrisInputData from the
input_data
topic and produce the predictions to predictions
topic.
To test the app we have:
Created the app
Started our Tester class which mirrors the developed app topics for testing purpuoses
Sent IrisInputData message to
input_data
topicAsserted and checked that the developed iris classification service has reacted to IrisInputData message
Running the service​
The service can be started using builtin faskafka run
CLI command.
Before we can do that, we will concatenate the code snippets from above
and save them in a file "application.py"
# content of the "application.py" file
from contextlib import asynccontextmanager
from sklearn.datasets import load_iris
from sklearn.linear_model import LogisticRegression
from fastkafka import FastKafka
ml_models = {}
@asynccontextmanager
async def lifespan(app: FastKafka):
# Load the ML model
X, y = load_iris(return_X_y=True)
ml_models["iris_predictor"] = LogisticRegression(random_state=0, max_iter=500).fit(
X, y
)
yield
# Clean up the ML models and release the resources
ml_models.clear()
from pydantic import BaseModel, NonNegativeFloat, Field
class IrisInputData(BaseModel):
sepal_length: NonNegativeFloat = Field(
..., example=0.5, description="Sepal length in cm"
)
sepal_width: NonNegativeFloat = Field(
..., example=0.5, description="Sepal width in cm"
)
petal_length: NonNegativeFloat = Field(
..., example=0.5, description="Petal length in cm"
)
petal_width: NonNegativeFloat = Field(
..., example=0.5, description="Petal width in cm"
)
class IrisPrediction(BaseModel):
species: str = Field(..., example="setosa", description="Predicted species")
from fastkafka import FastKafka
kafka_brokers = {
"localhost": {
"url": "localhost",
"description": "local development kafka broker",
"port": 9092,
},
"production": {
"url": "kafka.airt.ai",
"description": "production kafka broker",
"port": 9092,
"protocol": "kafka-secure",
"security": {"type": "plain"},
},
}
kafka_app = FastKafka(
title="Iris predictions",
kafka_brokers=kafka_brokers,
lifespan=lifespan,
)
@kafka_app.consumes(topic="input_data", auto_offset_reset="latest")
async def on_input_data(msg: IrisInputData):
species_class = ml_models["iris_predictor"].predict(
[[msg.sepal_length, msg.sepal_width, msg.petal_length, msg.petal_width]]
)[0]
to_predictions(species_class)
@kafka_app.produces(topic="predictions")
def to_predictions(species_class: int) -> IrisPrediction:
iris_species = ["setosa", "versicolor", "virginica"]
prediction = IrisPrediction(species=iris_species[species_class])
return prediction
To run the service, you will need a running Kafka broker on localhost as
specified in the kafka_brokers
parameter above. We can start the Kafka
broker locally using the ApacheKafkaBroker
. Notice that the same
happens automatically in the Tester
as shown above.
[INFO] fastkafka._testing.apache_kafka_broker: ApacheKafkaBroker.start(): entering...
[WARNING] fastkafka._testing.apache_kafka_broker: ApacheKafkaBroker.start(): (<_UnixSelectorEventLoop running=True closed=False debug=False>) is already running!
[WARNING] fastkafka._testing.apache_kafka_broker: ApacheKafkaBroker.start(): calling nest_asyncio.apply()
[INFO] fastkafka._components.test_dependencies: Java is already installed.
[INFO] fastkafka._components.test_dependencies: But not exported to PATH, exporting...
[INFO] fastkafka._components.test_dependencies: Kafka is installed.
[INFO] fastkafka._components.test_dependencies: But not exported to PATH, exporting...
[INFO] fastkafka._testing.apache_kafka_broker: Starting zookeeper...
[INFO] fastkafka._testing.apache_kafka_broker: Starting kafka...
[INFO] fastkafka._testing.apache_kafka_broker: Local Kafka broker up and running on 127.0.0.1:9092
[INFO] fastkafka._testing.apache_kafka_broker: <class 'fastkafka.testing.ApacheKafkaBroker'>.start(): returning 127.0.0.1:9092
[INFO] fastkafka._testing.apache_kafka_broker: ApacheKafkaBroker.start(): exited.
'127.0.0.1:9092'
Then, we start the FastKafka service by running the following command in
the folder where the application.py
file is located:
fastkafka run --num-workers=2 --kafka-broker localhost application:kafka_app
In the above command, we use --num-workers
option to specify how many
workers to launch and we use --kafka-broker
option to specify which
kafka broker configuration to use from earlier specified kafka_brokers
[1200656]: [INFO] fastkafka._application.app: set_kafka_broker() : Setting bootstrap_servers value to 'localhost:9092'
[1200656]: [INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': 'localhost:9092'}'
[1200656]: [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...
[1200656]: [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'bootstrap_servers': 'localhost:9092', 'auto_offset_reset': 'latest', 'max_poll_records': 100}
[1200656]: [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.
[1200656]: [INFO] aiokafka.consumer.subscription_state: Updating subscribed topics to: frozenset({'input_data'})
[1200656]: [INFO] aiokafka.consumer.consumer: Subscribed to topic(s): {'input_data'}
[1200656]: [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.
[1200654]: [INFO] fastkafka._application.app: set_kafka_broker() : Setting bootstrap_servers value to 'localhost:9092'
[1200654]: [INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': 'localhost:9092'}'
[1200656]: [WARNING] aiokafka.cluster: Topic input_data is not available during auto-create initialization
[1200656]: [INFO] aiokafka.consumer.group_coordinator: Metadata for topic has changed from {} to {'input_data': 0}.
[1200654]: [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...
[1200654]: [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'bootstrap_servers': 'localhost:9092', 'auto_offset_reset': 'latest', 'max_poll_records': 100}
[1200654]: [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.
[1200654]: [INFO] aiokafka.consumer.subscription_state: Updating subscribed topics to: frozenset({'input_data'})
[1200654]: [INFO] aiokafka.consumer.consumer: Subscribed to topic(s): {'input_data'}
[1200654]: [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.
[1200654]: [WARNING] aiokafka.cluster: Topic input_data is not available during auto-create initialization
[1200654]: [INFO] aiokafka.consumer.group_coordinator: Metadata for topic has changed from {} to {'input_data': 0}.
[1200654]: [ERROR] aiokafka: Unable connect to node with id 0: [Errno 111] Connect call failed ('192.168.112.2', 9092)
[1200654]: [ERROR] aiokafka: Unable to update metadata from [0]
[1200656]: [ERROR] aiokafka: Unable connect to node with id 0: [Errno 111] Connect call failed ('192.168.112.2', 9092)
[1200656]: [ERROR] aiokafka: Unable to update metadata from [0]
^C
[1200656]: [INFO] fastkafka._components.aiokafka_consumer_loop: _aiokafka_consumer_loop(): Consumer loop shutting down, waiting for send_stream to drain...
[1200656]: [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer stopped.
[1200656]: [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.
[1200654]: [INFO] fastkafka._components.aiokafka_consumer_loop: _aiokafka_consumer_loop(): Consumer loop shutting down, waiting for send_stream to drain...
[1200654]: [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer stopped.
[1200654]: [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.
Starting process cleanup, this may take a few seconds...
[INFO] fastkafka._server: terminate_asyncio_process(): Terminating the process 1200654...
[INFO] fastkafka._server: terminate_asyncio_process(): Terminating the process 1200656...
You need to interupt running of the cell above by selecting
Runtime->Interupt execution
on the toolbar above.
Finally, we can stop the local Kafka Broker:
[INFO] fastkafka._testing.apache_kafka_broker: ApacheKafkaBroker.stop(): entering...
[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 1200193...
[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 1200193 was already terminated.
[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 1199820...
[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 1199820 was already terminated.
[INFO] fastkafka._testing.apache_kafka_broker: ApacheKafkaBroker.stop(): exited.
Documentation​
The kafka app comes with builtin documentation generation using AsyncApi HTML generator.
When running in Colab, we need to update Node.js first:
We need to install all dependancies for the generator using the following command line:
fastkafka docs install_deps
[INFO] fastkafka._components.docs_dependencies: AsyncAPI generator installed
To generate the documentation programatically you just need to call the folloving command:
fastkafka docs generate application:kafka_app
[INFO] fastkafka._components.asyncapi: New async specifications generated at: '/work/fastkafka/nbs/guides/asyncapi/spec/asyncapi.yml'
[INFO] fastkafka._components.asyncapi: Async docs generated at 'asyncapi/docs'
[INFO] fastkafka._components.asyncapi: Output of '$ npx -y -p @asyncapi/generator ag asyncapi/spec/asyncapi.yml @asyncapi/html-template -o asyncapi/docs --force-write'
Done! ✨
Check out your shiny new generated files at /work/fastkafka/nbs/guides/asyncapi/docs.
. This will generate the asyncapi folder in relative path where all your documentation will be saved. You can check out the content of it with:
ls -l asyncapi
total 8
drwxrwxr-x 4 kumaran kumaran 4096 Mar 21 10:09 docs
drwxrwxr-x 2 kumaran kumaran 4096 Mar 21 10:09 spec
In docs folder you will find the servable static html file of your
documentation. This can also be served using our fastkafka docs serve
CLI command (more on that in our guides).
In spec folder you will find a asyncapi.yml file containing the async API specification of your application.
We can locally preview the generated documentation by running the following command:
fastkafka docs serve application:kafka_app
[INFO] fastkafka._components.asyncapi: New async specifications generated at: '/work/fastkafka/nbs/guides/asyncapi/spec/asyncapi.yml'
[INFO] fastkafka._components.asyncapi: Async docs generated at 'asyncapi/docs'
[INFO] fastkafka._components.asyncapi: Output of '$ npx -y -p @asyncapi/generator ag asyncapi/spec/asyncapi.yml @asyncapi/html-template -o asyncapi/docs --force-write'
Done! ✨
Check out your shiny new generated files at /work/fastkafka/nbs/guides/asyncapi/docs.
Serving documentation on http://127.0.0.1:8000
^C
Interupting serving of documentation and cleaning up...
From the parameters passed to the application constructor, we get the documentation bellow:
from fastkafka import FastKafka
kafka_brokers = {
"localhost": {
"url": "localhost",
"description": "local development kafka broker",
"port": 9092,
},
"production": {
"url": "kafka.airt.ai",
"description": "production kafka broker",
"port": 9092,
"protocol": "kafka-secure",
"security": {"type": "plain"},
},
}
kafka_app = FastKafka(
title="Iris predictions",
kafka_brokers=kafka_brokers,
bootstrap_servers="localhost:9092",
)
The following documentation snippet are for the consumer as specified in the code above:
The following documentation snippet are for the producer as specified in the code above:
Finally, all messages as defined as subclasses of BaseModel are documented as well: