Version: 0.5.0

Benchmarking a FastKafka app

Prerequisites

To benchmark a FastKafka project, you will need the following:

  1. An application built with FastKafka.
  2. A running Kafka instance to benchmark the FastKafka application against.

Creating FastKafka Code

Let’s create a FastKafka-based application, following the tutorial, and save it to a file named application.py.

# content of the "application.py" file

from contextlib import asynccontextmanager

from sklearn.datasets import load_iris
from sklearn.linear_model import LogisticRegression

from fastkafka import FastKafka

ml_models = {}


@asynccontextmanager
async def lifespan(app: FastKafka):
    # Load the ML model
    X, y = load_iris(return_X_y=True)
    ml_models["iris_predictor"] = LogisticRegression(random_state=0, max_iter=500).fit(
        X, y
    )
    yield
    # Clean up the ML models and release the resources
    ml_models.clear()


from pydantic import BaseModel, NonNegativeFloat, Field


class IrisInputData(BaseModel):
    sepal_length: NonNegativeFloat = Field(
        ..., example=0.5, description="Sepal length in cm"
    )
    sepal_width: NonNegativeFloat = Field(
        ..., example=0.5, description="Sepal width in cm"
    )
    petal_length: NonNegativeFloat = Field(
        ..., example=0.5, description="Petal length in cm"
    )
    petal_width: NonNegativeFloat = Field(
        ..., example=0.5, description="Petal width in cm"
    )


class IrisPrediction(BaseModel):
    species: str = Field(..., example="setosa", description="Predicted species")


kafka_brokers = {
    "localhost": {
        "url": "localhost",
        "description": "local development kafka broker",
        "port": 9092,
    },
    "production": {
        "url": "kafka.airt.ai",
        "description": "production kafka broker",
        "port": 9092,
        "protocol": "kafka-secure",
        "security": {"type": "plain"},
    },
}

kafka_app = FastKafka(
    title="Iris predictions",
    kafka_brokers=kafka_brokers,
    lifespan=lifespan,
)


@kafka_app.consumes(topic="input_data", auto_offset_reset="latest")
async def on_input_data(msg: IrisInputData):
    species_class = ml_models["iris_predictor"].predict(
        [[msg.sepal_length, msg.sepal_width, msg.petal_length, msg.petal_width]]
    )[0]

    await to_predictions(species_class)


@kafka_app.produces(topic="predictions")
async def to_predictions(species_class: int) -> IrisPrediction:
    iris_species = ["setosa", "versicolor", "virginica"]

    prediction = IrisPrediction(species=iris_species[species_class])
    return prediction

FastKafka provides a decorator for benchmarking, appropriately named benchmark. Let’s edit our application.py file and add the benchmark decorator to the on_input_data consuming function.

# content of the "application.py" file with benchmark

from contextlib import asynccontextmanager

from sklearn.datasets import load_iris
from sklearn.linear_model import LogisticRegression

from fastkafka import FastKafka

ml_models = {}


@asynccontextmanager
async def lifespan(app: FastKafka):
    # Load the ML model
    X, y = load_iris(return_X_y=True)
    ml_models["iris_predictor"] = LogisticRegression(random_state=0, max_iter=500).fit(
        X, y
    )
    yield
    # Clean up the ML models and release the resources
    ml_models.clear()


from pydantic import BaseModel, NonNegativeFloat, Field


class IrisInputData(BaseModel):
    sepal_length: NonNegativeFloat = Field(
        ..., example=0.5, description="Sepal length in cm"
    )
    sepal_width: NonNegativeFloat = Field(
        ..., example=0.5, description="Sepal width in cm"
    )
    petal_length: NonNegativeFloat = Field(
        ..., example=0.5, description="Petal length in cm"
    )
    petal_width: NonNegativeFloat = Field(
        ..., example=0.5, description="Petal width in cm"
    )


class IrisPrediction(BaseModel):
    species: str = Field(..., example="setosa", description="Predicted species")


kafka_brokers = {
    "localhost": {
        "url": "localhost",
        "description": "local development kafka broker",
        "port": 9092,
    },
    "production": {
        "url": "kafka.airt.ai",
        "description": "production kafka broker",
        "port": 9092,
        "protocol": "kafka-secure",
        "security": {"type": "plain"},
    },
}

kafka_app = FastKafka(
    title="Iris predictions",
    kafka_brokers=kafka_brokers,
    lifespan=lifespan,
)


@kafka_app.consumes(topic="input_data", auto_offset_reset="latest")
@kafka_app.benchmark(interval=1, sliding_window_size=5)
async def on_input_data(msg: IrisInputData):
    species_class = ml_models["iris_predictor"].predict(
        [[msg.sepal_length, msg.sepal_width, msg.petal_length, msg.petal_width]]
    )[0]

    await to_predictions(species_class)


@kafka_app.produces(topic="predictions")
async def to_predictions(species_class: int) -> IrisPrediction:
    iris_species = ["setosa", "versicolor", "virginica"]

    prediction = IrisPrediction(species=iris_species[species_class])
    return prediction

Here we benchmark the function that consumes data from the input_data topic, using an interval of 1 second and a sliding window size of 5.

The interval parameter sets the time period (in seconds) over which each throughput measurement is calculated, and the sliding_window_size parameter sets the maximum number of recent measurements used to compute the average throughput and the standard deviation.

Benchmarking like this helps verify that the function performs as expected and makes it easier to spot bottlenecks and areas for improvement.
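
To make these two parameters concrete, here is a minimal sketch of how a sliding-window throughput average can be computed. This illustrates the idea only; it is not FastKafka’s internal implementation, and the function and variable names are assumptions made for the example.

import statistics
from collections import deque

interval = 1  # seconds between measurements
sliding_window_size = 5  # keep at most the 5 most recent measurements

# Holds the message counts from the most recent intervals only;
# older counts fall out of the window automatically.
window: deque = deque(maxlen=sliding_window_size)


def record_measurement(messages_in_interval: int) -> None:
    # Called once per interval with the number of messages processed in it.
    window.append(messages_in_interval)
    throughput = messages_in_interval / interval
    avg = statistics.mean(window) / interval
    std = statistics.stdev(window) if len(window) > 1 else 0.0
    print(f"Throughput = {throughput:,.0f}, Avg throughput = {avg:,.0f}, Std = {std:,.2f}")


# Example: feed in per-second message counts similar to the log output below
for count in [93_598, 91_847, 92_948, 93_227, 93_553, 92_699]:
    record_measurement(count)

With sliding_window_size=5, the sixth measurement pushes the first one out of the window, so the average always reflects only the five most recent intervals.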

Starting Kafka

If you already have Kafka running somewhere, you can skip this step.

Please keep in mind that your benchmarking results may be affected by bottlenecks such as the network, the number of CPU cores on the Kafka machine, or the Kafka configuration itself.

Installing Java and Kafka

We need a working Kafka instance to benchmark our FastKafka app, and to run Kafka we need Java. Thankfully, FastKafka comes with a CLI command to install both Java and Kafka on our machine.

So, let’s install Java and Kafka by executing the following command.

fastkafka testing install_deps

The above command will extract the Kafka scripts to "$HOME/.local/kafka_2.13-3.3.2" on your machine.

Creating configuration for Zookeeper and Kafka

Now we need to start Zookeeper and Kafka separately, and to start them we need the zookeeper.properties and kafka.properties configuration files.

Let’s create a folder inside the directory where the Kafka scripts were extracted and change into it.

mkdir $HOME/.local/kafka_2.13-3.3.2/data_dir && cd $HOME/.local/kafka_2.13-3.3.2/data_dir

Let’s create a file called zookeeper.properties and write the following content to it. Note that properties files are not processed by a shell, so here and in kafka.properties below, replace $HOME with the absolute path to your home directory:

dataDir=$HOME/.local/kafka_2.13-3.3.2/data_dir/zookeeper
clientPort=2181
maxClientCnxns=0

Similarly, let’s create a file called kafka.properties and write the following content to the file:

broker.id=0
listeners=PLAINTEXT://:9092

num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

log.dirs=$HOME/.local/kafka_2.13-3.3.2/data_dir/kafka_logs
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
log.retention.bytes=1073741824
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000

zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=18000

Starting Zookeeper and Kafka

We need two separate terminals, one to run Zookeeper and another to run Kafka. Let’s open a new terminal and run the following commands to start Zookeeper:

export PATH=$PATH:$HOME/.jdk/jdk-11.0.18+10/bin
cd $HOME/.local/kafka_2.13-3.3.2/bin
./zookeeper-server-start.sh ../data_dir/zookeeper.properties

Once Zookeeper is up and running, open a new terminal and execute the following commands to start Kafka:

export PATH=$PATH:$HOME/.jdk/jdk-11.0.18+10/bin
cd $HOME/.local/kafka_2.13-3.3.2/bin
./kafka-server-start.sh ../data_dir/kafka.properties

Now we have both Zookeeper and Kafka up and running.

Creating topics in Kafka

In a new terminal, execute the following commands to create the necessary topics in Kafka:

export PATH=$PATH:$HOME/.jdk/jdk-11.0.18+10/bin
cd $HOME/.local/kafka_2.13-3.3.2/bin
./kafka-topics.sh --create --topic input_data --partitions 6 --bootstrap-server localhost:9092
./kafka-topics.sh --create --topic predictions --partitions 6 --bootstrap-server localhost:9092

Populating topics with dummy data

To benchmark our FastKafka app, we need some data in Kafka topics.

In the same terminal, let’s create some dummy data:

yes '{"sepal_length": 0.7739560486, "sepal_width": 0.8636615789, "petal_length": 0.6122663046, "petal_width": 0.1338914722}' | head -n 1000000 > /tmp/test_data

This command will create a file called test_data in the /tmp folder containing one million rows of the same JSON message. This will act as dummy data to populate the input_data topic.

Let’s populate the input_data topic with the dummy data we just created:

./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic input_data < /tmp/test_data

Now our input_data topic holds one million messages. If you want more messages in the topic, simply execute the above command again.
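
If you prefer to stay in Python, the topic can also be populated with a small script using aiokafka, the Kafka client library that FastKafka builds on. The sketch below is an optional alternative to the console producer above, assuming the broker is reachable at localhost:9092; it sends the same JSON payload one million times.

import asyncio

from aiokafka import AIOKafkaProducer

MESSAGE = (
    b'{"sepal_length": 0.7739560486, "sepal_width": 0.8636615789, '
    b'"petal_length": 0.6122663046, "petal_width": 0.1338914722}'
)


async def populate(n: int = 1_000_000) -> None:
    # Optional Python alternative to kafka-console-producer.sh (a sketch,
    # not part of the official FastKafka workflow).
    producer = AIOKafkaProducer(bootstrap_servers="localhost:9092")
    await producer.start()
    try:
        for _ in range(n):
            # send() enqueues the message; batching happens in the background
            await producer.send("input_data", MESSAGE)
        await producer.flush()
    finally:
        await producer.stop()


asyncio.run(populate())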

Benchmarking FastKafka

Once Zookeeper and Kafka are ready, benchmarking the FastKafka app is as simple as running the fastkafka run command:

fastkafka run --num-workers 1 --kafka-broker localhost application:kafka_app

This command will start the FastKafka app and begin consuming messages from the Kafka instance we spun up earlier. It will also print the benchmark throughput, calculated according to the interval and sliding_window_size values.

The output for the fastkafka run command is:

[385814]: 23-04-07 10:49:18.380 [INFO] application: Current group id is ZDGTBVWVBBDMZCW
[385814]: 23-04-07 10:49:18.382 [INFO] fastkafka._application.app: set_kafka_broker() : Setting bootstrap_servers value to 'localhost:9092'
[385814]: 23-04-07 10:49:18.382 [INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': 'localhost:9092'}'
[385814]: 23-04-07 10:49:18.387 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...
[385814]: 23-04-07 10:49:18.387 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'group_id': 'ZDGTBVWVBBDMZCW', 'auto_offset_reset': 'earliest', 'bootstrap_servers': 'localhost:9092', 'max_poll_records': 100}
[385814]: 23-04-07 10:49:18.390 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.
[385814]: 23-04-07 10:49:18.390 [INFO] aiokafka.consumer.subscription_state: Updating subscribed topics to: frozenset({'input_data'})
[385814]: 23-04-07 10:49:18.390 [INFO] aiokafka.consumer.consumer: Subscribed to topic(s): {'input_data'}
[385814]: 23-04-07 10:49:18.390 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.
[385814]: 23-04-07 10:49:18.395 [INFO] aiokafka.consumer.group_coordinator: Discovered coordinator 0 for group ZDGTBVWVBBDMZCW
[385814]: 23-04-07 10:49:18.395 [INFO] aiokafka.consumer.group_coordinator: Revoking previously assigned partitions set() for group ZDGTBVWVBBDMZCW
[385814]: 23-04-07 10:49:18.395 [INFO] aiokafka.consumer.group_coordinator: (Re-)joining group ZDGTBVWVBBDMZCW
[385814]: 23-04-07 10:49:21.396 [INFO] aiokafka.consumer.group_coordinator: Joined group 'ZDGTBVWVBBDMZCW' (generation 1) with member_id aiokafka-0.8.0-b1f06560-6983-4d5e-a9af-8084e0e652cc
[385814]: 23-04-07 10:49:21.396 [INFO] aiokafka.consumer.group_coordinator: Elected group leader -- performing partition assignments using roundrobin
[385814]: 23-04-07 10:49:21.397 [INFO] aiokafka.consumer.group_coordinator: Successfully synced group ZDGTBVWVBBDMZCW with generation 1
[385814]: 23-04-07 10:49:21.397 [INFO] aiokafka.consumer.group_coordinator: Setting newly assigned partitions {TopicPartition(topic='input_data', partition=0), TopicPartition(topic='input_data', partition=1), TopicPartition(topic='input_data', partition=2), TopicPartition(topic='input_data', partition=3)} for group ZDGTBVWVBBDMZCW
[385814]: 23-04-07 10:49:22.409 [INFO] fastkafka.benchmark: Throughput = 93,598, Avg throughput = 93,598 - For application.on_input_data(interval=1,sliding_window_size=5)
[385814]: 23-04-07 10:49:23.409 [INFO] fastkafka.benchmark: Throughput = 91,847, Avg throughput = 92,723 - For application.on_input_data(interval=1,sliding_window_size=5)
[385814]: 23-04-07 10:49:24.409 [INFO] fastkafka.benchmark: Throughput = 92,948, Avg throughput = 92,798 - For application.on_input_data(interval=1,sliding_window_size=5)
[385814]: 23-04-07 10:49:25.409 [INFO] fastkafka.benchmark: Throughput = 93,227, Avg throughput = 92,905 - For application.on_input_data(interval=1,sliding_window_size=5)
[385814]: 23-04-07 10:49:26.409 [INFO] fastkafka.benchmark: Throughput = 93,553, Avg throughput = 93,035 - For application.on_input_data(interval=1,sliding_window_size=5)
[385814]: 23-04-07 10:49:27.409 [INFO] fastkafka.benchmark: Throughput = 92,699, Avg throughput = 92,855 - For application.on_input_data(interval=1,sliding_window_size=5)
[385814]: 23-04-07 10:49:28.409 [INFO] fastkafka.benchmark: Throughput = 92,716, Avg throughput = 93,029 - For application.on_input_data(interval=1,sliding_window_size=5)
[385814]: 23-04-07 10:49:29.409 [INFO] fastkafka.benchmark: Throughput = 92,897, Avg throughput = 93,019 - For application.on_input_data(interval=1,sliding_window_size=5)
[385814]: 23-04-07 10:49:30.409 [INFO] fastkafka.benchmark: Throughput = 92,854, Avg throughput = 92,944 - For application.on_input_data(interval=1,sliding_window_size=5)
[385814]: 23-04-07 10:49:31.410 [INFO] fastkafka.benchmark: Throughput = 92,672, Avg throughput = 92,768 - For application.on_input_data(interval=1,sliding_window_size=5)

Based on the output, with a single worker our FastKafka app achieved a throughput of roughly 93,000 messages per second, and the sliding-window average held steady at about the same rate.
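
If you want to confirm that predictions are actually being produced while the benchmark runs, you can read a few messages back from the predictions topic. Below is a minimal spot-check sketch using aiokafka directly; the topic name and broker address match the setup above, while the expected message format is an assumption based on the IrisPrediction model.

import asyncio

from aiokafka import AIOKafkaConsumer


async def spot_check(limit: int = 5) -> None:
    # Read a handful of messages from the predictions topic and print them.
    consumer = AIOKafkaConsumer(
        "predictions",
        bootstrap_servers="localhost:9092",
        auto_offset_reset="earliest",
    )
    await consumer.start()
    try:
        count = 0
        async for msg in consumer:
            print(msg.value.decode())  # expected to look like {"species": "setosa"}
            count += 1
            if count >= limit:
                break
    finally:
        await consumer.stop()


asyncio.run(spot_check())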