Skip to main content
Version: 0.8.0

@produces basics

You can use @produces decorator to produce messages to Kafka topics.

In this guide we will create a simple FastKafka app that will produce hello world messages to hello_world topic.

Import FastKafka​

To use the @produces decorator, frist we need to import the base FastKafka app to create our application.

from fastkafka import FastKafka

Define the structure of the messages​

Next, you need to define the structure of the messages you want to send to the topic using pydantic. For the guide we’ll stick to something basic, but you are free to define any complex message structure you wish in your project, just make sure it can be JSON encoded.

Let’s import BaseModel and Field from pydantic and create a simple HelloWorld class containing one string parameter msg

from pydantic import BaseModel, Field

class HelloWorld(BaseModel):
msg: str = Field(
...,
example="Hello",
description="Demo hello world message",
)

Create a base FastKafka app​

Now we will create and define a base FastKafka app, replace the <url_of_your_kafka_bootstrap_server> and <port_of_your_kafka_bootstrap_server> with the actual values of your Kafka bootstrap server



kafka_brokers = {
"demo_broker": {
"url": "<url_of_your_kafka_bootstrap_server>",
"description": "local demo kafka broker",
"port": "<port_of_your_kafka_bootstrap_server>",
}
}

app = FastKafka(kafka_brokers=kafka_brokers)

Create a producer function and decorate it with @produces​

Let’s create a producer function that will produce HelloWorld messages to hello_world topic:


@app.produces()
async def to_hello_world(msg: str) -> HelloWorld:
return HelloWorld(msg=msg)

Now you can call your defined function as any normal python function in your code. The side effect of calling the function will be that the value you are returning will also be sent to a kafka topic.

By default, the topic is determined from your function name, the “to_" prefix is stripped and what is left over is used as a topic name. I this case, that is hello_world.

Instruct the app to start sending HelloWorld messages​

Let’s use @run_in_background decorator to instruct our app to send HelloWorld messages to hello_world topic every second.


import asyncio

@app.run_in_background()
async def hello_every_second():
while(True):
await to_hello_world(msg="Hello world!")
await asyncio.sleep(1)

Final app​

Your app code should look like this:

from fastkafka import FastKafka
from pydantic import BaseModel, Field

class HelloWorld(BaseModel):
msg: str = Field(
...,
example="Hello",
description="Demo hello world message",
)


kafka_brokers = {
"demo_broker": {
"url": "<url_of_your_kafka_bootstrap_server>",
"description": "local demo kafka broker",
"port": "<port_of_your_kafka_bootstrap_server>",
}
}

app = FastKafka(kafka_brokers=kafka_brokers)

@app.produces()
async def to_hello_world(msg: str) -> HelloWorld:
return HelloWorld(msg=msg)

import asyncio

@app.run_in_background()
async def hello_every_second():
while(True):
await to_hello_world(msg="Hello world!")
await asyncio.sleep(1)

Run the app​

script_file = "producer_example.py"
cmd = "fastkafka run --num-workers=1 --kafka-broker=demo_broker producer_example:app"
md(
f"Now we can run the app. Copy the code above in producer_example.py and run it by running\n```shell\n{cmd}\n```"
)

Now we can run the app. Copy the code above in producer_example.py and run it by running

fastkafka run --num-workers=1 --kafka-broker=demo_broker producer_example:app

After running the command, you should see this output in your terminal:

[84645]: [INFO] fastkafka._application.app: run_in_background() : Adding function 'hello_every_second' as background task
[84645]: [INFO] fastkafka._application.app: set_kafka_broker() : Setting bootstrap_servers value to '127.0.0.1:9092'
[84645]: [INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': '127.0.0.1:9092'}'
[84645]: [INFO] fastkafka._application.app: _populate_bg_tasks() : Starting background task 'hello_every_second'
[84645]: [WARNING] aiokafka.cluster: Topic hello_world is not available during auto-create initialization
[84645]: [WARNING] aiokafka.cluster: Topic hello_world is not available during auto-create initialization
Starting process cleanup, this may take a few seconds...
[INFO] fastkafka._server: terminate_asyncio_process(): Terminating the process 84645...
[84645]: [INFO] fastkafka._application.app: _shutdown_bg_tasks() : Cancelling background task 'hello_every_second'
[84645]: [INFO] fastkafka._application.app: _shutdown_bg_tasks() : Waiting for background task 'hello_every_second' to finish
[84645]: [INFO] fastkafka._application.app: _shutdown_bg_tasks() : Execution finished for background task 'hello_every_second'
[INFO] fastkafka._server: terminate_asyncio_process(): Process 84645 terminated.

Check if the message was sent to the Kafka topic​

Lets check the topic and see if there is a “Hello world!" message in the hello_world topic. In your terminal run:

kafka-console-consumer.sh -topic=hello_world --from-beginning -bootstrap-server=<addr_of_your_kafka_bootstrap_server>

You should see the {“msg": “Hello world!"} messages in your topic.

Choosing a topic​

You probably noticed that you didn’t define which topic you are sending the message to, this is because the @produces decorator determines the topic by default from your function name. The decorator will take your function name and strip the default “to_" prefix from it and use the rest as the topic name. In this example case, the topic is hello_world.

!!! warn "New topics"

Kafka producers and application startup will fail if the topics you are producing to don't yet exist. Before running the app, make sure that the topics are created.

You can choose your custom prefix by defining the prefix parameter in produces decorator, like this:


@app.produces(prefix="send_to_")
async def send_to_hello_world(msg: str) -> HelloWorld:
return HelloWorld(msg=msg)

Also, you can define the topic name completely by defining the topic in parameter in produces decorator, like this:


@app.produces(topic="my_special_topic")
async def to_hello_world(msg: str) -> HelloWorld:
return HelloWorld(msg=msg)

Message data​

The return value from your function will be translated JSON string and then to bytes and sent to defined Kafka topic. The typing of the return value is used for generating the documentation for your Kafka app.

In this example case, the return value is HelloWorld class which will be translated into JSON formatted string and then to bytes. The translated data will then be sent to Kafka. In the from of: b'{"msg":"Hello world!"}'