Batch producing
If you want to send your data in batches, the @produces decorator makes that possible. When your producing function returns a list of messages, the producer collects them and sends them to a Kafka broker in a single batch.
This guide will demonstrate how to use this feature.
Return a batch from the producing function
To define a batch that you want to produce to a Kafka topic, return a List of the messages you want batched from your producing function.
from typing import List
@app.produces()
async def to_hello_world(msgs: List[str]) -> List[HelloWorld]:
    return [HelloWorld(msg=msg) for msg in msgs]
In the example, we return a batch of HelloWorld messages created from the list of msgs passed into our producing function.
Let's also prepare a background task that will send a batch of “hello world” messages when the app starts.
@app.run_in_background()
async def prepare_and_send_hello_batch():
    msgs = [f"Hello world {i}" for i in range(10)]
    await to_hello_world(msgs)
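To make the shape of that batch concrete, here is a plain-Python sketch (no FastKafka involved; plain dicts stand in for the HelloWorld model) of the list the background task builds and hands to the producing function:

```python
# Plain-Python sketch of the batch built above.
# Dicts stand in for the Pydantic HelloWorld model.
msgs = [f"Hello world {i}" for i in range(10)]
batch = [{"msg": msg} for msg in msgs]

print(len(batch))  # 10
print(batch[0])    # {'msg': 'Hello world 0'}
```

All ten elements are sent together as one batch, rather than as ten individual produce calls.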
App example
We will modify the app example from the @producer basics guide to return a HelloWorld batch. The final app will look like this (make sure you replace <url_of_your_kafka_bootstrap_server> and <port_of_your_kafka_bootstrap_server> with the actual values):
import asyncio
from fastkafka import FastKafka
from pydantic import BaseModel, Field
class HelloWorld(BaseModel):
    msg: str = Field(
        ...,
        example="Hello",
        description="Demo hello world message",
    )
kafka_brokers = {
    "demo_broker": {
        "url": "<url_of_your_kafka_bootstrap_server>",
        "description": "local demo kafka broker",
        "port": "<port_of_your_kafka_bootstrap_server>",
    }
}
app = FastKafka(kafka_brokers=kafka_brokers)
@app.run_in_background()
async def prepare_and_send_hello_batch():
    msgs = [f"Hello world {i}" for i in range(10)]
    await to_hello_world(msgs)
from typing import List
@app.produces()
async def to_hello_world(msgs: List[str]) -> List[HelloWorld]:
    return [HelloWorld(msg=msg) for msg in msgs]
Run the app
Now we can run the app. Copy the code above into producer_example.py and run it with:
fastkafka run --num-workers=1 --kafka-broker=demo_broker producer_example:app
After running the command, you should see this output in your terminal:
[46480]: [INFO] fastkafka._application.app: run_in_background() : Adding function 'prepare_and_send_hello_batch' as background task
[46480]: [INFO] fastkafka._application.app: set_kafka_broker() : Setting bootstrap_servers value to '127.0.0.1:9092'
[46480]: [INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': '127.0.0.1:9092'}'
[46480]: [INFO] fastkafka._application.app: _populate_bg_tasks() : Starting background task 'prepare_and_send_hello_batch'
Starting process cleanup, this may take a few seconds...
[INFO] fastkafka._server: terminate_asyncio_process(): Terminating the process 46480...
[46480]: [INFO] fastkafka._application.app: _shutdown_bg_tasks() : Cancelling background task 'prepare_and_send_hello_batch'
[46480]: [INFO] fastkafka._application.app: _shutdown_bg_tasks() : Waiting for background task 'prepare_and_send_hello_batch' to finish
[46480]: [INFO] fastkafka._application.app: _shutdown_bg_tasks() : Execution finished for background task 'prepare_and_send_hello_batch'
[INFO] fastkafka._server: terminate_asyncio_process(): Process 46480 terminated.
Check if the batch was sent to the Kafka topic
Let's check the topic and see if there are “Hello world” messages in the hello_world topic. In your terminal, run:
kafka-console-consumer.sh --topic=hello_world --from-beginning --bootstrap-server=<address_of_your_kafka_bootstrap_server>
You should see the batch of messages in your topic.
Batch key
To define a key for your batch, as in the Defining a partition key guide, wrap the return value in a KafkaEvent class. To learn more about defining a partition key and the KafkaEvent class, please have a look at the Defining a partition key guide.
Let's demonstrate that.
To define a key, we just need to modify our producing function, like this:
from typing import List
from fastkafka import KafkaEvent
@app.produces()
async def to_hello_world(msgs: List[str]) -> KafkaEvent[List[HelloWorld]]:
    return KafkaEvent([HelloWorld(msg=msg) for msg in msgs], key=b"my_key")
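Conceptually, KafkaEvent just pairs the batch with its partition key. As a rough illustration only — this Event class is a hypothetical stand-in, not FastKafka's actual implementation:

```python
from dataclasses import dataclass
from typing import Generic, Optional, TypeVar

T = TypeVar("T")

@dataclass
class Event(Generic[T]):
    # Hypothetical stand-in for fastkafka.KafkaEvent:
    # a message payload paired with an optional partition key.
    message: T
    key: Optional[bytes] = None

# Dicts stand in for the Pydantic HelloWorld model.
batch = [{"msg": f"Hello world {i}"} for i in range(3)]
event = Event(batch, key=b"my_key")
print(event.key)           # b'my_key'
print(len(event.message))  # 3
```

The whole batch shares the single key, so all of its messages are routed together.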
Now our app looks like this:
import asyncio
from fastkafka import FastKafka
from pydantic import BaseModel, Field
class HelloWorld(BaseModel):
    msg: str = Field(
        ...,
        example="Hello",
        description="Demo hello world message",
    )
kafka_brokers = {
    "demo_broker": {
        "url": "<url_of_your_kafka_bootstrap_server>",
        "description": "local demo kafka broker",
        "port": "<port_of_your_kafka_bootstrap_server>",
    }
}
app = FastKafka(kafka_brokers=kafka_brokers)
@app.run_in_background()
async def prepare_and_send_hello_batch():
    msgs = [f"Hello world {i}" for i in range(10)]
    await to_hello_world(msgs)
from typing import List
from fastkafka import KafkaEvent
@app.produces()
async def to_hello_world(msgs: List[str]) -> KafkaEvent[List[HelloWorld]]:
    return KafkaEvent([HelloWorld(msg=msg) for msg in msgs], key=b"my_key")
Check if the batch was sent to the Kafka topic with the defined key
Let's check the topic and see if there are “Hello world” messages with the defined key in the hello_world topic. In your terminal, run:
kafka-console-consumer.sh --topic=hello_world --property print.key=true --from-beginning --bootstrap-server=<address_of_your_kafka_bootstrap_server>
You should see the batch of messages with the defined key in your topic.
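As a reminder of why the key matters: Kafka's default partitioner hashes the key, so every message in a keyed batch lands on the same partition, preserving their relative order. A simplified sketch of that idea (real Kafka uses murmur2 hashing; the toy hash below is for illustration only):

```python
def choose_partition(key: bytes, num_partitions: int) -> int:
    # Toy stand-in for Kafka's default partitioner, which hashes
    # the key (murmur2) modulo the partition count.
    return sum(key) % num_partitions

# The same key always maps to the same partition.
assert choose_partition(b"my_key", 3) == choose_partition(b"my_key", 3)
```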