Skip to main content
Version: dev 🚧

Encoding and Decoding Kafka Messages with FastKafka

Prerequisites​

  1. A basic knowledge of FastKafka is needed to proceed with this guide. If you are not familiar with FastKafka, please go through the tutorial first.
  2. FastKafka with its dependencies installed is needed. Please install FastKafka using the command - pip install fastkafka

Ways to Encode and Decode Messages with FastKafka​

In python, by default, we send Kafka messages as bytes. Even if our message is a string, we convert it to bytes and then send it to Kafka topic. imilarly, while consuming messages, we consume them as bytes and then convert them to strings.

In FastKafka, we specify message schema using Pydantic models as mentioned in tutorial:

# Define Pydantic models for Kafka messages
from pydantic import BaseModel, NonNegativeFloat, Field

class IrisInputData(BaseModel):
sepal_length: NonNegativeFloat = Field(
..., example=0.5, description="Sepal length in cm"
)
sepal_width: NonNegativeFloat = Field(
..., example=0.5, description="Sepal width in cm"
)
petal_length: NonNegativeFloat = Field(
..., example=0.5, description="Petal length in cm"
)
petal_width: NonNegativeFloat = Field(
..., example=0.5, description="Petal width in cm"
)


class IrisPrediction(BaseModel):
species: str = Field(..., example="setosa", description="Predicted species")

Then, we send and receive messages as instances of Pydantic models which we defined. So, FastKafka needs a way to encode/decode to these Pydantic model messages to bytes in order to send/receive messages to/from Kafka topics.

The @consumes and @produces methods of FastKafka accept a parameter called decoder/encoder to decode/encode Kafka messages. FastKafka provides three ways to encode and decode messages:

  1. json - This is the default encoder/decoder option in FastKafka. While producing, this option converts our instance of Pydantic model messages to a JSON string and then converts it to bytes before sending it to the topic. While consuming, it converts bytes to a JSON string and then constructs an instance of Pydantic model from the JSON string.
  2. avro - This option uses Avro encoding/decoding to convert instances of Pydantic model messages to bytes while producing, and while consuming, it constructs an instance of Pydantic model from bytes.
  3. custom encoder/decoder - If you are not happy with the json or avro encoder/decoder options, you can write your own encoder/decoder functions and use them to encode/decode Pydantic messages.

1. Json encoder and decoder​

The default option in FastKafka is json encoder/decoder. This option, while producing, converts our instance of pydantic model messages to json string and then converts to bytes before sending it to the topics. While consuming it converts bytes to json string and then constructs instance of pydantic model from json string.

We can use the application from tutorial as is, and it will use the json encoder/decoder by default. But, for clarity, let’s modify it to explicitly accept the ‘json’ encoder/decoder parameter:

# content of the "application.py" file

from contextlib import asynccontextmanager

from sklearn.datasets import load_iris
from sklearn.linear_model import LogisticRegression

from fastkafka import FastKafka

ml_models = {}


@asynccontextmanager
async def lifespan(app: FastKafka):
# Load the ML model
X, y = load_iris(return_X_y=True)
ml_models["iris_predictor"] = LogisticRegression(random_state=0, max_iter=500).fit(
X, y
)
yield
# Clean up the ML models and release the resources
ml_models.clear()


from pydantic import BaseModel, NonNegativeFloat, Field

class IrisInputData(BaseModel):
sepal_length: NonNegativeFloat = Field(
..., example=0.5, description="Sepal length in cm"
)
sepal_width: NonNegativeFloat = Field(
..., example=0.5, description="Sepal width in cm"
)
petal_length: NonNegativeFloat = Field(
..., example=0.5, description="Petal length in cm"
)
petal_width: NonNegativeFloat = Field(
..., example=0.5, description="Petal width in cm"
)


class IrisPrediction(BaseModel):
species: str = Field(..., example="setosa", description="Predicted species")

from fastkafka import FastKafka

