Batch consuming
If you want to consume data in batches, the @consumes decorator makes that
possible. When you type the consumed msg parameter as a list of messages, the
consumer calls your consuming function with a batch of messages consumed from
a single partition. Let's demonstrate that now.
Consume function with batching
To consume messages in batches, wrap your message type in a List and the
@consumes decorator will take care of the rest. Your consuming function will
now be called with batches grouped by partition.
@app.consumes(auto_offset_reset="earliest")
async def on_hello_world(msg: List[HelloWorld]):
    logger.info(f"Got msg batch: {msg}")
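To make "batches grouped by partition" concrete, here is a minimal standalone sketch. It is not FastKafka's internal code: the Record class and the group_by_partition helper are hypothetical names introduced only for illustration, and they merely mimic how a set of polled records could be split into one list per partition before a consuming function is called.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class Record:
    """Hypothetical stand-in for a consumed Kafka record."""

    partition: int
    offset: int
    value: str


def group_by_partition(records: List[Record]) -> Dict[int, List[str]]:
    """Split records into one batch of values per partition, keeping arrival order."""
    batches: Dict[int, List[str]] = defaultdict(list)
    for record in records:
        batches[record.partition].append(record.value)
    return dict(batches)


# Three records spread over two partitions (made-up demo data).
polled = [
    Record(partition=0, offset=0, value="Hello"),
    Record(partition=1, offset=0, value="world"),
    Record(partition=0, offset=1, value="again"),
]

# Each batch holds messages from exactly one partition.
for partition, batch in group_by_partition(polled).items():
    print(f"partition {partition}: {batch}")
```

Under this assumption, a consuming function typed with List[HelloWorld] would be called once per batch, never with messages from two partitions mixed together.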
App example
We will modify the app example from the @consumes basics guide to consume
HelloWorld messages in batches. The final app will look like this (make sure
you replace <url_of_your_kafka_bootstrap_server> and
<port_of_your_kafka_bootstrap_server> with the actual values):
import asyncio
from typing import List

from pydantic import BaseModel, Field

from fastkafka import FastKafka
from fastkafka._components.logger import get_logger

logger = get_logger(__name__)


class HelloWorld(BaseModel):
    msg: str = Field(
        ...,
        example="Hello",
        description="Demo hello world message",
    )


kafka_brokers = {
    "demo_broker": {
        "url": "<url_of_your_kafka_bootstrap_server>",
        "description": "local demo kafka broker",
        "port": "<port_of_your_kafka_bootstrap_server>",
    }
}

app = FastKafka(kafka_brokers=kafka_brokers)


@app.consumes(auto_offset_reset="earliest")
async def on_hello_world(msg: List[HelloWorld]):
    logger.info(f"Got msg batch: {msg}")
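Because the batch arrives as an ordinary Python list, the body of the consuming function can iterate over it like any other list. The sketch below illustrates this without a running broker. The names batch_texts and handle_batch are hypothetical, and SimpleNamespace objects stand in for HelloWorld instances so the snippet runs with the standard library alone.

```python
import asyncio
from types import SimpleNamespace
from typing import Any, List


def batch_texts(batch: List[Any]) -> List[str]:
    """Return the `msg` field of every message in the batch, in order."""
    return [item.msg for item in batch]


async def handle_batch(batch: List[Any]) -> int:
    """Hypothetical handler body: process each message and return the batch size."""
    for text in batch_texts(batch):
        print(f"Processing: {text}")
    return len(batch)


# SimpleNamespace is only a stand-in for HelloWorld objects in this demo.
demo_batch = [SimpleNamespace(msg="Hello"), SimpleNamespace(msg="world")]
processed = asyncio.run(handle_batch(demo_batch))
```

In the real app, the same loop would sit inside on_hello_world and receive HelloWorld instances instead of the stand-ins.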
Send the messages to kafka topic
Let's send a couple of HelloWorld messages to the hello_world topic and check
whether our consumer Kafka application logs the received message batch. In
your terminal, run the following command at least two times to create multiple
messages in your Kafka topic:
echo { ^"msg^": ^"Hello world^" } | kafka-console-producer.bat --topic=hello_world --bootstrap-server=<addr_of_your_kafka_bootstrap_server>
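The command above uses Windows cmd escaping (^") to build the JSON by hand. If that becomes tedious, the payload lines can be generated with a few lines of Python instead. This is only a sketch: build_payloads is a hypothetical helper, and it assumes the console producer reads one message per input line, so the printed output could be saved to a file and piped in.

```python
import json
from typing import List


def build_payloads(texts: List[str]) -> List[str]:
    """Serialize each text as a JSON object matching the HelloWorld schema."""
    return [json.dumps({"msg": text}) for text in texts]


# Two identical messages, mirroring "run the command at least two times".
payload_lines = build_payloads(["Hello world", "Hello world"])
print("\n".join(payload_lines))
```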
Now we can run the app. Copy the code of the example app into consumer_example.py and run it with:
fastkafka run --num-workers=1 --kafka-broker=demo_broker consumer_example:app
You should see your Kafka messages being logged in batches by your consumer.