Skip to main content
Version: 0.6.0

Defining a partition key

Partition keys are used in Apache Kafka to determine which partition a message should be written to. This ensures that related messages are kept together in the same partition, which can be useful for ensuring order or for grouping related messages together for efficient processing. Additionally, partitioning data across multiple partitions allows Kafka to distribute load across multiple brokers and scale horizontally, while replicating data across multiple brokers provides fault tolerance.

You can define your partition keys when using the @produces decorator, this guide will demonstrate to you this feature.

Return a key from the producing function​

To define a key for the message that you want to produce to Kafka topic, you need to wrap the response into KafkaEvent class and set the key value. Check the example below:


from fastkafka import KafkaEvent

@app.produces()
async def to_hello_world(msg: str) -> KafkaEvent[HelloWorld]:
return KafkaEvent(HelloWorld(msg=msg), key=b"my_key")

In the example, we want to return the HelloWorld message class with the key defined as my_key. So, we wrap the message and key into a KafkaEvent class and return it as such.

While generating the documentation, the KafkaEvent class will be unwrapped and the HelloWorld class will be documented in the definition of message type, same way if you didn’t use the key.

!!! info "Which key to choose?"

Although we have defined a fixed key in this example, nothing is stopping you from calculating a key beforehand and passing it in, or using the message parts for key calculation. Just make sure that the key is in `bytes` format when you wrap it in `KafkaEvent`.

App example​

We will modify the app example from @producer basics guide to return the HelloWorld with our key. The final app will look like this (make sure you replace the <url_of_your_kafka_bootstrap_server> and <port_of_your_kafka_bootstrap_server> with the actual values):


from fastkafka import FastKafka
from pydantic import BaseModel, Field

class HelloWorld(BaseModel):
msg: str = Field(
...,
example="Hello",
description="Demo hello world message",
)

kafka_brokers = {
"demo_broker": {
"url": "<url_of_your_kafka_bootstrap_server>",
"description": "local demo kafka broker",
"port": "<port_of_your_kafka_bootstrap_server>",
}
}

app = FastKafka(kafka_brokers=kafka_brokers)

from fastkafka import KafkaEvent

@app.produces()
async def to_hello_world(msg: str) -> KafkaEvent[HelloWorld]:
return KafkaEvent(HelloWorld(msg=msg), key=b"my_key")

import asyncio

@app.run_in_background()
async def hello_every_second():
while(True):
await to_hello_world(msg="Hello world!")
await asyncio.sleep(1)

Run the app​

Now we can run the app. Copy the code above in producer_example.py and run it by running

fastkafka run --num-workers=1 --kafka-broker=demo_broker producer_with_key_example:app

After running the command, you should see this output in your terminal:

[347835]: [INFO] fastkafka._application.app: run_in_background() : Adding function 'hello_every_second' as background task
[347835]: [INFO] fastkafka._application.app: set_kafka_broker() : Setting bootstrap_servers value to '127.0.0.1:9092'
[347835]: [INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': '127.0.0.1:9092'}'
[347835]: [INFO] fastkafka._application.app: _populate_bg_tasks() : Starting background task 'hello_every_second'
Starting process cleanup, this may take a few seconds...
[INFO] fastkafka._server: terminate_asyncio_process(): Terminating the process 347835...
[347835]: [INFO] fastkafka._application.app: _shutdown_bg_tasks() : Cancelling background task 'hello_every_second'
[347835]: [INFO] fastkafka._application.app: _shutdown_bg_tasks() : Waiting for background task 'hello_every_second' to finish
[347835]: [INFO] fastkafka._application.app: _shutdown_bg_tasks() : Execution finished for background task 'hello_every_second'
[INFO] fastkafka._server: terminate_asyncio_process(): Process 347835 terminated.

Check if the message was sent to the Kafka topic with the desired key​

Lets check the topic and see if there is a “Hello world!" message in the hello_world topic with the defined key. In your terminal run:

kafka-console-consumer.sh --topic=hello_world --property print.key=true --from-beginning --bootstrap-server=<address_of_your_kafka_bootstrap_server>

You should see the my_key {“msg": “Hello world!"} messages in your topic appearing, the my_key part of the message is the key that we defined in our producing function.