kafka_brokers = {
"localhost": {
"url": "localhost",
"description": "local development kafka broker",
"port": 9092,
},
"production": {
"url": "kafka.airt.ai",
"description": "production kafka broker",
"port": 9092,
"protocol": "kafka-secure",
"security": {"type": "plain"},
},
}

kafka_app = FastKafka(
title="Iris predictions",
kafka_brokers=kafka_brokers,
lifespan=lifespan,
)

@kafka_app.consumes(topic="input_data", decoder="json")
async def on_input_data(msg: IrisInputData):
species_class = ml_models["iris_predictor"].predict(
[[msg.sepal_length, msg.sepal_width, msg.petal_length, msg.petal_width]]
)[0]

await to_predictions(species_class)


@kafka_app.produces(topic="predictions", encoder="json")
async def to_predictions(species_class: int) -> IrisPrediction:
iris_species = ["setosa", "versicolor", "virginica"]

prediction = IrisPrediction(species=iris_species[species_class])
return prediction

In the above code, the @kafka_app.consumes decorator sets up a consumer for the “input_data" topic, using the ‘json’ decoder to convert the message payload to an instance of IrisInputData. The @kafka_app.produces decorator sets up a producer for the “predictions" topic, using the ‘json’ encoder to convert the instance of IrisPrediction to message payload.

2. Avro encoder and decoder​

What is Avro?​

Avro is a row-oriented remote procedure call and data serialization framework developed within Apache’s Hadoop project. It uses JSON for defining data types and protocols, and serializes data in a compact binary format. To learn more about the Apache Avro, please check out the docs.

Installing FastKafka with Avro dependencies​

FastKafka with dependencies for Apache Avro installed is needed to use avro encoder/decoder. Please install FastKafka with Avro support using the command - pip install fastkafka[avro]

Defining Avro Schema Using Pydantic Models​

By default, you can use Pydantic model to define your message schemas. FastKafka internally takes care of encoding and decoding avro messages, based on the Pydantic models.

So, similar to the tutorial, the message schema will remain as it is.

# Define Pydantic models for Avro messages
from pydantic import BaseModel, NonNegativeFloat, Field

class IrisInputData(BaseModel):
sepal_length: NonNegativeFloat = Field(
..., example=0.5, description="Sepal length in cm"
)
sepal_width: NonNegativeFloat = Field(
..., example=0.5, description="Sepal width in cm"
)
petal_length: NonNegativeFloat = Field(
..., example=0.5, description="Petal length in cm"
)
petal_width: NonNegativeFloat = Field(
..., example=0.5, description="Petal width in cm"
)


class IrisPrediction(BaseModel):
species: str = Field(..., example="setosa", description="Predicted species")

No need to change anything to support avro. You can use existing Pydantic models as is.

Reusing existing avro schema​

If you are using some other library to send and receive avro encoded messages, it is highly likely that you already have an Avro schema defined.

Building pydantic models from avro schema dictionary​

Let’s modify the above example and let’s assume we have schemas already for IrisInputData and IrisPrediction which will look like below:

iris_input_data_schema = {
"type": "record",
"namespace": "IrisInputData",
"name": "IrisInputData",
"fields": [
{"doc": "Sepal length in cm", "type": "double", "name": "sepal_length"},
{"doc": "Sepal width in cm", "type": "double", "name": "sepal_width"},
{"doc": "Petal length in cm", "type": "double", "name": "petal_length"},
{"doc": "Petal width in cm", "type": "double", "name": "petal_width"},
],
}
iris_prediction_schema = {
"type": "record",
"namespace": "IrisPrediction",
"name": "IrisPrediction",
"fields": [{"doc": "Predicted species", "type": "string", "name": "species"}],
}

We can easily construct pydantic models from avro schema using avsc_to_pydantic function which is included as part of FastKafka itself.

from fastkafka.encoder import avsc_to_pydantic

