# Benchmarking FastKafka app
## Prerequisites
To benchmark a FastKafka project, you will need the following:

- An application built with FastKafka.
- A running Kafka instance to benchmark the FastKafka application against.
## Creating FastKafka Code
Let’s create a FastKafka-based application, following the tutorial, and write it to the `application.py` file.
```python
# content of the "application.py" file

from contextlib import asynccontextmanager

from sklearn.datasets import load_iris
from sklearn.linear_model import LogisticRegression

from fastkafka import FastKafka

ml_models = {}


@asynccontextmanager
async def lifespan(app: FastKafka):
    # Load the ML model
    X, y = load_iris(return_X_y=True)
    ml_models["iris_predictor"] = LogisticRegression(random_state=0, max_iter=500).fit(
        X, y
    )
    yield
    # Clean up the ML models and release the resources
    ml_models.clear()


from pydantic import BaseModel, NonNegativeFloat, Field


class IrisInputData(BaseModel):
    sepal_length: NonNegativeFloat = Field(
        ..., example=0.5, description="Sepal length in cm"
    )
    sepal_width: NonNegativeFloat = Field(
        ..., example=0.5, description="Sepal width in cm"
    )
    petal_length: NonNegativeFloat = Field(
        ..., example=0.5, description="Petal length in cm"
    )
    petal_width: NonNegativeFloat = Field(
        ..., example=0.5, description="Petal width in cm"
    )


class IrisPrediction(BaseModel):
    species: str = Field(..., example="setosa", description="Predicted species")


kafka_brokers = {
    "localhost": {
        "url": "localhost",
        "description": "local development kafka broker",
        "port": 9092,
    },
    "production": {
        "url": "kafka.airt.ai",
        "description": "production kafka broker",
        "port": 9092,
        "protocol": "kafka-secure",
        "security": {"type": "plain"},
    },
}

kafka_app = FastKafka(
    title="Iris predictions",
    kafka_brokers=kafka_brokers,
    lifespan=lifespan,
)


@kafka_app.consumes(topic="input_data", auto_offset_reset="latest")
async def on_input_data(msg: IrisInputData):
    species_class = ml_models["iris_predictor"].predict(
        [[msg.sepal_length, msg.sepal_width, msg.petal_length, msg.petal_width]]
    )[0]
    await to_predictions(species_class)


@kafka_app.produces(topic="predictions")
async def to_predictions(species_class: int) -> IrisPrediction:
    iris_species = ["setosa", "versicolor", "virginica"]
    prediction = IrisPrediction(species=iris_species[species_class])
    return prediction
```
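Before wiring this up to Kafka, it can help to sanity-check the prediction logic in isolation. Below is a minimal sketch (plain scikit-learn, no FastKafka or Kafka involved) that mirrors the lifespan setup and the consumer’s prediction step; the sample values are illustrative only:

```python
# Minimal local sanity check of the prediction logic (no Kafka involved).
from sklearn.datasets import load_iris
from sklearn.linear_model import LogisticRegression

X, y = load_iris(return_X_y=True)
model = LogisticRegression(random_state=0, max_iter=500).fit(X, y)

iris_species = ["setosa", "versicolor", "virginica"]
# Same feature order as IrisInputData:
# sepal_length, sepal_width, petal_length, petal_width
features = [[0.5, 0.5, 0.5, 0.5]]
print(iris_species[model.predict(features)[0]])  # prints one of the three species
```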
FastKafka has a decorator for benchmarking, appropriately named `benchmark`. Let’s edit our `application.py` file and add the `benchmark` decorator to the consumes method.
```python
# content of the "application.py" file with benchmark

from contextlib import asynccontextmanager

from sklearn.datasets import load_iris
from sklearn.linear_model import LogisticRegression

from fastkafka import FastKafka

ml_models = {}


@asynccontextmanager
async def lifespan(app: FastKafka):
    # Load the ML model
    X, y = load_iris(return_X_y=True)
    ml_models["iris_predictor"] = LogisticRegression(random_state=0, max_iter=500).fit(
        X, y
    )
    yield
    # Clean up the ML models and release the resources
    ml_models.clear()


from pydantic import BaseModel, NonNegativeFloat, Field


class IrisInputData(BaseModel):
    sepal_length: NonNegativeFloat = Field(
        ..., example=0.5, description="Sepal length in cm"
    )
    sepal_width: NonNegativeFloat = Field(
        ..., example=0.5, description="Sepal width in cm"
    )
    petal_length: NonNegativeFloat = Field(
        ..., example=0.5, description="Petal length in cm"
    )
    petal_width: NonNegativeFloat = Field(
        ..., example=0.5, description="Petal width in cm"
    )


class IrisPrediction(BaseModel):
    species: str = Field(..., example="setosa", description="Predicted species")


kafka_brokers = {
    "localhost": {
        "url": "localhost",
        "description": "local development kafka broker",
        "port": 9092,
    },
    "production": {
        "url": "kafka.airt.ai",
        "description": "production kafka broker",
        "port": 9092,
        "protocol": "kafka-secure",
        "security": {"type": "plain"},
    },
}

kafka_app = FastKafka(
    title="Iris predictions",
    kafka_brokers=kafka_brokers,
    lifespan=lifespan,
)


@kafka_app.consumes(topic="input_data", auto_offset_reset="latest")
@kafka_app.benchmark(interval=1, sliding_window_size=5)
async def on_input_data(msg: IrisInputData):
    species_class = ml_models["iris_predictor"].predict(
        [[msg.sepal_length, msg.sepal_width, msg.petal_length, msg.petal_width]]
    )[0]
    await to_predictions(species_class)


@kafka_app.produces(topic="predictions")
async def to_predictions(species_class: int) -> IrisPrediction:
    iris_species = ["setosa", "versicolor", "virginica"]
    prediction = IrisPrediction(species=iris_species[species_class])
    return prediction
```
Here we are benchmarking the function that consumes data from the `input_data` topic, with an interval of 1 second and a sliding window size of 5.

The `benchmark` decorator uses the `interval` parameter to calculate the results over a specific time period, and the `sliding_window_size` parameter to determine the maximum number of results to use when calculating the average throughput and standard deviation.

Benchmarking like this is important both to ensure that the function is performing optimally and to identify any areas for improvement.
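To make the semantics of these two parameters concrete, here is an illustrative sketch of interval-based throughput with a sliding window. This is not FastKafka’s actual implementation, just the calculation it describes:

```python
# Sketch of per-interval throughput with a sliding window (illustrative only).
from collections import deque
from statistics import mean, stdev

window = deque(maxlen=5)  # sliding_window_size=5 keeps only the last 5 results


def record_interval(messages_processed: int) -> None:
    # Called once per interval (here: every 1 second) with the message count.
    window.append(messages_processed)
    avg = mean(window)
    sd = stdev(window) if len(window) > 1 else 0.0
    print(f"Throughput = {messages_processed:,}, "
          f"Avg throughput = {avg:,.0f}, Std = {sd:,.0f}")


# Feed in six one-second counts; the oldest drops out once the window is full.
for count in [93_598, 91_847, 92_948, 93_227, 93_553, 92_699]:
    record_interval(count)
```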
## Starting Kafka
If you already have Kafka running somewhere, you can skip this step.

Please keep in mind that your benchmarking results may be affected by bottlenecks such as the network, the number of CPU cores on the Kafka machine, or even the Kafka configuration itself.
### Installing Java and Kafka
We need a working Kafka instance to benchmark our FastKafka app, and to run Kafka we need Java. Thankfully, FastKafka comes with a CLI to install both Java and Kafka on our machine. So, let’s install Java and Kafka by executing the following command:
```bash
fastkafka testing install_deps
```
The above command will extract the Kafka scripts to `$HOME/.local/kafka_2.13-3.3.2` on your machine.
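If you want to confirm the extraction succeeded, here is a quick sketch (assuming the default extraction path from above):

```python
# Check that the Kafka scripts were extracted to the expected location.
from pathlib import Path

kafka_home = Path.home() / ".local" / "kafka_2.13-3.3.2"
if kafka_home.exists():
    print(sorted(p.name for p in (kafka_home / "bin").glob("*.sh"))[:5])
else:
    print(f"Kafka scripts not found at {kafka_home}")
```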
### Creating configuration for Zookeeper and Kafka
Now we need to start Zookeeper and Kafka separately, and to start them we need `zookeeper.properties` and `kafka.properties` files.

Let’s create a folder inside the folder where the Kafka scripts were extracted and change directory into it:
```bash
mkdir $HOME/.local/kafka_2.13-3.3.2/data_dir && cd $HOME/.local/kafka_2.13-3.3.2/data_dir
```
Let’s create a file called `zookeeper.properties` and write the following content to the file:
```properties
dataDir=$HOME/.local/kafka_2.13-3.3.2/data_dir/zookeeper
clientPort=2181
maxClientCnxns=0
```
Similarly, let’s create a file called `kafka.properties` and write the following content to the file:
```properties
broker.id=0
listeners=PLAINTEXT://:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.dirs=$HOME/.local/kafka_2.13-3.3.2/data_dir/kafka_logs
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
log.retention.bytes=1073741824
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=18000
```
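One caveat: Kafka and Zookeeper read these files literally and do not expand shell variables, so a literal `$HOME` in `dataDir` or `log.dirs` will not work; substitute the actual path when creating the files by hand. Alternatively, here is a small Python sketch that writes `zookeeper.properties` with the path already expanded (the same approach works for `kafka.properties`):

```python
# Write zookeeper.properties with the home directory expanded, since
# properties files are read literally and "$HOME" would not be resolved.
from pathlib import Path

data_dir = Path.home() / ".local" / "kafka_2.13-3.3.2" / "data_dir"
data_dir.mkdir(parents=True, exist_ok=True)

(data_dir / "zookeeper.properties").write_text(
    f"dataDir={data_dir / 'zookeeper'}\n"
    "clientPort=2181\n"
    "maxClientCnxns=0\n"
)
# kafka.properties can be generated the same way: substitute the expanded
# path into the log.dirs line and copy the remaining settings verbatim.
```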
### Starting Zookeeper and Kafka
We need two different terminals, one to run Zookeeper and another to run Kafka. Let’s open a new terminal and run the following commands to start Zookeeper:
```bash
export PATH=$PATH:$HOME/.jdk/jdk-11.0.18+10/bin
cd $HOME/.local/kafka_2.13-3.3.2/bin
./zookeeper-server-start.sh ../data_dir/zookeeper.properties
```
Once Zookeeper is up and running, open a new terminal and execute the following commands to start Kafka:
```bash
export PATH=$PATH:$HOME/.jdk/jdk-11.0.18+10/bin
cd $HOME/.local/kafka_2.13-3.3.2/bin
./kafka-server-start.sh ../data_dir/kafka.properties
```
Now we have both Zookeeper and Kafka up and running.
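Before moving on, you can verify that the broker is accepting connections. A minimal sketch that checks the default port configured in `kafka.properties`:

```python
# Check that the Kafka broker is listening on localhost:9092.
import socket

with socket.create_connection(("localhost", 9092), timeout=5):
    print("Kafka broker is reachable on localhost:9092")
```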
### Creating topics in Kafka
In a new terminal, please execute the following commands to create the necessary topics in Kafka:
```bash
export PATH=$PATH:$HOME/.jdk/jdk-11.0.18+10/bin
cd $HOME/.local/kafka_2.13-3.3.2/bin
./kafka-topics.sh --create --topic input_data --partitions 6 --bootstrap-server localhost:9092
./kafka-topics.sh --create --topic predictions --partitions 6 --bootstrap-server localhost:9092
```
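To double-check that the topics were created, here is a sketch using `aiokafka` (the Kafka client FastKafka builds on); it assumes the consumer’s `topics()` coroutine, which returns the topics known to the cluster:

```python
# List the topics known to the broker to confirm creation succeeded.
import asyncio

from aiokafka import AIOKafkaConsumer


async def list_topics() -> None:
    consumer = AIOKafkaConsumer(bootstrap_servers="localhost:9092")
    await consumer.start()
    try:
        # Should include 'input_data' and 'predictions'.
        print(await consumer.topics())
    finally:
        await consumer.stop()


asyncio.run(list_topics())
```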
### Populating topics with dummy data
To benchmark our FastKafka app, we need some data in the Kafka topics.

In the same terminal, let’s create some dummy data:
```bash
yes '{"sepal_length": 0.7739560486, "sepal_width": 0.8636615789, "petal_length": 0.6122663046, "petal_width": 0.1338914722}' | head -n 1000000 > /tmp/test_data
```
This command will create a file called `test_data` in the `/tmp` folder with one million rows of text. This will act as dummy data to populate the `input_data` topic.
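If you prefer Python to shell tools, here is a roughly equivalent sketch for generating the same file:

```python
# Generate one million identical JSON messages as dummy input data.
import json

message = json.dumps({
    "sepal_length": 0.7739560486,
    "sepal_width": 0.8636615789,
    "petal_length": 0.6122663046,
    "petal_width": 0.1338914722,
})
with open("/tmp/test_data", "w") as f:
    f.write((message + "\n") * 1_000_000)
```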
Let’s populate the created topic `input_data` with the dummy data we created above:
```bash
./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic input_data < /tmp/test_data
```
Now our topic `input_data` has one million records/messages in it. If you want more messages in the topic, you can simply execute the above command as many times as needed.
## Benchmarking FastKafka
Once Zookeeper and Kafka are ready, benchmarking the FastKafka app is as simple as running the `fastkafka run` command:
```bash
fastkafka run --num-workers 1 --kafka-broker localhost application:kafka_app
```
This command will start the FastKafka app and begin consuming messages from the Kafka instance we spun up earlier. It will also output all of the benchmark throughputs based on the `interval` and `sliding_window_size` values.

The output of the `fastkafka run` command is:
```
[385814]: 23-04-07 10:49:18.380 [INFO] application: Current group id is ZDGTBVWVBBDMZCW
[385814]: 23-04-07 10:49:18.382 [INFO] fastkafka._application.app: set_kafka_broker() : Setting bootstrap_servers value to 'localhost:9092'
[385814]: 23-04-07 10:49:18.382 [INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': 'localhost:9092'}'
[385814]: 23-04-07 10:49:18.387 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...
[385814]: 23-04-07 10:49:18.387 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'group_id': 'ZDGTBVWVBBDMZCW', 'auto_offset_reset': 'earliest', 'bootstrap_servers': 'localhost:9092', 'max_poll_records': 100}
[385814]: 23-04-07 10:49:18.390 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.
[385814]: 23-04-07 10:49:18.390 [INFO] aiokafka.consumer.subscription_state: Updating subscribed topics to: frozenset({'input_data'})
[385814]: 23-04-07 10:49:18.390 [INFO] aiokafka.consumer.consumer: Subscribed to topic(s): {'input_data'}
[385814]: 23-04-07 10:49:18.390 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.
[385814]: 23-04-07 10:49:18.395 [INFO] aiokafka.consumer.group_coordinator: Discovered coordinator 0 for group ZDGTBVWVBBDMZCW
[385814]: 23-04-07 10:49:18.395 [INFO] aiokafka.consumer.group_coordinator: Revoking previously assigned partitions set() for group ZDGTBVWVBBDMZCW
[385814]: 23-04-07 10:49:18.395 [INFO] aiokafka.consumer.group_coordinator: (Re-)joining group ZDGTBVWVBBDMZCW
[385814]: 23-04-07 10:49:21.396 [INFO] aiokafka.consumer.group_coordinator: Joined group 'ZDGTBVWVBBDMZCW' (generation 1) with member_id aiokafka-0.8.0-b1f06560-6983-4d5e-a9af-8084e0e652cc
[385814]: 23-04-07 10:49:21.396 [INFO] aiokafka.consumer.group_coordinator: Elected group leader -- performing partition assignments using roundrobin
[385814]: 23-04-07 10:49:21.397 [INFO] aiokafka.consumer.group_coordinator: Successfully synced group ZDGTBVWVBBDMZCW with generation 1
[385814]: 23-04-07 10:49:21.397 [INFO] aiokafka.consumer.group_coordinator: Setting newly assigned partitions {TopicPartition(topic='input_data', partition=0), TopicPartition(topic='input_data', partition=1), TopicPartition(topic='input_data', partition=2), TopicPartition(topic='input_data', partition=3)} for group ZDGTBVWVBBDMZCW
[385814]: 23-04-07 10:49:22.409 [INFO] fastkafka.benchmark: Throughput = 93,598, Avg throughput = 93,598 - For application.on_input_data(interval=1,sliding_window_size=5)
[385814]: 23-04-07 10:49:23.409 [INFO] fastkafka.benchmark: Throughput = 91,847, Avg throughput = 92,723 - For application.on_input_data(interval=1,sliding_window_size=5)
[385814]: 23-04-07 10:49:24.409 [INFO] fastkafka.benchmark: Throughput = 92,948, Avg throughput = 92,798 - For application.on_input_data(interval=1,sliding_window_size=5)
[385814]: 23-04-07 10:49:25.409 [INFO] fastkafka.benchmark: Throughput = 93,227, Avg throughput = 92,905 - For application.on_input_data(interval=1,sliding_window_size=5)
[385814]: 23-04-07 10:49:26.409 [INFO] fastkafka.benchmark: Throughput = 93,553, Avg throughput = 93,035 - For application.on_input_data(interval=1,sliding_window_size=5)
[385814]: 23-04-07 10:49:27.409 [INFO] fastkafka.benchmark: Throughput = 92,699, Avg throughput = 92,855 - For application.on_input_data(interval=1,sliding_window_size=5)
[385814]: 23-04-07 10:49:28.409 [INFO] fastkafka.benchmark: Throughput = 92,716, Avg throughput = 93,029 - For application.on_input_data(interval=1,sliding_window_size=5)
[385814]: 23-04-07 10:49:29.409 [INFO] fastkafka.benchmark: Throughput = 92,897, Avg throughput = 93,019 - For application.on_input_data(interval=1,sliding_window_size=5)
[385814]: 23-04-07 10:49:30.409 [INFO] fastkafka.benchmark: Throughput = 92,854, Avg throughput = 92,944 - For application.on_input_data(interval=1,sliding_window_size=5)
[385814]: 23-04-07 10:49:31.410 [INFO] fastkafka.benchmark: Throughput = 92,672, Avg throughput = 92,768 - For application.on_input_data(interval=1,sliding_window_size=5)
```
Based on the output, with a single worker our FastKafka app achieved a throughput of roughly 93k messages per second, and the sliding-window average throughput likewise hovered around 93k messages per second.
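As a quick sanity check on the reported numbers, the running average is simply the mean of the most recent `sliding_window_size` throughput values. For the final log line:

```python
# Recompute the final "Avg throughput" from the last five per-second values.
from statistics import mean

window = [92_699, 92_716, 92_897, 92_854, 92_672]  # sliding_window_size=5
print(round(mean(window)))  # 92768, matching "Avg throughput = 92,768"
```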