@consumes basics

You can use the @consumes decorator to consume messages from Kafka topics.

In this guide, we will create a simple FastKafka app that consumes HelloWorld messages from the hello_world topic.

Import FastKafka

To use the @consumes decorator, we first need to import the FastKafka base class to create our application.

from fastkafka import FastKafka

In this demo we will log the messages to the output so that we can inspect and verify that our app is consuming properly. For that we need to import the logger.

from fastkafka._components.logger import get_logger

logger = get_logger(__name__)

Define the structure of the messages

Next, you need to define the structure of the messages you want to consume from the topic using pydantic. For this guide we'll stick to something basic, but you are free to define any complex message structure you wish in your project; just make sure it can be JSON-encoded.

Let's import BaseModel and Field from pydantic and create a simple HelloWorld class containing one string field, msg:

from pydantic import BaseModel, Field

class HelloWorld(BaseModel):
    msg: str = Field(
        ...,
        example="Hello",
        description="Demo hello world message",
    )
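As a quick sanity check, you can verify that the model JSON-encodes and decodes cleanly; this sketch assumes the pydantic v1 API that the Field(example=...) syntax above implies:

# Quick sanity check (not part of the app): the model round-trips through JSON
hello = HelloWorld(msg="Hello")
raw = hello.json()  # '{"msg": "Hello"}'
assert HelloWorld.parse_raw(raw) == hello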

Create a base FastKafka app

Now we will create and define a base FastKafka app. Replace <url_of_your_kafka_bootstrap_server> and <port_of_your_kafka_bootstrap_server> with the actual values of your Kafka bootstrap server:

kafka_brokers = {
    "demo_broker": {
        "url": "<url_of_your_kafka_bootstrap_server>",
        "description": "local demo kafka broker",
        "port": "<port_of_your_kafka_bootstrap_server>",
    }
}

app = FastKafka(kafka_brokers=kafka_brokers)
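For example, for a local broker listening on the default port, the configuration might look like this (localhost and 9092 are illustrative values matching the 127.0.0.1:9092 bootstrap server that appears in the logs later in this guide):

kafka_brokers = {
    "demo_broker": {
        "url": "localhost",
        "description": "local demo kafka broker",
        "port": "9092",
    }
}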

Create a consumer function and decorate it with @consumes

Let's create a consumer function that will consume HelloWorld messages from the hello_world topic and log them.

@app.consumes()
async def on_hello_world(msg: HelloWorld):
    logger.info(f"Got msg: {msg}")

The function decorated with the @consumes decorator will be called when a message is produced to Kafka.

The message will then be injected into the typed msg argument of the function, and its type will be used to parse the message.

In this example, when a message is sent to the hello_world topic, it will be parsed into a HelloWorld instance and the on_hello_world function will be called with the parsed instance as the value of the msg argument.

Final app

Your app code should look like this:

from fastkafka import FastKafka
from pydantic import BaseModel, Field
from fastkafka._components.logger import get_logger

logger = get_logger(__name__)

class HelloWorld(BaseModel):
    msg: str = Field(
        ...,
        example="Hello",
        description="Demo hello world message",
    )

kafka_brokers = {
    "demo_broker": {
        "url": "<url_of_your_kafka_bootstrap_server>",
        "description": "local demo kafka broker",
        "port": "<port_of_your_kafka_bootstrap_server>",
    }
}

app = FastKafka(kafka_brokers=kafka_brokers)

@app.consumes()
async def on_hello_world(msg: HelloWorld):
    logger.info(f"Got msg: {msg}")

Run the app

Now we can run the app. Copy the code above into consumer_example.py and run it with:

fastkafka run --num-workers=1 --kafka-broker=demo_broker consumer_example:app

After running the command, you should see this output in your terminal:

[14442]: 23-06-15 07:16:00.564 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...
[14442]: 23-06-15 07:16:00.564 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'auto_offset_reset': 'earliest', 'max_poll_records': 100, 'bootstrap_servers': '127.0.0.1:9092'}
[14442]: 23-06-15 07:16:00.577 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.
[14442]: 23-06-15 07:16:00.577 [INFO] aiokafka.consumer.subscription_state: Updating subscribed topics to: frozenset({'hello_world'})
[14442]: 23-06-15 07:16:00.577 [INFO] aiokafka.consumer.consumer: Subscribed to topic(s): {'hello_world'}
[14442]: 23-06-15 07:16:00.577 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.
[14442]: 23-06-15 07:16:00.585 [INFO] aiokafka.consumer.group_coordinator: Metadata for topic has changed from {} to {'hello_world': 1}.
Starting process cleanup, this may take a few seconds...
23-06-15 07:16:04.626 [INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 14442...
[14442]: 23-06-15 07:16:05.735 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer stopped.
[14442]: 23-06-15 07:16:05.735 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.
23-06-15 07:16:05.853 [INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 14442 terminated.

Send a message to the Kafka topic

Let's send a HelloWorld message to the hello_world topic and check whether our consumer application has logged the received message. In your terminal, run:

echo { \"msg\": \"Hello world\" } | kafka-console-producer.sh --topic=hello_world --bootstrap-server=<addr_of_your_kafka_bootstrap_server>

In your app's terminal, you should see the consumer receive and log the message:
[15588]: 23-06-15 07:16:15.282 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...
[15588]: 23-06-15 07:16:15.282 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'auto_offset_reset': 'earliest', 'max_poll_records': 100, 'bootstrap_servers': '127.0.0.1:9092'}
[15588]: 23-06-15 07:16:15.294 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.
[15588]: 23-06-15 07:16:15.294 [INFO] aiokafka.consumer.subscription_state: Updating subscribed topics to: frozenset({'hello_world'})
[15588]: 23-06-15 07:16:15.295 [INFO] aiokafka.consumer.consumer: Subscribed to topic(s): {'hello_world'}
[15588]: 23-06-15 07:16:15.295 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.
[15588]: 23-06-15 07:16:15.302 [INFO] aiokafka.consumer.group_coordinator: Metadata for topic has changed from {} to {'hello_world': 1}.
[15588]: 23-06-15 07:16:25.867 [INFO] consumer_example: Got msg: msg='Hello world'
Starting process cleanup, this may take a few seconds...
23-06-15 07:16:34.168 [INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 15588...
[15588]: 23-06-15 07:16:35.358 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer stopped.
[15588]: 23-06-15 07:16:35.359 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.
23-06-15 07:16:35.475 [INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 15588 terminated.

You should see "Got msg: msg='Hello world'" being logged by your consumer.

Choosing a topic

You probably noticed that you didn't define which topic you are receiving the messages from. This is because, by default, the @consumes decorator determines the topic from your function name: it takes the function name, strips the default "on_" prefix, and uses the rest as the topic name. In this example, the topic is hello_world.
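The naming rule itself is simple enough to sketch; the following is only an illustration of the convention, not FastKafka's actual implementation:

def topic_from_function_name(fn_name: str, prefix: str = "on_") -> str:
    # Strip the prefix from the function name; the remainder is the topic name
    return fn_name[len(prefix):] if fn_name.startswith(prefix) else fn_name

assert topic_from_function_name("on_hello_world") == "hello_world"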

You can choose a custom prefix by defining the prefix parameter in the @consumes decorator, like this:

@app.consumes(prefix="read_from_")
async def read_from_hello_world(msg: HelloWorld):
    logger.info(f"Got msg: {msg}")

Also, you can define the topic name explicitly by setting the topic parameter in the @consumes decorator, like this:

@app.consumes(topic="my_special_topic")
async def on_hello_world(msg: HelloWorld):
    logger.info(f"Got msg: {msg}")

Message data

The message received from Kafka is translated from its binary JSON representation into the class defined by the type annotation of the msg parameter of the function decorated with the @consumes decorator.

In this example, the message will be parsed into a HelloWorld instance.
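Conceptually, this decoding step is plain pydantic parsing; here is a simplified sketch (not the actual FastKafka internals) of what happens to the raw message bytes:

raw = b'{ "msg": "Hello world" }'  # message value as received from Kafka
parsed = HelloWorld.parse_raw(raw)  # HelloWorld(msg='Hello world')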

Message metadata

If you need any of the Kafka message metadata, such as the timestamp, partition or headers, you can access it by adding an EventMetadata-typed argument to your consuming function; the metadata from the incoming message will then be injected automatically when the function is called.

Let’s demonstrate that.

Create a consumer function with metadata

The only difference from the original basic consuming function is that we are now passing the meta: EventMetadata argument to the function. The @consumes decorator will register that and, when a message is consumed, it will also pass the metadata to your function. Now you can use the metadata in your consuming function. Let's log it to see what it contains.

First, we need to import EventMetadata:

from fastkafka import EventMetadata

Now we can add the meta argument to our consuming function:

@app.consumes()
async def on_hello_world(msg: HelloWorld, meta: EventMetadata):
    logger.info(f"Got metadata: {meta}")
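Because EventMetadata is a structured object, you can also pick out individual fields instead of logging the whole thing; for example (the field names match the EventMetadata record shown in the log output below):

@app.consumes()
async def on_hello_world(msg: HelloWorld, meta: EventMetadata):
    # Log selected metadata fields individually
    logger.info(f"topic={meta.topic}, partition={meta.partition}, offset={meta.offset}")
    logger.info(f"timestamp={meta.timestamp}, key={meta.key}, headers={meta.headers}")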

Your final app should look like this:

from fastkafka import FastKafka
from pydantic import BaseModel, Field
from fastkafka import EventMetadata
from fastkafka._components.logger import get_logger

logger = get_logger(__name__)

class HelloWorld(BaseModel):
    msg: str = Field(
        ...,
        example="Hello",
        description="Demo hello world message",
    )

kafka_brokers = {
    "demo_broker": {
        "url": "<url_of_your_kafka_bootstrap_server>",
        "description": "local demo kafka broker",
        "port": "<port_of_your_kafka_bootstrap_server>",
    }
}

app = FastKafka(kafka_brokers=kafka_brokers)

@app.consumes()
async def on_hello_world(msg: HelloWorld, meta: EventMetadata):
    logger.info(f"Got metadata: {meta}")

Now let's run the app and send a message to the broker to see the logged message metadata.

You should see a similar log as the one below and the metadata being logged in your app.

[20050]: 23-06-15 07:18:55.661 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...
[20050]: 23-06-15 07:18:55.661 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'auto_offset_reset': 'earliest', 'max_poll_records': 100, 'bootstrap_servers': '127.0.0.1:9092'}
[20050]: 23-06-15 07:18:55.675 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.
[20050]: 23-06-15 07:18:55.675 [INFO] aiokafka.consumer.subscription_state: Updating subscribed topics to: frozenset({'hello_world'})
[20050]: 23-06-15 07:18:55.675 [INFO] aiokafka.consumer.consumer: Subscribed to topic(s): {'hello_world'}
[20050]: 23-06-15 07:18:55.675 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.
[20050]: 23-06-15 07:18:55.682 [INFO] aiokafka.consumer.group_coordinator: Metadata for topic has changed from {} to {'hello_world': 1}.
[20050]: 23-06-15 07:19:06.337 [INFO] consumer_example: Got metadata: EventMetadata(topic='hello_world', partition=0, offset=0, timestamp=1686813546255, timestamp_type=0, key=None, value=b'{ "msg": "Hello world" }', checksum=None, serialized_key_size=-1, serialized_value_size=24, headers=())
Starting process cleanup, this may take a few seconds...
23-06-15 07:19:14.547 [INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 20050...
[20050]: 23-06-15 07:19:15.630 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer stopped.
[20050]: 23-06-15 07:19:15.630 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.
23-06-15 07:19:15.742 [INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 20050 terminated.

As you can see in the log, the metadata now gives you information about the topic, partition, offset, timestamp, key and headers. 🎉

Dealing with high-latency consuming functions

If your functions have high latency due to, for example, lengthy database calls, you will notice a big decrease in performance. This is due to how the @consumes decorator executes your consuming functions when consuming events. By default, the consuming functions for one topic are run sequentially; this is the most straightforward approach and results in the least overhead.

But to handle those high-latency tasks and run them in parallel, FastKafka provides a DynamicTaskExecutor for your consumers. This executor comes with additional overhead, so use it only when you need to handle high-latency functions.

Let's demonstrate how to use it.

Add the executor option to your @consumes decorator and set it to "DynamicTaskExecutor"; this will enable the consumer to handle high-latency functions effectively.

Your consuming function should now look like this:

@app.consumes(executor="DynamicTaskExecutor")
async def on_hello_world(msg: HelloWorld):
    logger.info(f"Got msg: {msg}")
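To see where this pays off, imagine a consuming function that spends most of its time waiting on I/O; in the sketch below, asyncio.sleep stands in for a hypothetical slow database call:

import asyncio

@app.consumes(executor="DynamicTaskExecutor")
async def on_hello_world(msg: HelloWorld):
    await asyncio.sleep(1)  # stand-in for a slow database call
    logger.info(f"Got msg: {msg}")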

And the complete app should now look like this:

from fastkafka import FastKafka
from pydantic import BaseModel, Field
from fastkafka._components.logger import get_logger

logger = get_logger(__name__)

class HelloWorld(BaseModel):
    msg: str = Field(
        ...,
        example="Hello",
        description="Demo hello world message",
    )

kafka_brokers = {
    "demo_broker": {
        "url": "<url_of_your_kafka_bootstrap_server>",
        "description": "local demo kafka broker",
        "port": "<port_of_your_kafka_bootstrap_server>",
    }
}

app = FastKafka(kafka_brokers=kafka_brokers)

@app.consumes(executor="DynamicTaskExecutor")
async def on_hello_world(msg: HelloWorld):
    logger.info(f"Got msg: {msg}")

You can now run your app using the CLI commands described in this guide.

Let's send a HelloWorld message to the hello_world topic and check whether our consumer application has logged the received message. In your terminal, run:

echo { \"msg\": \"Hello world\" } | kafka-console-producer.sh --topic=hello_world --bootstrap-server=<addr_of_your_kafka_bootstrap_server>

You should see a similar log as the one below.

[21539]: 23-06-15 07:19:25.135 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...
[21539]: 23-06-15 07:19:25.135 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'auto_offset_reset': 'earliest', 'max_poll_records': 100, 'bootstrap_servers': '127.0.0.1:9092'}
[21539]: 23-06-15 07:19:25.147 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.
[21539]: 23-06-15 07:19:25.147 [INFO] aiokafka.consumer.subscription_state: Updating subscribed topics to: frozenset({'hello_world'})
[21539]: 23-06-15 07:19:25.147 [INFO] aiokafka.consumer.consumer: Subscribed to topic(s): {'hello_world'}
[21539]: 23-06-15 07:19:25.147 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.
[21539]: 23-06-15 07:19:25.154 [INFO] aiokafka.consumer.group_coordinator: Metadata for topic has changed from {} to {'hello_world': 1}.
[21539]: 23-06-15 07:19:35.512 [INFO] consumer_example: Got msg: msg='Hello world'
Starting process cleanup, this may take a few seconds...
23-06-15 07:19:44.023 [INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 21539...
[21539]: 23-06-15 07:19:45.202 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer stopped.
[21539]: 23-06-15 07:19:45.203 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.
23-06-15 07:19:45.313 [INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 21539 terminated.

Inside the log, you should see "Got msg: msg='Hello world'" being logged by your consumer.