IrisInputData = avsc_to_pydantic(iris_input_data_schema)
print(IrisInputData.model_fields)

IrisPrediction = avsc_to_pydantic(iris_prediction_schema)
print(IrisPrediction.model_fields)

The above code will convert avro schema to pydantic models and will print pydantic models’ fields. The output of the above is:

{'sepal_length': ModelField(name='sepal_length', type=float, required=True),
'sepal_width': ModelField(name='sepal_width', type=float, required=True),
'petal_length': ModelField(name='petal_length', type=float, required=True),
'petal_width': ModelField(name='petal_width', type=float, required=True)}

{'species': ModelField(name='species', type=str, required=True)}

This is exactly same as manually defining the pydantic models ourselves. You don’t have to worry about not making any mistakes while converting avro schema to pydantic models manually. You can easily and automatically accomplish it by using avsc_to_pydantic function as demonstrated above.

Building pydantic models from .avsc file​

Not all cases will have avro schema conveniently defined as a python dictionary. You may have it stored as the proprietary .avsc files in filesystem. Let’s see how to convert those .avsc files to pydantic models.

Let’s assume our avro files are stored in files called iris_input_data_schema.avsc and iris_prediction_schema.avsc. In that case, following code converts the schema to pydantic models:

import json
from fastkafka.encoder import avsc_to_pydantic


with open("iris_input_data_schema.avsc", "rb") as f:
iris_input_data_schema = json.load(f)

with open("iris_prediction_schema.avsc", "rb") as f:
iris_prediction_schema = json.load(f)


IrisInputData = avsc_to_pydantic(iris_input_data_schema)
print(IrisInputData.model_fields)

IrisPrediction = avsc_to_pydantic(iris_prediction_schema)
print(IrisPrediction.model_fields)

Consume/Produce avro messages with FastKafka​

FastKafka provides @consumes and @produces methods to consume/produces messages to/from a Kafka topic. This is explained in tutorial.

The @consumes and @produces methods accepts a parameter called decoder/encoder to decode/encode avro messages.

@kafka_app.consumes(topic="input_data", encoder="avro")
async def on_input_data(msg: IrisInputData):
species_class = ml_models["iris_predictor"].predict(
[[msg.sepal_length, msg.sepal_width, msg.petal_length, msg.petal_width]]
)[0]

await to_predictions(species_class)


@kafka_app.produces(topic="predictions", decoder="avro")
async def to_predictions(species_class: int) -> IrisPrediction:
iris_species = ["setosa", "versicolor", "virginica"]

prediction = IrisPrediction(species=iris_species[species_class])
return prediction

In the above example, in @consumes and @produces methods, we explicitly instruct FastKafka to decode and encode messages using the avro decoder/encoder instead of the default json decoder/encoder.

Assembling it all together​

Let’s rewrite the sample code found in tutorial to use avro to decode and encode messages:

# content of the "application.py" file

from contextlib import asynccontextmanager

from sklearn.datasets import load_iris
from sklearn.linear_model import LogisticRegression

from fastkafka import FastKafka

ml_models = {}


@asynccontextmanager
async def lifespan(app: FastKafka):
# Load the ML model
X, y = load_iris(return_X_y=True)
ml_models["iris_predictor"] = LogisticRegression(random_state=0, max_iter=500).fit(
X, y
)
yield
# Clean up the ML models and release the resources
ml_models.clear()


iris_input_data_schema = {
"type": "record",
"namespace": "IrisInputData",
"name": "IrisInputData",
"fields": [
{"doc": "Sepal length in cm", "type": "double", "name": "sepal_length"},
{"doc": "Sepal width in cm", "type": "double", "name": "sepal_width"},
{"doc": "Petal length in cm", "type": "double", "name": "petal_length"},
{"doc": "Petal width in cm", "type": "double", "name": "petal_width"},
],
}
iris_prediction_schema = {
"type": "record",
"namespace": "IrisPrediction",
"name": "IrisPrediction",
"fields": [{"doc": "Predicted species", "type": "string", "name": "species"}],
}
# Or load schema from avsc files

