@consumes basics
You can use @consumes
decorator to consume messages from Kafka topics.
In this guide we will create a simple FastKafka app that will consume
HelloWorld
messages from hello_world topic.
Import FastKafka
​
To use the @consumes
decorator, first we need to import the base
FastKafka app to create our application.
from fastkafka import FastKafka
Define the structure of the messages​
Next, you need to define the structure of the messages you want to consume from the topic using pydantic. For the guide we’ll stick to something basic, but you are free to define any complex message structure you wish in your project, just make sure it can be JSON encoded.
Let’s import BaseModel
and Field
from pydantic and create a simple
HelloWorld
class containing one string parameter msg
from pydantic import BaseModel, Field
class HelloWorld(BaseModel):
msg: str = Field(
...,
example="Hello",
description="Demo hello world message",
)
Create a base FastKafka app​
Now we will create and define a base FastKafka app, replace the
<url_of_your_kafka_bootstrap_server>
and
<port_of_your_kafka_bootstrap_server>
with the actual values of your
Kafka bootstrap server
kafka_brokers = {
"demo_broker": {
"url": "<url_of_your_kafka_bootstrap_server>",
"description": "local demo kafka broker",
"port": "<port_of_your_kafka_bootstrap_server>",
}
}
app = FastKafka(kafka_brokers=kafka_brokers)
Create a consumer function and decorate it with @consumes
​
Let’s create a consumer function that will consume HelloWorld
messages
from hello_world topic and log them.
from fastkafka._components.logger import get_logger
logger = get_logger(__name__)
@app.consumes()
async def on_hello_world(msg: HelloWorld):
logger.info(f"Got msg: {msg}")
The function decorated with the @consumes
decorator will be called
when a message is produced to Kafka.
The message will then be injected into the typed msg argument of the function and its type will be used to parse the message.
In this example case, when the message is sent into a hello_world
topic, it will be parsed into a HelloWorld class and on_hello_world
function will be called with the parsed class as msg argument value.
Final app​
Your app code should look like this:
from fastkafka import FastKafka
from pydantic import BaseModel, Field
class HelloWorld(BaseModel):
msg: str = Field(
...,
example="Hello",
description="Demo hello world message",
)
kafka_brokers = {
"demo_broker": {
"url": "<url_of_your_kafka_bootstrap_server>",
"description": "local demo kafka broker",
"port": "<port_of_your_kafka_bootstrap_server>",
}
}
app = FastKafka(kafka_brokers=kafka_brokers)
from fastkafka._components.logger import get_logger
logger = get_logger(__name__)
@app.consumes()
async def on_hello_world(msg: HelloWorld):
logger.info(f"Got msg: {msg}")
Run the app​
Now we can run the app. Copy the code above in consumer_example.py and run it by running
fastkafka run --num-workers=1 --kafka-broker=demo_broker consumer_example:app
After running the command, you should see this output in your terminal:
[382372]: [INFO] fastkafka._application.app: set_kafka_broker() : Setting bootstrap_servers value to '127.0.0.1:9092'
[382372]: [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...
[382372]: [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'bootstrap_servers': '127.0.0.1:9092', 'auto_offset_reset': 'earliest', 'max_poll_records': 100}
[382372]: [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.
[382372]: [INFO] aiokafka.consumer.subscription_state: Updating subscribed topics to: frozenset({'hello_world'})
[382372]: [INFO] aiokafka.consumer.consumer: Subscribed to topic(s): {'hello_world'}
[382372]: [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.
[382372]: [WARNING] aiokafka.cluster: Topic hello_world is not available during auto-create initialization
[382372]: [INFO] aiokafka.consumer.group_coordinator: Metadata for topic has changed from {} to {'hello_world': 0}.
Starting process cleanup, this may take a few seconds...
[INFO] fastkafka._server: terminate_asyncio_process(): Terminating the process 382372...
[382372]: [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer stopped.
[382372]: [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.
[INFO] fastkafka._server: terminate_asyncio_process(): Process 382372 terminated.
Send the message to kafka topic​
Lets send a HelloWorld
message to the hello_world topic and check if
our consumer kafka application has logged the received message. In your
terminal, run:
echo {\"msg\": \"Hello world\"} | kafka-console-producer.sh --topic=hello_world --bootstrap-server=<addr_of_your_kafka_bootstrap_server>
You should see the “Got msg: msg='Hello world'" being logged by your consumer.
Choosing a topic​
You probably noticed that you didn’t define which topic you are
receiving the message from, this is because the @consumes
decorator
determines the topic by default from your function name. The decorator
will take your function name and strip the default “on_" prefix from it
and use the rest as the topic name. In this example case, the topic is
hello_world.
You can choose your custom prefix by defining the prefix
parameter in
consumes decorator, like this:
from fastkafka._components.logger import get_logger
logger = get_logger(__name__)
@app.consumes(prefix="read_from_")
async def read_from_hello_world(msg: HelloWorld):
logger.info(f"Got msg: {msg}")
Also, you can define the topic name completely by defining the topic
in parameter in consumes decorator, like this:
from fastkafka._components.logger import get_logger
logger = get_logger(__name__)
@app.consumes(topic="my_special_topic")
async def on_hello_world(msg: HelloWorld):
logger.info(f"Got msg: {msg}")
Message data​
The message received from kafka is translated from binary JSON
representation int the class defined by typing of msg parameter in the
function decorated by the @consumes
decorator.
In this example case, the message will be parsed into a HelloWorld
class.