Encoding and Decoding Kafka Messages with FastKafka
Prerequisites
- A basic knowledge of FastKafka is needed to proceed with this guide. If you are not familiar with FastKafka, please go through the tutorial first.
- FastKafka with its dependencies installed is needed. Please install FastKafka using the command: pip install fastkafka
Ways to Encode and Decode Messages with FastKafka
In Python, by default, we send Kafka messages as bytes. Even if our message is a string, we convert it to bytes before sending it to a Kafka topic. Similarly, while consuming messages, we receive them as bytes and then convert them to strings.
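For illustration, this is the kind of conversion we would otherwise do by hand on both ends (a minimal sketch of the idea only; the actual send and receive calls depend on the Kafka client you use and are omitted here):
message = "some message"                    # the message we want to send
raw_bytes = message.encode("utf-8")         # what actually gets written to the Kafka topic
restored = raw_bytes.decode("utf-8")        # what we do with the bytes read from the topic
assert restored == message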
In FastKafka, we specify the message schema using Pydantic models, as mentioned in the tutorial:
# Define Pydantic models for Kafka messages
from pydantic import BaseModel, NonNegativeFloat, Field
class IrisInputData(BaseModel):
sepal_length: NonNegativeFloat = Field(
..., example=0.5, description="Sepal length in cm"
)
sepal_width: NonNegativeFloat = Field(
..., example=0.5, description="Sepal width in cm"
)
petal_length: NonNegativeFloat = Field(
..., example=0.5, description="Petal length in cm"
)
petal_width: NonNegativeFloat = Field(
..., example=0.5, description="Petal width in cm"
)
class IrisPrediction(BaseModel):
species: str = Field(..., example="setosa", description="Predicted species")
Then, we send and receive messages as instances of the Pydantic models we defined. So, FastKafka needs a way to encode/decode these Pydantic model messages to/from bytes in order to send/receive messages to/from Kafka topics.
The @consumes and @produces methods of FastKafka accept a decoder/encoder parameter to decode/encode Kafka messages. FastKafka provides three ways to encode and decode messages:
- json - This is the default encoder/decoder option in FastKafka. While producing, this option converts our instance of Pydantic model messages to a JSON string and then converts it to bytes before sending it to the topic. While consuming, it converts bytes to a JSON string and then constructs an instance of Pydantic model from the JSON string.
- avro - This option uses Avro encoding/decoding to convert instances of Pydantic model messages to bytes while producing, and while consuming, it constructs an instance of Pydantic model from bytes.
- custom encoder/decoder - If you are not happy with the json or avro encoder/decoder options, you can write your own encoder/decoder functions and use them to encode/decode Pydantic messages.
1. JSON encoder and decoder
The default option in FastKafka is the JSON encoder/decoder. While producing, it converts our Pydantic model instances to JSON strings and then to bytes before sending them to the topic. While consuming, it converts bytes to a JSON string and then constructs an instance of the Pydantic model from that string.
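Conceptually, the json option does something roughly equivalent to the following plain Pydantic calls (a simplified sketch, not FastKafka's internal implementation), using the IrisInputData model defined above:
# Producing: Pydantic model -> JSON string -> bytes
msg = IrisInputData(sepal_length=0.5, sepal_width=0.5, petal_length=0.5, petal_width=0.5)
raw_bytes = msg.json().encode("utf-8")
# Consuming: bytes -> JSON string -> Pydantic model
restored = IrisInputData.parse_raw(raw_bytes.decode("utf-8"))
assert restored == msg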
We can use the application from the tutorial as-is, and it will use the JSON encoder/decoder by default. But, for clarity, let's modify it to explicitly pass the "json" encoder/decoder parameters:
# content of the "application.py" file
from contextlib import asynccontextmanager
from sklearn.datasets import load_iris
from sklearn.linear_model import LogisticRegression
from fastkafka import FastKafka
ml_models = {}
@asynccontextmanager
async def lifespan(app: FastKafka):
# Load the ML model
X, y = load_iris(return_X_y=True)
ml_models["iris_predictor"] = LogisticRegression(random_state=0, max_iter=500).fit(
X, y
)
yield
# Clean up the ML models and release the resources
ml_models.clear()
from pydantic import BaseModel, NonNegativeFloat, Field
class IrisInputData(BaseModel):
sepal_length: NonNegativeFloat = Field(
..., example=0.5, description="Sepal length in cm"
)
sepal_width: NonNegativeFloat = Field(
..., example=0.5, description="Sepal width in cm"
)
petal_length: NonNegativeFloat = Field(
..., example=0.5, description="Petal length in cm"
)
petal_width: NonNegativeFloat = Field(
..., example=0.5, description="Petal width in cm"
)
class IrisPrediction(BaseModel):
species: str = Field(..., example="setosa", description="Predicted species")
from fastkafka import FastKafka
kafka_brokers = {
"localhost": {
"url": "localhost",
"description": "local development kafka broker",
"port": 9092,
},
"production": {
"url": "kafka.airt.ai",
"description": "production kafka broker",
"port": 9092,
"protocol": "kafka-secure",
"security": {"type": "plain"},
},
}
kafka_app = FastKafka(
title="Iris predictions",
kafka_brokers=kafka_brokers,
lifespan=lifespan,
)
@kafka_app.consumes(topic="input_data", decoder="json")
async def on_input_data(msg: IrisInputData):
species_class = ml_models["iris_predictor"].predict(
[[msg.sepal_length, msg.sepal_width, msg.petal_length, msg.petal_width]]
)[0]
await to_predictions(species_class)
@kafka_app.produces(topic="predictions", encoder="json")
async def to_predictions(species_class: int) -> IrisPrediction:
iris_species = ["setosa", "versicolor", "virginica"]
prediction = IrisPrediction(species=iris_species[species_class])
return prediction
In the above code, the @kafka_app.consumes decorator sets up a consumer for the "input_data" topic, using the json decoder to convert the message payload to an instance of IrisInputData. The @kafka_app.produces decorator sets up a producer for the "predictions" topic, using the json encoder to convert the IrisPrediction instance to the message payload.
2. Avro encoder and decoder
What is Avro?
Avro is a row-oriented remote procedure call and data serialization framework developed within Apache's Hadoop project. It uses JSON for defining data types and protocols, and serializes data in a compact binary format. To learn more about Apache Avro, please check out the docs.
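To get a feel for the compact binary format, here is a small standalone sketch using the fastavro library (an illustration of Avro serialization itself; the library choice here is an assumption and this is not how you interact with FastKafka):
import io
import fastavro

# A minimal Avro record schema, defined as a JSON-style dict
schema = fastavro.parse_schema({
    "type": "record",
    "name": "Example",
    "fields": [{"name": "species", "type": "string"}],
})

# Serialize a record to compact binary bytes
buffer = io.BytesIO()
fastavro.schemaless_writer(buffer, schema, {"species": "setosa"})
raw_bytes = buffer.getvalue()

# Deserialize the bytes back into a dict
record = fastavro.schemaless_reader(io.BytesIO(raw_bytes), schema)
assert record == {"species": "setosa"}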
Installing FastKafka with Avro dependencies
FastKafka with its Apache Avro dependencies installed is needed to use the avro encoder/decoder. Please install FastKafka with Avro support using the command: pip install fastkafka[avro]
Defining Avro Schema Using Pydantic Models
By default, you can use Pydantic models to define your message schemas. FastKafka internally takes care of encoding and decoding Avro messages based on the Pydantic models.
So, just as in the tutorial, the message schema remains the same:
# Define Pydantic models for Avro messages
from pydantic import BaseModel, NonNegativeFloat, Field
class IrisInputData(BaseModel):
sepal_length: NonNegativeFloat = Field(
..., example=0.5, description="Sepal length in cm"
)
sepal_width: NonNegativeFloat = Field(
..., example=0.5, description="Sepal width in cm"
)
petal_length: NonNegativeFloat = Field(
..., example=0.5, description="Petal length in cm"
)
petal_width: NonNegativeFloat = Field(
..., example=0.5, description="Petal width in cm"
)
class IrisPrediction(BaseModel):
species: str = Field(..., example="setosa", description="Predicted species")
No need to change anything to support Avro. You can use the existing Pydantic models as-is.
Reusing existing Avro schemas
If you are using some other library to send and receive Avro-encoded messages, it is highly likely that you already have an Avro schema defined.
Building Pydantic models from an Avro schema dictionary
Let's modify the above example and assume we already have schemas for IrisInputData and IrisPrediction, which look like this:
iris_input_data_schema = {
"type": "record",
"namespace": "IrisInputData",
"name": "IrisInputData",
"fields": [
{"doc": "Sepal length in cm", "type": "double", "name": "sepal_length"},
{"doc": "Sepal width in cm", "type": "double", "name": "sepal_width"},
{"doc": "Petal length in cm", "type": "double", "name": "petal_length"},
{"doc": "Petal width in cm", "type": "double", "name": "petal_width"},
],
}
iris_prediction_schema = {
"type": "record",
"namespace": "IrisPrediction",
"name": "IrisPrediction",
"fields": [{"doc": "Predicted species", "type": "string", "name": "species"}],
}
We can easily construct Pydantic models from an Avro schema using the avsc_to_pydantic function, which is included as part of FastKafka itself.
from fastkafka.encoder import avsc_to_pydantic
IrisInputData = avsc_to_pydantic(iris_input_data_schema)
print(IrisInputData.__fields__)
IrisPrediction = avsc_to_pydantic(iris_prediction_schema)
print(IrisPrediction.__fields__)
The above code converts the Avro schemas to Pydantic models and prints the models' fields. The output of the above is:
{'sepal_length': ModelField(name='sepal_length', type=float, required=True),
'sepal_width': ModelField(name='sepal_width', type=float, required=True),
'petal_length': ModelField(name='petal_length', type=float, required=True),
'petal_width': ModelField(name='petal_width', type=float, required=True)}
{'species': ModelField(name='species', type=str, required=True)}
This is exactly the same as defining the Pydantic models manually, but you don't have to worry about making mistakes while converting the Avro schema by hand. You can accomplish it easily and automatically by using the avsc_to_pydantic function, as demonstrated above.
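The generated classes behave just like hand-written Pydantic models; for example, assuming the avsc_to_pydantic calls above, we can instantiate and serialize them directly:
# The generated model can be instantiated and serialized like a hand-written one
sample = IrisInputData(sepal_length=0.5, sepal_width=0.5, petal_length=0.5, petal_width=0.5)
print(sample.json())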
Building Pydantic models from .avsc files
Not all cases will have the Avro schema conveniently defined as a Python dictionary. You may have it stored in .avsc files on the filesystem. Let's see how to convert those .avsc files to Pydantic models.
Let's assume our Avro schemas are stored in files called iris_input_data_schema.avsc and iris_prediction_schema.avsc. In that case, the following code converts the schemas to Pydantic models:
import json
from fastkafka.encoder import avsc_to_pydantic
with open("iris_input_data_schema.avsc", "rb") as f:
iris_input_data_schema = json.load(f)
with open("iris_prediction_schema.avsc", "rb") as f:
iris_prediction_schema = json.load(f)
IrisInputData = avsc_to_pydantic(iris_input_data_schema)
print(IrisInputData.__fields__)
IrisPrediction = avsc_to_pydantic(iris_prediction_schema)
print(IrisPrediction.__fields__)
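If you want to try this snippet locally without existing files, note that an .avsc file contains nothing more than the schema as plain JSON, so the dictionaries from the previous section can be written out to create them (a small helper sketch for experimentation only):
import json

# Write the schema dictionaries from the previous section to .avsc files
with open("iris_input_data_schema.avsc", "w") as f:
    json.dump(iris_input_data_schema, f, indent=4)

with open("iris_prediction_schema.avsc", "w") as f:
    json.dump(iris_prediction_schema, f, indent=4)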
Consume/Produce Avro messages with FastKafka
FastKafka provides the @consumes and @produces methods to consume/produce messages from/to a Kafka topic. This is explained in the tutorial.
The @consumes and @produces methods accept a decoder/encoder parameter to decode/encode Avro messages.
@kafka_app.consumes(topic="input_data", decoder="avro")
async def on_input_data(msg: IrisInputData):
species_class = ml_models["iris_predictor"].predict(
[[msg.sepal_length, msg.sepal_width, msg.petal_length, msg.petal_width]]
)[0]
await to_predictions(species_class)
@kafka_app.produces(topic="predictions", encoder="avro")
async def to_predictions(species_class: int) -> IrisPrediction:
iris_species = ["setosa", "versicolor", "virginica"]
prediction = IrisPrediction(species=iris_species[species_class])
return prediction
In the above example, in the @consumes and @produces methods, we explicitly instruct FastKafka to decode and encode messages using the avro decoder/encoder instead of the default json decoder/encoder.
Assembling it all together
Let's rewrite the sample code found in the tutorial to use avro to decode and encode messages:
# content of the "application.py" file
from contextlib import asynccontextmanager
from sklearn.datasets import load_iris
from sklearn.linear_model import LogisticRegression
from fastkafka import FastKafka
ml_models = {}
@asynccontextmanager
async def lifespan(app: FastKafka):
# Load the ML model
X, y = load_iris(return_X_y=True)
ml_models["iris_predictor"] = LogisticRegression(random_state=0, max_iter=500).fit(
X, y
)
yield
# Clean up the ML models and release the resources
ml_models.clear()
iris_input_data_schema = {
"type": "record",
"namespace": "IrisInputData",
"name": "IrisInputData",
"fields": [
{"doc": "Sepal length in cm", "type": "double", "name": "sepal_length"},
{"doc": "Sepal width in cm", "type": "double", "name": "sepal_width"},
{"doc": "Petal length in cm", "type": "double", "name": "petal_length"},
{"doc": "Petal width in cm", "type": "double", "name": "petal_width"},
],
}
iris_prediction_schema = {
"type": "record",
"namespace": "IrisPrediction",
"name": "IrisPrediction",
"fields": [{"doc": "Predicted species", "type": "string", "name": "species"}],
}
# Or load schema from avsc files
from fastkafka.encoder import avsc_to_pydantic
IrisInputData = avsc_to_pydantic(iris_input_data_schema)
IrisPrediction = avsc_to_pydantic(iris_prediction_schema)
from fastkafka import FastKafka
kafka_brokers = {
"localhost": {
"url": "localhost",
"description": "local development kafka broker",
"port": 9092,
},
"production": {
"url": "kafka.airt.ai",
"description": "production kafka broker",
"port": 9092,
"protocol": "kafka-secure",
"security": {"type": "plain"},
},
}
kafka_app = FastKafka(
title="Iris predictions",
kafka_brokers=kafka_brokers,
lifespan=lifespan,
)
@kafka_app.consumes(topic="input_data", decoder="avro")
async def on_input_data(msg: IrisInputData):
species_class = ml_models["iris_predictor"].predict(
[[msg.sepal_length, msg.sepal_width, msg.petal_length, msg.petal_width]]
)[0]
await to_predictions(species_class)
@kafka_app.produces(topic="predictions", encoder="avro")
async def to_predictions(species_class: int) -> IrisPrediction:
iris_species = ["setosa", "versicolor", "virginica"]
prediction = IrisPrediction(species=iris_species[species_class])
return prediction
The above code is a sample implementation of using FastKafka to consume and produce Avro-encoded messages from/to a Kafka topic. The code defines two Avro schemas for the input data and the prediction result. It then uses the avsc_to_pydantic function from the FastKafka library to convert the Avro schemas into Pydantic models, which will be used to decode and encode Avro messages.
The FastKafka class is then instantiated with the broker details, and two functions decorated with @kafka_app.consumes and @kafka_app.produces are defined to consume messages from the "input_data" topic and produce messages to the "predictions" topic, respectively. The functions use the decoder="avro" and encoder="avro" parameters to decode and encode the Avro messages.
In summary, the above code demonstrates a straightforward way to use Avro-encoded messages with FastKafka to build a message processing pipeline.
3. Custom encoder and decoder
If you are not happy with the json or avro encoder/decoder options, you can write your own encoder/decoder functions and use them to encode/decode Pydantic messages.
Writing a custom encoder and decoder
In this section, let's see how to write a custom encoder and decoder that obfuscate Kafka messages with a simple ROT13 cipher.
import codecs
import json
from typing import Any
from pydantic import BaseModel
from pydantic.main import ModelMetaclass
def custom_encoder(msg: BaseModel) -> bytes:
msg_str = msg.json()
obfuscated = codecs.encode(msg_str, 'rot13')
raw_bytes = obfuscated.encode("utf-8")
return raw_bytes
def custom_decoder(raw_msg: bytes, cls: ModelMetaclass) -> Any:
obfuscated = raw_msg.decode("utf-8")
msg_str = codecs.decode(obfuscated, 'rot13')
msg_dict = json.loads(msg_str)
return cls(**msg_dict)
The above code defines two custom functions for encoding and decoding messages in a Kafka application using the FastKafka library.
The encoding function, custom_encoder(), takes a message msg, which is an instance of a Pydantic model, converts it to a JSON string using the json() method, obfuscates the resulting string using the ROT13 algorithm from the codecs module, and finally encodes the obfuscated string as raw bytes using the UTF-8 encoding.
The decoding function, custom_decoder(), takes a raw message raw_msg in bytes format and a Pydantic class, passed via the cls parameter, that is used to construct the instance. It first decodes the raw message from UTF-8 encoding, then uses the ROT13 algorithm to de-obfuscate the string. Finally, it loads the resulting JSON string using the json.loads() method and returns a new instance of the specified cls class initialized with the decoded dictionary.
These functions can be used with FastKafka's encoder and decoder parameters to customize the serialization and deserialization of messages in Kafka topics.
Let's test the above code:
i = IrisInputData(sepal_length=0.5, sepal_width=0.5, petal_length=0.5, petal_width=0.5)
encoded = custom_encoder(i)
display(encoded)
decoded = custom_decoder(encoded, IrisInputData)
display(decoded)
This will result in the following output:
b'{"frcny_yratgu": 0.5, "frcny_jvqgu": 0.5, "crgny_yratgu": 0.5, "crgny_jvqgu": 0.5}'
IrisInputData(sepal_length=0.5, sepal_width=0.5, petal_length=0.5, petal_width=0.5)
Assembling it all together
Let's rewrite the sample code found in the tutorial to use our custom decoder and encoder functions:
# content of the "application.py" file
from contextlib import asynccontextmanager
from sklearn.datasets import load_iris
from sklearn.linear_model import LogisticRegression
from fastkafka import FastKafka
ml_models = {}
@asynccontextmanager
async def lifespan(app: FastKafka):
# Load the ML model
X, y = load_iris(return_X_y=True)
ml_models["iris_predictor"] = LogisticRegression(random_state=0, max_iter=500).fit(
X, y
)
yield
# Clean up the ML models and release the resources
ml_models.clear()
from pydantic import BaseModel, NonNegativeFloat, Field
class IrisInputData(BaseModel):
sepal_length: NonNegativeFloat = Field(
..., example=0.5, description="Sepal length in cm"
)
sepal_width: NonNegativeFloat = Field(
..., example=0.5, description="Sepal width in cm"
)
petal_length: NonNegativeFloat = Field(
..., example=0.5, description="Petal length in cm"
)
petal_width: NonNegativeFloat = Field(
..., example=0.5, description="Petal width in cm"
)
class IrisPrediction(BaseModel):
species: str = Field(..., example="setosa", description="Predicted species")
import codecs
import json
from typing import Any
from pydantic.main import ModelMetaclass
def custom_encoder(msg: BaseModel) -> bytes:
msg_str = msg.json()
obfuscated = codecs.encode(msg_str, 'rot13')
raw_bytes = obfuscated.encode("utf-8")
return raw_bytes
def custom_decoder(raw_msg: bytes, cls: ModelMetaclass) -> Any:
obfuscated = raw_msg.decode("utf-8")
msg_str = codecs.decode(obfuscated, 'rot13')
msg_dict = json.loads(msg_str)
return cls(**msg_dict)
from fastkafka import FastKafka
kafka_brokers = {
"localhost": {
"url": "localhost",
"description": "local development kafka broker",
"port": 9092,
},
"production": {
"url": "kafka.airt.ai",
"description": "production kafka broker",
"port": 9092,
"protocol": "kafka-secure",
"security": {"type": "plain"},
},
}
kafka_app = FastKafka(
title="Iris predictions",
kafka_brokers=kafka_brokers,
lifespan=lifespan,
)
@kafka_app.consumes(topic="input_data", decoder=custom_decoder)
async def on_input_data(msg: IrisInputData):
species_class = ml_models["iris_predictor"].predict(
[[msg.sepal_length, msg.sepal_width, msg.petal_length, msg.petal_width]]
)[0]
await to_predictions(species_class)
@kafka_app.produces(topic="predictions", encoder=custom_encoder)
async def to_predictions(species_class: int) -> IrisPrediction:
iris_species = ["setosa", "versicolor", "virginica"]
prediction = IrisPrediction(species=iris_species[species_class])
return prediction
This code defines custom encoder and decoder functions for encoding and decoding messages sent through a Kafka messaging system.
The custom_encoder function takes a message represented as a BaseModel and encodes it as bytes by first converting it to a JSON string and then obfuscating it using the ROT13 encoding. The obfuscated message is then converted to bytes using UTF-8 encoding and returned.
The custom_decoder function takes in the bytes representing an obfuscated message, decodes them using UTF-8 encoding, then removes the ROT13 obfuscation, and finally loads the result as a dictionary using the json module. This dictionary is then converted to a BaseModel instance using the cls parameter.