from fastkafka.encoder import avsc_to_pydantic

IrisInputData = avsc_to_pydantic(iris_input_data_schema)
IrisPrediction = avsc_to_pydantic(iris_prediction_schema)


from fastkafka import FastKafka

kafka_brokers = {
"localhost": {
"url": "localhost",
"description": "local development kafka broker",
"port": 9092,
},
"production": {
"url": "kafka.airt.ai",
"description": "production kafka broker",
"port": 9092,
"protocol": "kafka-secure",
"security": {"type": "plain"},
},
}

kafka_app = FastKafka(
title="Iris predictions",
kafka_brokers=kafka_brokers,
lifespan=lifespan,
)

@kafka_app.consumes(topic="input_data", decoder="avro")
async def on_input_data(msg: IrisInputData):
species_class = ml_models["iris_predictor"].predict(
[[msg.sepal_length, msg.sepal_width, msg.petal_length, msg.petal_width]]
)[0]

await to_predictions(species_class)


@kafka_app.produces(topic="predictions", encoder="avro")
async def to_predictions(species_class: int) -> IrisPrediction:
iris_species = ["setosa", "versicolor", "virginica"]

prediction = IrisPrediction(species=iris_species[species_class])
return prediction

The above code is a sample implementation of using FastKafka to consume and produce Avro-encoded messages from/to a Kafka topic. The code defines two Avro schemas for the input data and the prediction result. It then uses the avsc_to_pydantic function from the FastKafka library to convert the Avro schema into Pydantic models, which will be used to decode and encode Avro messages.

The FastKafka class is then instantiated with the broker details, and two functions decorated with @kafka_app.consumes and @kafka_app.produces are defined to consume messages from the “input_data" topic and produce messages to the “predictions" topic, respectively. The functions uses the decoder=“avro" and encoder=“avro" parameters to decode and encode the Avro messages.

In summary, the above code demonstrates a straightforward way to use Avro-encoded messages with FastKafka to build a message processing pipeline.

3. Custom encoder and decoder​

If you are not happy with the json or avro encoder/decoder options, you can write your own encoder/decoder functions and use them to encode/decode Pydantic messages.

Writing a custom encoder and decoder​

In this section, let’s see how to write a custom encoder and decoder which obfuscates kafka message with simple ROT13 cipher.

import codecs
import json
from typing import Any, Type


def custom_encoder(msg: BaseModel) -> bytes:
msg_str = msg.json()
obfuscated = codecs.encode(msg_str, 'rot13')
raw_bytes = obfuscated.encode("utf-8")
return raw_bytes

def custom_decoder(raw_msg: bytes, cls: Type[BaseModel]) -> Any:
obfuscated = raw_msg.decode("utf-8")
msg_str = codecs.decode(obfuscated, 'rot13')
msg_dict = json.loads(msg_str)
return cls(**msg_dict)

The above code defines two custom functions for encoding and decoding messages in a Kafka application using the FastKafka library.

The encoding function, custom_encoder(), takes a message msg which is an instance of a Pydantic model, converts it to a JSON string using the json() method, obfuscates the resulting string using the ROT13 algorithm from the codecs module, and finally encodes the obfuscated string as raw bytes using the UTF-8 encoding.

The decoding function, custom_decoder(), takes a raw message raw_msg in bytes format, a Pydantic class to construct instance with cls parameter. It first decodes the raw message from UTF-8 encoding, then uses the ROT13 algorithm to de-obfuscate the string. Finally, it loads the resulting JSON string using the json.loads() method and returns a new instance of the specified cls class initialized with the decoded dictionary.

These functions can be used with FastKafka’s encoder and decoder parameters to customize the serialization and deserialization of messages in Kafka topics.

Let’s test the above code

i = IrisInputData(sepal_length=0.5, sepal_width=0.5, petal_length=0.5, petal_width=0.5)

