
Batch producing

If you want to send your data in batches, the @produces decorator makes that possible. When your producing function returns a list of messages, the producer collects them and sends them to the Kafka broker as a batch.

This guide will demonstrate how to use this feature.

Return a batch from the producing function​

To define a batch that you want to produce to a Kafka topic, return a List of the messages you want batched from your producing function.


from typing import List

@app.produces()
async def to_hello_world(msgs: List[str]) -> List[HelloWorld]:
    return [HelloWorld(msg=msg) for msg in msgs]

In this example, we return a batch of HelloWorld messages created from the list of msgs passed into our producing function.
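The batch-building logic above can be run on its own, without Kafka or FastKafka. The sketch below uses a plain dataclass as a stand-in for the pydantic HelloWorld model:

```python
from dataclasses import dataclass
from typing import List

# Stand-in for the pydantic HelloWorld model, so the batch-building
# logic can run without Kafka or FastKafka installed.
@dataclass
class HelloWorld:
    msg: str

def to_hello_world(msgs: List[str]) -> List[HelloWorld]:
    return [HelloWorld(msg=msg) for msg in msgs]

batch = to_hello_world([f"Hello world {i}" for i in range(10)])
print(len(batch), batch[0].msg)  # 10 Hello world 0
```

With the @produces decorator applied, returning such a list is what triggers batch sending.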

Let's also prepare a background task that will send a batch of "Hello world" messages when the app starts.


@app.run_in_background()
async def prepare_and_send_hello_batch():
    msgs = [f"Hello world {i}" for i in range(10)]
    await to_hello_world(msgs)

App example​

We will modify the app example from the @produces basics guide to return the HelloWorld batch. The final app will look like this (make sure you replace the <url_of_your_kafka_bootstrap_server> and <port_of_your_kafka_bootstrap_server> with the actual values):


import asyncio
from fastkafka import FastKafka
from pydantic import BaseModel, Field

class HelloWorld(BaseModel):
    msg: str = Field(
        ...,
        example="Hello",
        description="Demo hello world message",
    )

kafka_brokers = {
    "demo_broker": {
        "url": "<url_of_your_kafka_bootstrap_server>",
        "description": "local demo kafka broker",
        "port": "<port_of_your_kafka_bootstrap_server>",
    }
}

app = FastKafka(kafka_brokers=kafka_brokers)

@app.run_in_background()
async def prepare_and_send_hello_batch():
    msgs = [f"Hello world {i}" for i in range(10)]
    await to_hello_world(msgs)

from typing import List

@app.produces()
async def to_hello_world(msgs: List[str]) -> List[HelloWorld]:
    return [HelloWorld(msg=msg) for msg in msgs]

Run the app​

Now we can run the app. Copy the code above into producer_example.py and run it with:

fastkafka run --num-workers=1 --kafka-broker=demo_broker producer_example:app

After running the command, you should see this output in your terminal:

[46480]: [INFO] fastkafka._application.app: run_in_background() : Adding function 'prepare_and_send_hello_batch' as background task
[46480]: [INFO] fastkafka._application.app: set_kafka_broker() : Setting bootstrap_servers value to '127.0.0.1:9092'
[46480]: [INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': '127.0.0.1:9092'}'
[46480]: [INFO] fastkafka._application.app: _populate_bg_tasks() : Starting background task 'prepare_and_send_hello_batch'
Starting process cleanup, this may take a few seconds...
[INFO] fastkafka._server: terminate_asyncio_process(): Terminating the process 46480...
[46480]: [INFO] fastkafka._application.app: _shutdown_bg_tasks() : Cancelling background task 'prepare_and_send_hello_batch'
[46480]: [INFO] fastkafka._application.app: _shutdown_bg_tasks() : Waiting for background task 'prepare_and_send_hello_batch' to finish
[46480]: [INFO] fastkafka._application.app: _shutdown_bg_tasks() : Execution finished for background task 'prepare_and_send_hello_batch'
[INFO] fastkafka._server: terminate_asyncio_process(): Process 46480 terminated.

Check if the batch was sent to the Kafka topic

Let's check the hello_world topic and see if the "Hello world" messages are there. In your terminal, run:

kafka-console-consumer.sh --topic=hello_world --from-beginning --bootstrap-server=<address_of_your_kafka_bootstrap_server>

You should see the batch of messages in your topic.
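Each element of the batch is produced as its own record, and (judging by the console-consumer output) the record body is the JSON form of a HelloWorld message. Decoding one such record can be sketched with the standard library, again using a dataclass stand-in for the pydantic model:

```python
import json
from dataclasses import dataclass

# Stand-in for the pydantic HelloWorld model.
@dataclass
class HelloWorld:
    msg: str

# One record value as it appears on the topic: a JSON-encoded message body.
raw_value = b'{"msg": "Hello world 0"}'

decoded = HelloWorld(**json.loads(raw_value))
print(decoded.msg)  # Hello world 0
```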

Batch key​

To define a key for your batch, wrap the return value in a KafkaEvent class. To learn more about partition keys and the KafkaEvent class, have a look at the Defining a partition key guide.

Let’s demonstrate that.

To define a key, we just need to modify our producing function, like this:


from typing import List
from fastkafka import KafkaEvent

@app.produces()
async def to_hello_world(msgs: List[str]) -> KafkaEvent[List[HelloWorld]]:
    return KafkaEvent([HelloWorld(msg=msg) for msg in msgs], key=b"my_key")
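Why set a key at all? Kafka assigns a record to a partition by hashing its key, so all records with the same key land on the same partition and keep their ordering. The sketch below illustrates that property with a stand-in hash (the Java client actually uses murmur2, not CRC32):

```python
import zlib

def partition_for(key: bytes, num_partitions: int) -> int:
    # Stand-in for Kafka's default partitioner (the Java client uses
    # murmur2); the point is only that a fixed key always maps to the
    # same partition.
    return zlib.crc32(key) % num_partitions

# Every message produced with key=b"my_key" lands on the same partition.
parts = {partition_for(b"my_key", 6) for _ in range(10)}
print(len(parts))  # 1
```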

Now our app looks like this:


import asyncio
from fastkafka import FastKafka
from pydantic import BaseModel, Field

class HelloWorld(BaseModel):
    msg: str = Field(
        ...,
        example="Hello",
        description="Demo hello world message",
    )

kafka_brokers = {
    "demo_broker": {
        "url": "<url_of_your_kafka_bootstrap_server>",
        "description": "local demo kafka broker",
        "port": "<port_of_your_kafka_bootstrap_server>",
    }
}

app = FastKafka(kafka_brokers=kafka_brokers)

@app.run_in_background()
async def prepare_and_send_hello_batch():
    msgs = [f"Hello world {i}" for i in range(10)]
    await to_hello_world(msgs)

from typing import List
from fastkafka import KafkaEvent

@app.produces()
async def to_hello_world(msgs: List[str]) -> KafkaEvent[List[HelloWorld]]:
    return KafkaEvent([HelloWorld(msg=msg) for msg in msgs], key=b"my_key")

Check if the batch was sent to the Kafka topic​

Let's check the hello_world topic and see if the "Hello world" messages with the defined key are there. In your terminal, run:

kafka-console-consumer.sh --topic=hello_world --property print.key=true --from-beginning --bootstrap-server=<address_of_your_kafka_bootstrap_server>

You should see the batch of messages with the defined key in your topic.