Version: 0.7.1

Batch consuming

If you want to consume data in batches, the @consumes decorator makes that possible. When you type the consumed msg parameter as a list of messages, the consumer calls your consuming function with a batch of messages consumed from a single partition. Let's demonstrate that now.

Consume function with batching

To consume messages in batches, wrap your message type in a List and the @consumes decorator will take care of the rest. Your consuming function will then be called with batches of messages grouped by partition.

@app.consumes(auto_offset_reset="earliest")
async def on_hello_world(msg: List[HelloWorld]):
    logger.info(f"Got msg batch: {msg}")
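
To make "batches grouped by partition" more concrete, here is a minimal, library-free sketch of the idea. It does not use FastKafka or show how FastKafka is implemented: the HelloWorld dataclass is a stand-in for the pydantic model in this guide, and group_by_partition, deliver, and the received list are hypothetical names introduced only for illustration. It assumes records arrive as (partition, message) pairs.

```python
import asyncio
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass
class HelloWorld:
    # Stand-in for the pydantic HelloWorld model used in this guide.
    msg: str


# Collects the text of every batch the handler sees (illustration only).
received: List[List[str]] = []


async def on_hello_world(msg: List[HelloWorld]) -> None:
    # The batch is an ordinary list, so it can be iterated like any other.
    received.append([m.msg for m in msg])


def group_by_partition(
    records: List[Tuple[int, HelloWorld]]
) -> Dict[int, List[HelloWorld]]:
    # Hypothetical helper: build one batch per partition, keeping record order.
    batches: Dict[int, List[HelloWorld]] = defaultdict(list)
    for partition, message in records:
        batches[partition].append(message)
    return dict(batches)


async def deliver(records: List[Tuple[int, HelloWorld]]) -> None:
    # Call the handler once per partition, never mixing partitions in a batch.
    for batch in group_by_partition(records).values():
        await on_hello_world(batch)


records = [(0, HelloWorld("a")), (1, HelloWorld("b")), (0, HelloWorld("c"))]
asyncio.run(deliver(records))
print(received)  # [['a', 'c'], ['b']]
```

The handler receives two calls here: one with both messages from partition 0 and one with the single message from partition 1.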

App example

We will modify the app example from the @consumes basics guide to consume HelloWorld messages in batches. The final app will look like this (make sure you replace <url_of_your_kafka_bootstrap_server> and <port_of_your_kafka_bootstrap_server> with the actual values):


import asyncio
from typing import List
from pydantic import BaseModel, Field

from fastkafka import FastKafka
from fastkafka._components.logger import get_logger

logger = get_logger(__name__)

class HelloWorld(BaseModel):
    msg: str = Field(
        ...,
        example="Hello",
        description="Demo hello world message",
    )

kafka_brokers = {
    "demo_broker": {
        "url": "<url_of_your_kafka_bootstrap_server>",
        "description": "local demo kafka broker",
        "port": "<port_of_your_kafka_bootstrap_server>",
    }
}

app = FastKafka(kafka_brokers=kafka_brokers)

@app.consumes(auto_offset_reset="earliest")
async def on_hello_world(msg: List[HelloWorld]):
    logger.info(f"Got msg batch: {msg}")

Send the messages to the Kafka topic

Let's send a couple of HelloWorld messages to the hello_world topic and check that our consumer application logs the received message batch. In your terminal, run the following command at least twice to create multiple messages in the topic:

echo {\"msg\": \"Hello world\"} | kafka-console-producer.sh --topic=hello_world --bootstrap-server=<addr_of_your_kafka_bootstrap_server>
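
As an alternative to repeating the command by hand, the payloads could be generated by a small script. The sketch below is only an illustration: make_payloads and the numbered message text are hypothetical, and it assumes the producer reads one JSON message per line from standard input.

```python
import json
from typing import List


def make_payloads(count: int) -> List[str]:
    # One JSON document per line, each with the single "msg" field
    # that the HelloWorld model in this guide expects.
    return [json.dumps({"msg": f"Hello world {i}"}) for i in range(count)]


if __name__ == "__main__":
    # Print the payloads so they can be piped into the console producer.
    for line in make_payloads(3):
        print(line)
```

If saved as, say, make_messages.py (a hypothetical filename), its output could be piped into the same kafka-console-producer.sh command in place of the echo.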

Now we can run the app. Copy the example app code into consumer_example.py and run it with:

fastkafka run --num-workers=1 --kafka-broker=demo_broker consumer_example:app

You should see your Kafka messages being logged in batches by your consumer.