encoded = custom_encoder(i)
display(encoded)

decoded = custom_decoder(encoded, IrisInputData)
display(decoded)

This will result in following output

b'{"frcny_yratgu": 0.5, "frcny_jvqgu": 0.5, "crgny_yratgu": 0.5, "crgny_jvqgu": 0.5}'

IrisInputData(sepal_length=0.5, sepal_width=0.5, petal_length=0.5, petal_width=0.5)

Assembling it all together​

Let’s rewrite the sample code found in tutorial to use our custom decoder and encoder functions:

# content of the "application.py" file

from contextlib import asynccontextmanager

from sklearn.datasets import load_iris
from sklearn.linear_model import LogisticRegression

from fastkafka import FastKafka

ml_models = {}


@asynccontextmanager
async def lifespan(app: FastKafka):
# Load the ML model
X, y = load_iris(return_X_y=True)
ml_models["iris_predictor"] = LogisticRegression(random_state=0, max_iter=500).fit(
X, y
)
yield
# Clean up the ML models and release the resources
ml_models.clear()


from pydantic import BaseModel, NonNegativeFloat, Field

class IrisInputData(BaseModel):
sepal_length: NonNegativeFloat = Field(
..., example=0.5, description="Sepal length in cm"
)
sepal_width: NonNegativeFloat = Field(
..., example=0.5, description="Sepal width in cm"
)
petal_length: NonNegativeFloat = Field(
..., example=0.5, description="Petal length in cm"
)
petal_width: NonNegativeFloat = Field(
..., example=0.5, description="Petal width in cm"
)


class IrisPrediction(BaseModel):
species: str = Field(..., example="setosa", description="Predicted species")


import codecs
import json
from typing import Any, Type


def custom_encoder(msg: BaseModel) -> bytes:
msg_str = msg.json()
obfuscated = codecs.encode(msg_str, 'rot13')
raw_bytes = obfuscated.encode("utf-8")
return raw_bytes

def custom_decoder(raw_msg: bytes, cls: Type[BaseModel]) -> Any:
obfuscated = raw_msg.decode("utf-8")
msg_str = codecs.decode(obfuscated, 'rot13')
msg_dict = json.loads(msg_str)
return cls(**msg_dict)


from fastkafka import FastKafka

kafka_brokers = {
"localhost": {
"url": "localhost",
"description": "local development kafka broker",
"port": 9092,
},
"production": {
"url": "kafka.airt.ai",
"description": "production kafka broker",
"port": 9092,
"protocol": "kafka-secure",
"security": {"type": "plain"},
},
}

kafka_app = FastKafka(
title="Iris predictions",
kafka_brokers=kafka_brokers,
lifespan=lifespan,
)

@kafka_app.consumes(topic="input_data", decoder=custom_decoder)
async def on_input_data(msg: IrisInputData):
species_class = ml_models["iris_predictor"].predict(
[[msg.sepal_length, msg.sepal_width, msg.petal_length, msg.petal_width]]
)[0]

await to_predictions(species_class)


@kafka_app.produces(topic="predictions", encoder=custom_encoder)
async def to_predictions(species_class: int) -> IrisPrediction:
iris_species = ["setosa", "versicolor", "virginica"]

prediction = IrisPrediction(species=iris_species[species_class])
return prediction

This code defines a custom encoder and decoder functions for encoding and decoding messages sent through a Kafka messaging system.

The custom encoder function takes a message represented as a BaseModel and encodes it as bytes by first converting it to a JSON string and then obfuscating it using the ROT13 encoding. The obfuscated message is then converted to bytes using UTF-8 encoding and returned.

The custom decoder function takes in the bytes representing an obfuscated message, decodes it using UTF-8 encoding, then decodes the ROT13 obfuscation, and finally loads it as a dictionary using the json module. This dictionary is then converted to a BaseModel instance using the cls parameter.