Version: dev 🚧

Using multiple Kafka clusters

Ready to take your FastKafka app to the next level? This guide shows you how to connect a single FastKafka app to multiple Kafka clusters. You will consume the same topic from several clusters, forward topics from one cluster to another, aggregate data from several clusters into one topic, and produce messages to more than one cluster. Let’s dive in!

Test message

Throughout this guide, we use a simple test message called TestMsg to showcase FastKafka’s functionality and illustrate the concepts discussed. Here’s the definition of the TestMsg class:

from pydantic import BaseModel, Field


class TestMsg(BaseModel):
    msg: str = Field(...)

Defining multiple broker configurations

When building a FastKafka application, you may need to consume messages from, and produce messages to, multiple Kafka clusters, each with its own set of broker configurations. FastKafka lets you assign a different broker cluster to each endpoint using the brokers argument of the consumes and produces decorators. Let’s explore an example:

from pydantic import BaseModel, Field

from fastkafka import FastKafka


class TestMsg(BaseModel):
    msg: str = Field(...)


# Primary cluster: used by every endpoint that does not pass a brokers argument
kafka_brokers_1 = dict(
    development=dict(url="dev.server_1", port=9092),
    production=dict(url="prod.server_1", port=9092),
)
# Alternative cluster: selected per endpoint with brokers=kafka_brokers_2
kafka_brokers_2 = dict(
    development=dict(url="dev.server_2", port=9092),
    production=dict(url="prod.server_2", port=9092),
)

app = FastKafka(kafka_brokers=kafka_brokers_1, bootstrap_servers_id="development")


@app.consumes(topic="preprocessed_signals")
async def on_preprocessed_signals_1(msg: TestMsg):
    print(f"Received on s1: {msg=}")
    await to_predictions_1(msg)


@app.consumes(topic="preprocessed_signals", brokers=kafka_brokers_2)
async def on_preprocessed_signals_2(msg: TestMsg):
    print(f"Received on s2: {msg=}")
    await to_predictions_2(msg)


@app.produces(topic="predictions")
async def to_predictions_1(msg: TestMsg) -> TestMsg:
    return msg


@app.produces(topic="predictions", brokers=kafka_brokers_2)
async def to_predictions_2(msg: TestMsg) -> TestMsg:
    return msg

In this example, the application has two consumes endpoints, both of which consume events from the preprocessed_signals topic. on_preprocessed_signals_1 consumes events using the kafka_brokers_1 configuration, and on_preprocessed_signals_2 consumes events using the kafka_brokers_2 configuration. When producing, to_predictions_1 produces to the predictions topic on the kafka_brokers_1 cluster, and to_predictions_2 produces to the predictions topic on the kafka_brokers_2 cluster.
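
The routing described above can be written down as a small table. This stdlib-only sketch resolves each endpoint’s bootstrap server for the development entry, using the same url:port format that appears as bootstrap_servers in the test logs later in this guide. The names endpoint_brokers and bootstrap_servers are hypothetical, chosen for illustration; they are not FastKafka internals.

```python
# Development entries of the two broker configurations from the example above.
kafka_brokers_1 = dict(development=dict(url="dev.server_1", port=9092))
kafka_brokers_2 = dict(development=dict(url="dev.server_2", port=9092))

# Hypothetical routing table: endpoint name -> (topic, broker configuration).
# Endpoints without a brokers argument fall back to the app-level kafka_brokers_1.
endpoint_brokers = {
    "on_preprocessed_signals_1": ("preprocessed_signals", kafka_brokers_1),
    "on_preprocessed_signals_2": ("preprocessed_signals", kafka_brokers_2),
    "to_predictions_1": ("predictions", kafka_brokers_1),
    "to_predictions_2": ("predictions", kafka_brokers_2),
}


def bootstrap_servers(brokers: dict, broker_id: str) -> str:
    """Format the selected broker entry as 'url:port' (hypothetical helper)."""
    entry = brokers[broker_id]
    return f"{entry['url']}:{entry['port']}"


# Resolve every endpoint against the "development" entry of its configuration.
routing = {
    name: (topic, bootstrap_servers(brokers, "development"))
    for name, (topic, brokers) in endpoint_brokers.items()
}
for name, (topic, servers) in routing.items():
    print(f"{name:28} {topic:22} {servers}")
```

Both consumers subscribe to the same topic name, but each resolves to a different cluster, which is exactly what the brokers argument buys you.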

How it works

The kafka_brokers_1 configuration represents the primary cluster, while kafka_brokers_2 serves as an alternative cluster specified in the decorator.

Using the FastKafka class, the app object is initialized with the primary broker configuration (kafka_brokers_1). By default, the @app.consumes decorator without the brokers argument consumes messages from the preprocessed_signals topic on kafka_brokers_1.

To consume messages from a different cluster, the @app.consumes decorator includes the brokers argument. This allows explicit specification of the broker cluster in the on_preprocessed_signals_2 function, enabling consumption from the same topic but using the kafka_brokers_2 configuration.

The brokers argument can also be used in the @app.produces decorator to define multiple broker clusters for message production.

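It’s important that every broker configuration defines the same entries as the primary configuration (here, development and production) to ensure consistent behavior. The broker entry you select, with bootstrap_servers_id or the --kafka-broker CLI option, is looked up in each configuration: in the test logs below, selecting development makes the kafka_brokers_2 endpoints connect to dev.server_2:9092.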
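
One way to catch a mismatch before deploying is to compare the entry names of every configuration against the primary one. This is a minimal sketch, assuming that each configuration must contain the entry selected at runtime. missing_entries is a hypothetical helper; FastKafka does not ship this check.

```python
from typing import Dict, List


def missing_entries(
    primary: Dict[str, dict], others: Dict[str, Dict[str, dict]]
) -> Dict[str, List[str]]:
    """Report, for each extra broker configuration, the primary entries it lacks."""
    required = set(primary)
    report: Dict[str, List[str]] = {}
    for name, config in others.items():
        missing = sorted(required - set(config))
        if missing:
            report[name] = missing
    return report


kafka_brokers_1 = dict(
    development=dict(url="dev.server_1", port=9092),
    production=dict(url="prod.server_1", port=9092),
)
# A deliberately incomplete second configuration: it has no "production" entry.
kafka_brokers_2 = dict(development=dict(url="dev.server_2", port=9092))

problems = missing_entries(kafka_brokers_1, {"kafka_brokers_2": kafka_brokers_2})
print(problems)  # {'kafka_brokers_2': ['production']}
```

Running such a check at startup or in a unit test surfaces a missing production entry before anyone selects production.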

Testing the application

To test our FastKafka ‘mirroring’ application, we can use our testing framework. Let’s take a look at how it’s done:

from fastkafka.testing import Tester

# Top-level `async with` works in Jupyter/IPython; in a plain script, put this
# block inside an async function and run it with asyncio.run().
async with Tester(app) as tester:
    # Send TestMsg to the topic/broker pair on_preprocessed_signals_1 is consuming from
    await tester.mirrors[app.on_preprocessed_signals_1](TestMsg(msg="signal_s1"))
    # Assert on_preprocessed_signals_1 consumed the sent message
    await app.awaited_mocks.on_preprocessed_signals_1.assert_called_with(
        TestMsg(msg="signal_s1"), timeout=5
    )
    # Assert the app has produced a prediction
    await tester.mirrors[app.to_predictions_1].assert_called_with(
        TestMsg(msg="signal_s1"), timeout=5
    )

    # Send TestMsg to the topic/broker pair on_preprocessed_signals_2 is consuming from
    await tester.mirrors[app.on_preprocessed_signals_2](TestMsg(msg="signal_s2"))
    # Assert on_preprocessed_signals_2 consumed the sent message
    await app.awaited_mocks.on_preprocessed_signals_2.assert_called_with(
        TestMsg(msg="signal_s2"), timeout=5
    )
    # Assert the app has produced a prediction
    await tester.mirrors[app.to_predictions_2].assert_called_with(
        TestMsg(msg="signal_s2"), timeout=5
    )
23-06-23 12:15:51.156 [INFO] fastkafka._testing.in_memory_broker: InMemoryBroker._patch_consumers_and_producers(): Patching consumers and producers!
23-06-23 12:15:51.157 [INFO] fastkafka._testing.in_memory_broker: InMemoryBroker starting
23-06-23 12:15:51.157 [INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': 'dev.server_1:9092'}'
23-06-23 12:15:51.158 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched start() called()
23-06-23 12:15:51.158 [INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': 'dev.server_2:9092'}'
23-06-23 12:15:51.159 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched start() called()
23-06-23 12:15:51.178 [INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': 'dev.server_1:9092'}'
23-06-23 12:15:51.178 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched start() called()
23-06-23 12:15:51.179 [INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': 'dev.server_2:9092'}'
23-06-23 12:15:51.180 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched start() called()
23-06-23 12:15:51.180 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...
23-06-23 12:15:51.180 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'auto_offset_reset': 'earliest', 'max_poll_records': 100, 'bootstrap_servers': 'dev.server_1:9092'}
23-06-23 12:15:51.181 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched start() called()
23-06-23 12:15:51.181 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.
23-06-23 12:15:51.182 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched subscribe() called
23-06-23 12:15:51.182 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer.subscribe(), subscribing to: ['preprocessed_signals']
23-06-23 12:15:51.182 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.
23-06-23 12:15:51.186 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...
23-06-23 12:15:51.187 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'auto_offset_reset': 'earliest', 'max_poll_records': 100, 'bootstrap_servers': 'dev.server_2:9092'}
23-06-23 12:15:51.187 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched start() called()
23-06-23 12:15:51.188 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.
23-06-23 12:15:51.188 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched subscribe() called
23-06-23 12:15:51.189 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer.subscribe(), subscribing to: ['preprocessed_signals']
23-06-23 12:15:51.189 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.
23-06-23 12:15:51.189 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...
23-06-23 12:15:51.190 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'auto_offset_reset': 'earliest', 'max_poll_records': 100, 'bootstrap_servers': 'dev.server_1:9092'}
23-06-23 12:15:51.190 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched start() called()
23-06-23 12:15:51.190 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.
23-06-23 12:15:51.191 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched subscribe() called
23-06-23 12:15:51.191 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer.subscribe(), subscribing to: ['predictions']
23-06-23 12:15:51.191 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.
23-06-23 12:15:51.192 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...
23-06-23 12:15:51.192 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'auto_offset_reset': 'earliest', 'max_poll_records': 100, 'bootstrap_servers': 'dev.server_2:9092'}
23-06-23 12:15:51.193 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched start() called()
23-06-23 12:15:51.193 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.
23-06-23 12:15:51.193 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched subscribe() called
23-06-23 12:15:51.194 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer.subscribe(), subscribing to: ['predictions']
23-06-23 12:15:51.194 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.
Received on s1: msg=TestMsg(msg='signal_s1')
Received on s2: msg=TestMsg(msg='signal_s2')
23-06-23 12:15:56.181 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched stop() called
23-06-23 12:15:56.181 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer stopped.
23-06-23 12:15:56.182 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.
23-06-23 12:15:56.182 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched stop() called
23-06-23 12:15:56.182 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer stopped.
23-06-23 12:15:56.183 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.
23-06-23 12:15:56.183 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched stop() called
23-06-23 12:15:56.183 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched stop() called
23-06-23 12:15:56.184 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched stop() called
23-06-23 12:15:56.184 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer stopped.
23-06-23 12:15:56.185 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.
23-06-23 12:15:56.185 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched stop() called
23-06-23 12:15:56.185 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer stopped.
23-06-23 12:15:56.186 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.
23-06-23 12:15:56.186 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched stop() called
23-06-23 12:15:56.186 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched stop() called
23-06-23 12:15:56.188 [INFO] fastkafka._testing.in_memory_broker: InMemoryBroker stopping

The tester.mirrors dictionary lets you choose the exact topic/broker pair to test against, which matters when several endpoints use the same topic name on different Kafka clusters. Indexing it with a consuming function (for example, app.on_preprocessed_signals_1) gives you a producer that sends to the topic and cluster that function consumes from. Indexing it with a producing function (for example, app.to_predictions_1) gives you a consumer that receives from the topic and cluster that function produces to.
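
Here is a conceptual sketch of what the dictionary resolves. It assumes, as the test log above suggests (four consumers and four producers start for the app’s four endpoints), that the tester creates one opposite-direction counterpart per endpoint on the same topic and cluster. The to_&lt;topic&gt; and on_&lt;topic&gt; names follow tester.to_preprocessed_signals and tester.on_predictions used later in this guide. Endpoint and mirror_of are hypothetical and are not the Tester implementation.

```python
from typing import NamedTuple, Tuple


class Endpoint(NamedTuple):
    # Hypothetical description of one endpoint: direction, topic, and cluster.
    kind: str  # "consumes" or "produces"
    topic: str
    bootstrap_servers: str


def mirror_of(endpoint: Endpoint) -> Tuple[str, Endpoint]:
    """Return (name, endpoint) for a counterpart with the opposite direction."""
    if endpoint.kind == "consumes":
        return f"to_{endpoint.topic}", endpoint._replace(kind="produces")
    return f"on_{endpoint.topic}", endpoint._replace(kind="consumes")


app_endpoints = {
    "on_preprocessed_signals_1": Endpoint("consumes", "preprocessed_signals", "dev.server_1:9092"),
    "on_preprocessed_signals_2": Endpoint("consumes", "preprocessed_signals", "dev.server_2:9092"),
    "to_predictions_1": Endpoint("produces", "predictions", "dev.server_1:9092"),
    "to_predictions_2": Endpoint("produces", "predictions", "dev.server_2:9092"),
}

# Keyed by the app endpoint, in the spirit of tester.mirrors[app.<function>].
mirrors = {name: mirror_of(ep) for name, ep in app_endpoints.items()}

# Two mirrors share the name "to_preprocessed_signals"; only the cluster differs.
name_1, ep_1 = mirrors["on_preprocessed_signals_1"]
name_2, ep_2 = mirrors["on_preprocessed_signals_2"]
print(name_1 == name_2, ep_1.bootstrap_servers, ep_2.bootstrap_servers)
# True dev.server_1:9092 dev.server_2:9092
```

Because a name like to_preprocessed_signals is ambiguous here, keying by the app function is what picks the right cluster.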

Running the application

You can run your application with the fastkafka run CLI command, the same way you would run a single-cluster app.

To start your app, copy the code above into multi_cluster_example.py, adjust the server configurations to match your clusters, and run:

fastkafka run --num-workers=1 --kafka-broker=development multi_cluster_example:app

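In your app logs, you should see your app starting up and your two consumer functions connecting to different Kafka clusters (in the logs below, the development entries point to local brokers at 127.0.0.1:24092 and 127.0.0.1:24093).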

[182747]: 23-06-23 12:16:14.092 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...
[182747]: 23-06-23 12:16:14.092 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'auto_offset_reset': 'earliest', 'max_poll_records': 100, 'bootstrap_servers': '127.0.0.1:24092'}
[182747]: 23-06-23 12:16:14.092 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...
[182747]: 23-06-23 12:16:14.092 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'auto_offset_reset': 'earliest', 'max_poll_records': 100, 'bootstrap_servers': '127.0.0.1:24093'}
[182747]: 23-06-23 12:16:14.131 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.
[182747]: 23-06-23 12:16:14.131 [INFO] aiokafka.consumer.subscription_state: Updating subscribed topics to: frozenset({'preprocessed_signals'})
[182747]: 23-06-23 12:16:14.131 [INFO] aiokafka.consumer.consumer: Subscribed to topic(s): {'preprocessed_signals'}
[182747]: 23-06-23 12:16:14.131 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.
[182747]: 23-06-23 12:16:14.136 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.
[182747]: 23-06-23 12:16:14.136 [INFO] aiokafka.consumer.subscription_state: Updating subscribed topics to: frozenset({'preprocessed_signals'})
[182747]: 23-06-23 12:16:14.136 [INFO] aiokafka.consumer.consumer: Subscribed to topic(s): {'preprocessed_signals'}
[182747]: 23-06-23 12:16:14.136 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.
[182747]: 23-06-23 12:16:14.141 [INFO] aiokafka.consumer.group_coordinator: Metadata for topic has changed from {} to {'preprocessed_signals': 1}.
[182747]: 23-06-23 12:16:14.141 [INFO] aiokafka.consumer.group_coordinator: Metadata for topic has changed from {} to {'preprocessed_signals': 1}.
Starting process cleanup, this may take a few seconds...
23-06-23 12:16:18.294 [INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 182747...
[182747]: 23-06-23 12:16:19.380 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer stopped.
[182747]: 23-06-23 12:16:19.380 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.
[182747]: 23-06-23 12:16:19.380 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer stopped.
[182747]: 23-06-23 12:16:19.380 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.
23-06-23 12:16:19.471 [INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 182747 terminated.

Application documentation

At the moment, application documentation is not yet supported for multi-cluster apps, but it is under development and you can expect it soon!

Examples of how to use multiple broker configurations

Example #1

In this section, we’ll explore how you can effectively forward topics between different Kafka clusters, enabling seamless data synchronization for your applications.

Imagine having two Kafka clusters, namely kafka_brokers_1 and kafka_brokers_2, each hosting its own set of topics and messages. Now, if you want to forward a specific topic (in this case: preprocessed_signals) from kafka_brokers_1 to kafka_brokers_2, FastKafka provides an elegant solution.

Let’s examine the code snippet that configures our application for topic forwarding:

from pydantic import BaseModel, Field

from fastkafka import FastKafka


class TestMsg(BaseModel):
    msg: str = Field(...)


kafka_brokers_1 = dict(localhost=dict(url="server_1", port=9092))
kafka_brokers_2 = dict(localhost=dict(url="server_2", port=9092))

app = FastKafka(kafka_brokers=kafka_brokers_1)


@app.consumes(topic="preprocessed_signals")
async def on_preprocessed_signals_original(msg: TestMsg):
    await to_preprocessed_signals_forward(msg)


@app.produces(topic="preprocessed_signals", brokers=kafka_brokers_2)
async def to_preprocessed_signals_forward(data: TestMsg) -> TestMsg:
    return data

Here’s how it works: our FastKafka application is configured to consume messages from kafka_brokers_1 and process them in the on_preprocessed_signals_original function. We want to forward these messages to kafka_brokers_2. To achieve this, we define the to_preprocessed_signals_forward function as a producer, seamlessly producing the processed messages to the preprocessed_signals topic within the kafka_brokers_2 cluster.
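
To see the data flow without running Kafka, here is a stdlib-only simulation. Each “cluster” is a dictionary of topic lists, and messages are plain strings instead of TestMsg. It only models the forwarding logic; the cluster dictionaries and the drain helper are hypothetical stand-ins, not how FastKafka or its InMemoryBroker work.

```python
import asyncio
from collections import defaultdict
from typing import Awaitable, Callable, DefaultDict, List

# Hypothetical in-memory stand-ins for two clusters: topic name -> messages.
Cluster = DefaultDict[str, List[str]]
cluster_1: Cluster = defaultdict(list)
cluster_2: Cluster = defaultdict(list)


async def to_preprocessed_signals_forward(data: str) -> None:
    # Stand-in producer: same topic name, but written to cluster_2.
    cluster_2["preprocessed_signals"].append(data)


async def on_preprocessed_signals_original(msg: str) -> None:
    # Stand-in consumer body: forward every message unchanged.
    await to_preprocessed_signals_forward(msg)


async def drain(
    cluster: Cluster, topic: str, handler: Callable[[str], Awaitable[None]]
) -> None:
    # Deliver every message currently on the topic to the handler, in order.
    for msg in list(cluster[topic]):
        await handler(msg)


cluster_1["preprocessed_signals"] += ["signal_a", "signal_b"]
asyncio.run(drain(cluster_1, "preprocessed_signals", on_preprocessed_signals_original))
print(cluster_2["preprocessed_signals"])  # ['signal_a', 'signal_b']
```

As with Kafka, consuming does not remove the messages from the source: cluster_1 still holds both signals after forwarding.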

Testing

To test our FastKafka forwarding application, we can use our testing framework. Let’s take a look at the testing code snippet:

from fastkafka.testing import Tester

async with Tester(app) as tester:
    await tester.mirrors[app.on_preprocessed_signals_original](TestMsg(msg="signal"))
    await tester.mirrors[app.to_preprocessed_signals_forward].assert_called(timeout=5)
23-06-23 12:16:31.689 [INFO] fastkafka._testing.in_memory_broker: InMemoryBroker._patch_consumers_and_producers(): Patching consumers and producers!
23-06-23 12:16:31.690 [INFO] fastkafka._testing.in_memory_broker: InMemoryBroker starting
23-06-23 12:16:31.691 [INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': 'server_2:9092'}'
23-06-23 12:16:31.691 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched start() called()
23-06-23 12:16:31.701 [INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': 'server_1:9092'}'
23-06-23 12:16:31.702 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched start() called()
23-06-23 12:16:31.702 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...
23-06-23 12:16:31.703 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'auto_offset_reset': 'earliest', 'max_poll_records': 100, 'bootstrap_servers': 'server_1:9092'}
23-06-23 12:16:31.703 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched start() called()
23-06-23 12:16:31.704 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.
23-06-23 12:16:31.704 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched subscribe() called
23-06-23 12:16:31.704 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer.subscribe(), subscribing to: ['preprocessed_signals']
23-06-23 12:16:31.706 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.
23-06-23 12:16:31.706 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...
23-06-23 12:16:31.707 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'auto_offset_reset': 'earliest', 'max_poll_records': 100, 'bootstrap_servers': 'server_2:9092'}
23-06-23 12:16:31.707 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched start() called()
23-06-23 12:16:31.708 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.
23-06-23 12:16:31.708 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched subscribe() called
23-06-23 12:16:31.709 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer.subscribe(), subscribing to: ['preprocessed_signals']
23-06-23 12:16:31.709 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.
23-06-23 12:16:35.703 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched stop() called
23-06-23 12:16:35.703 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer stopped.
23-06-23 12:16:35.704 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.
23-06-23 12:16:35.704 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched stop() called
23-06-23 12:16:35.705 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched stop() called
23-06-23 12:16:35.705 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer stopped.
23-06-23 12:16:35.706 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.
23-06-23 12:16:35.707 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched stop() called
23-06-23 12:16:35.707 [INFO] fastkafka._testing.in_memory_broker: InMemoryBroker stopping

With the help of the Tester object, we can simulate and verify the behavior of our FastKafka application. Here’s how it works:

  1. We create an instance of the Tester by passing in our app object, which represents our FastKafka application.

  2. Using the tester.mirrors dictionary, we can send a message to a specific Kafka broker and topic combination. In this case, we use tester.mirrors[app.on_preprocessed_signals_original] to send a TestMsg message with the content “signal” to the broker and topic that on_preprocessed_signals_original consumes from.

  3. After sending the message, tester.mirrors[app.to_preprocessed_signals_forward].assert_called(timeout=5) asserts that the mirror of to_preprocessed_signals_forward, a consumer on the preprocessed_signals topic of kafka_brokers_2, received a message within the specified timeout (in this case, 5 seconds). This confirms that the message was forwarded from kafka_brokers_1 to kafka_brokers_2.

Example #2

In this section, we’ll explore how you can effortlessly consume data from multiple sources, process it, and aggregate the results into a single topic on a specific cluster.

Imagine you have two Kafka clusters: kafka_brokers_1 and kafka_brokers_2, each hosting its own set of topics and messages. Now, what if you want to consume data from both clusters, perform some processing, and produce the results to a single topic on kafka_brokers_1? FastKafka has got you covered!

Let’s take a look at the code snippet that configures our application for aggregating multiple clusters:

from pydantic import BaseModel, Field

from fastkafka import FastKafka


class TestMsg(BaseModel):
    msg: str = Field(...)


kafka_brokers_1 = dict(localhost=dict(url="server_1", port=9092))
kafka_brokers_2 = dict(localhost=dict(url="server_2", port=9092))

app = FastKafka(kafka_brokers=kafka_brokers_1)


@app.consumes(topic="preprocessed_signals")
async def on_preprocessed_signals_1(msg: TestMsg):
    print(f"Default: {msg=}")
    await to_predictions(msg)


@app.consumes(topic="preprocessed_signals", brokers=kafka_brokers_2)
async def on_preprocessed_signals_2(msg: TestMsg):
    print(f"Specified: {msg=}")
    await to_predictions(msg)


@app.produces(topic="predictions")
async def to_predictions(prediction: TestMsg) -> TestMsg:
    print(f"Sending prediction: {prediction}")
    return prediction

Here’s the idea: our FastKafka application is set to consume messages from the “preprocessed_signals” topic on the kafka_brokers_1 cluster, as well as from the same topic on the kafka_brokers_2 cluster. We have two consuming functions, on_preprocessed_signals_1 and on_preprocessed_signals_2, that handle the messages from their respective clusters. These functions perform any required processing, in this case, just calling the to_predictions function.

The exciting part is that the to_predictions function acts as a producer, sending the processed results to the “predictions” topic on the kafka_brokers_1 cluster. By doing so, we effectively aggregate the data from multiple sources into a single topic on a specific cluster.

This approach enables you to consume data from multiple Kafka clusters, process it, and produce the aggregated results to a designated topic. Whether you’re generating predictions, performing aggregations, or any other form of data processing, FastKafka empowers you to harness the full potential of multiple clusters.
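
Here is a stdlib-only simulation of the aggregation. It uses two in-memory “clusters”, one consumer per cluster running concurrently via asyncio.gather, and a single producer that always writes to cluster_1. Messages are tagged with their source only to make the merge visible; the FastKafka example forwards them unchanged. All names are hypothetical stand-ins, not FastKafka APIs.

```python
import asyncio
from collections import defaultdict
from typing import Awaitable, Callable, DefaultDict, List

# Hypothetical in-memory stand-ins: each cluster maps topic name -> messages.
Cluster = DefaultDict[str, List[str]]
cluster_1: Cluster = defaultdict(list)
cluster_2: Cluster = defaultdict(list)


async def to_predictions(prediction: str) -> None:
    # Single stand-in producer: every prediction lands on cluster_1's "predictions".
    cluster_1["predictions"].append(prediction)


async def on_preprocessed_signals_1(msg: str) -> None:
    await to_predictions(f"from s1: {msg}")


async def on_preprocessed_signals_2(msg: str) -> None:
    await to_predictions(f"from s2: {msg}")


async def consume(
    cluster: Cluster, topic: str, handler: Callable[[str], Awaitable[None]]
) -> None:
    # Hand each message on the topic to the handler, yielding between messages.
    for msg in list(cluster[topic]):
        await handler(msg)
        await asyncio.sleep(0)


async def main() -> None:
    # Both consumers run concurrently, one per source cluster.
    await asyncio.gather(
        consume(cluster_1, "preprocessed_signals", on_preprocessed_signals_1),
        consume(cluster_2, "preprocessed_signals", on_preprocessed_signals_2),
    )


cluster_1["preprocessed_signals"].append("signal")
cluster_2["preprocessed_signals"].append("signal")
asyncio.run(main())
print(sorted(cluster_1["predictions"]))  # ['from s1: signal', 'from s2: signal']
```

The order in which the two sources interleave is not something to rely on, which is why the result is sorted before printing.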

Testing

Let’s take a look at the testing code snippet:

from fastkafka.testing import Tester

async with Tester(app) as tester:
    await tester.mirrors[app.on_preprocessed_signals_1](TestMsg(msg="signal"))
    await tester.mirrors[app.on_preprocessed_signals_2](TestMsg(msg="signal"))
    await tester.on_predictions.assert_called(timeout=5)
23-06-23 12:16:41.222 [INFO] fastkafka._testing.in_memory_broker: InMemoryBroker._patch_consumers_and_producers(): Patching consumers and producers!
23-06-23 12:16:41.223 [INFO] fastkafka._testing.in_memory_broker: InMemoryBroker starting
23-06-23 12:16:41.224 [INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': 'server_1:9092'}'
23-06-23 12:16:41.224 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched start() called()
23-06-23 12:16:41.239 [INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': 'server_1:9092'}'
23-06-23 12:16:41.239 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched start() called()
23-06-23 12:16:41.240 [INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': 'server_2:9092'}'
23-06-23 12:16:41.240 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched start() called()
23-06-23 12:16:41.241 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...
23-06-23 12:16:41.241 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'auto_offset_reset': 'earliest', 'max_poll_records': 100, 'bootstrap_servers': 'server_1:9092'}
23-06-23 12:16:41.241 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched start() called()
23-06-23 12:16:41.242 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.
23-06-23 12:16:41.242 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched subscribe() called
23-06-23 12:16:41.242 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer.subscribe(), subscribing to: ['preprocessed_signals']
23-06-23 12:16:41.243 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.
23-06-23 12:16:41.243 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...
23-06-23 12:16:41.245 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'auto_offset_reset': 'earliest', 'max_poll_records': 100, 'bootstrap_servers': 'server_2:9092'}
23-06-23 12:16:41.245 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched start() called()
23-06-23 12:16:41.245 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.
23-06-23 12:16:41.246 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched subscribe() called
23-06-23 12:16:41.246 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer.subscribe(), subscribing to: ['preprocessed_signals']
23-06-23 12:16:41.247 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.
23-06-23 12:16:41.247 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...
23-06-23 12:16:41.248 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'auto_offset_reset': 'earliest', 'max_poll_records': 100, 'bootstrap_servers': 'server_1:9092'}
23-06-23 12:16:41.248 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched start() called()
23-06-23 12:16:41.249 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.
23-06-23 12:16:41.249 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched subscribe() called
23-06-23 12:16:41.249 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer.subscribe(), subscribing to: ['predictions']
23-06-23 12:16:41.249 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.
Default: msg=TestMsg(msg='signal')
Sending prediction: msg='signal'
Specified: msg=TestMsg(msg='signal')
Sending prediction: msg='signal'
23-06-23 12:16:45.241 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched stop() called
23-06-23 12:16:45.242 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer stopped.
23-06-23 12:16:45.242 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.
23-06-23 12:16:45.242 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched stop() called
23-06-23 12:16:45.243 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched stop() called
23-06-23 12:16:45.243 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched stop() called
23-06-23 12:16:45.244 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer stopped.
23-06-23 12:16:45.245 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.
23-06-23 12:16:45.245 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched stop() called
23-06-23 12:16:45.245 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer stopped.
23-06-23 12:16:45.246 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.
23-06-23 12:16:45.246 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched stop() called
23-06-23 12:16:45.247 [INFO] fastkafka._testing.in_memory_broker: InMemoryBroker stopping

Here’s how the code above works:

  1. Within an async with block, create an instance of the Tester by passing in your app object, representing your FastKafka application.

  2. Using the tester.mirrors dictionary, you can send messages to specific Kafka broker and topic combinations. In this case, we use tester.mirrors[app.on_preprocessed_signals_1] and tester.mirrors[app.on_preprocessed_signals_2] to send TestMsg messages with the content “signal” to the corresponding Kafka broker and topic combinations.

  3. After sending the messages, tester.on_predictions.assert_called(timeout=5) asserts that the tester’s consumer on the predictions topic of kafka_brokers_1 (the mirror of to_predictions) received a message within the specified timeout (in this case, 5 seconds).

Example #3

In some scenarios, you may need to produce messages to multiple Kafka clusters simultaneously. FastKafka simplifies this process by allowing you to configure your application to produce messages to multiple clusters effortlessly. Let’s explore how you can achieve this:

Consider the following code snippet that demonstrates producing messages to multiple clusters:

from pydantic import BaseModel, Field

from fastkafka import FastKafka


class TestMsg(BaseModel):
    msg: str = Field(...)


kafka_brokers_1 = dict(localhost=dict(url="server_1", port=9092))
kafka_brokers_2 = dict(localhost=dict(url="server_2", port=9092))

app = FastKafka(kafka_brokers=kafka_brokers_1)


@app.consumes(topic="preprocessed_signals")
async def on_preprocessed_signals(msg: TestMsg):
    print(f"{msg=}")
    await to_predictions_1(TestMsg(msg="prediction"))
    await to_predictions_2(TestMsg(msg="prediction"))


@app.produces(topic="predictions")
async def to_predictions_1(prediction: TestMsg) -> TestMsg:
    print(f"Sending prediction to s1: {prediction}")
    return prediction


@app.produces(topic="predictions", brokers=kafka_brokers_2)
async def to_predictions_2(prediction: TestMsg) -> TestMsg:
    print(f"Sending prediction to s2: {prediction}")
    return prediction

Here’s what you need to know about producing to multiple clusters:

  1. We define two Kafka broker configurations: kafka_brokers_1 and kafka_brokers_2, representing different clusters with their respective connection details.

  2. We create an instance of the FastKafka application with kafka_brokers_1 as its default cluster. Consumers and producers that do not set the brokers argument connect to this cluster.

  3. The on_preprocessed_signals function is a consumer that handles incoming messages from the "preprocessed_signals" topic on the default cluster. Within this function, we invoke two producer functions: to_predictions_1 and to_predictions_2.

  4. The to_predictions_1 function sends predictions to the "predictions" topic on the kafka_brokers_1 cluster, since it does not set the brokers argument.

  5. The to_predictions_2 function sends the same predictions to the "predictions" topic on the kafka_brokers_2 cluster, selected with brokers=kafka_brokers_2. As a result, the same data is produced to both clusters.
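The two producers differ only in which broker dictionary they use. The test log further down shows the resulting connection settings, 'bootstrap_servers': 'server_1:9092' and 'bootstrap_servers': 'server_2:9092'. The helper below is a hypothetical sketch of that mapping from a broker entry to a "host:port" string; it is an illustration, not FastKafka's internal code, and the name bootstrap_servers_for is made up.

```python
from typing import Any, Dict


def bootstrap_servers_for(kafka_brokers: Dict[str, Dict[str, Any]], server_id: str) -> str:
    """Hypothetical helper: build a "host:port" string from one broker entry."""
    broker = kafka_brokers[server_id]
    return f"{broker['url']}:{broker['port']}"


# Same broker dictionaries as in the example above
kafka_brokers_1 = dict(localhost=dict(url="server_1", port=9092))
kafka_brokers_2 = dict(localhost=dict(url="server_2", port=9092))

print(bootstrap_servers_for(kafka_brokers_1, "localhost"))  # server_1:9092
print(bootstrap_servers_for(kafka_brokers_2, "localhost"))  # server_2:9092
```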

With this approach, you can publish the same messages to several Kafka clusters, for example to distribute data across different environments.

You can customize each producer function to suit your needs, for example by transforming or enriching the data before sending the predictions.

Because each producer function chooses its cluster through the brokers argument, adding another target cluster amounts to adding another producer function.
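The on_preprocessed_signals handler above awaits to_predictions_1 and to_predictions_2 one after the other. If you want both sends in flight at once, asyncio.gather is the standard-library way to run two coroutines concurrently. The sketch below uses plain async stubs in place of the decorated producers and a dataclass in place of the pydantic model, so it runs without FastKafka; whether the real decorated producers behave the same way under asyncio.gather is an assumption you should verify.

```python
import asyncio
from dataclasses import dataclass
from typing import List


@dataclass
class TestMsg:
    # Stand-in for the pydantic TestMsg model
    msg: str


sent: List[str] = []


async def to_predictions_1(prediction: TestMsg) -> TestMsg:
    # Hypothetical stub for the producer bound to kafka_brokers_1
    await asyncio.sleep(0.1)  # simulate network latency
    sent.append(f"s1:{prediction.msg}")
    return prediction


async def to_predictions_2(prediction: TestMsg) -> TestMsg:
    # Hypothetical stub for the producer bound to kafka_brokers_2
    await asyncio.sleep(0.1)
    sent.append(f"s2:{prediction.msg}")
    return prediction


async def on_preprocessed_signals(msg: TestMsg) -> List[TestMsg]:
    prediction = TestMsg(msg="prediction")
    # Run both producers concurrently; gather returns results in argument order
    results = await asyncio.gather(
        to_predictions_1(prediction), to_predictions_2(prediction)
    )
    return list(results)


results = asyncio.run(on_preprocessed_signals(TestMsg(msg="signal")))
print(sorted(sent))  # ['s1:prediction', 's2:prediction']
```

With gather, the handler waits roughly as long as the slower of the two sends rather than the sum of both.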

Testing

Let’s take a look at the testing code snippet:

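from fastkafka.testing import Tester

# Top-level "async with" works in a notebook; in a script, put it inside an
# async function and run that with asyncio.run().
async with Tester(app) as tester:
    # Send a message to "preprocessed_signals" on the default cluster
    await tester.to_preprocessed_signals(TestMsg(msg="signal"))
    # Each producer should publish to "predictions" on its own cluster within 5 s
    await tester.mirrors[to_predictions_1].assert_called(timeout=5)
    await tester.mirrors[to_predictions_2].assert_called(timeout=5)
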
23-06-23 12:16:49.903 [INFO] fastkafka._testing.in_memory_broker: InMemoryBroker._patch_consumers_and_producers(): Patching consumers and producers!
23-06-23 12:16:49.904 [INFO] fastkafka._testing.in_memory_broker: InMemoryBroker starting
23-06-23 12:16:49.904 [INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': 'server_1:9092'}'
23-06-23 12:16:49.905 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched start() called()
23-06-23 12:16:49.905 [INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': 'server_2:9092'}'
23-06-23 12:16:49.906 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched start() called()
23-06-23 12:16:49.921 [INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': 'server_1:9092'}'
23-06-23 12:16:49.921 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched start() called()
23-06-23 12:16:49.921 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...
23-06-23 12:16:49.922 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'auto_offset_reset': 'earliest', 'max_poll_records': 100, 'bootstrap_servers': 'server_1:9092'}
23-06-23 12:16:49.922 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched start() called()
23-06-23 12:16:49.923 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.
23-06-23 12:16:49.923 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched subscribe() called
23-06-23 12:16:49.924 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer.subscribe(), subscribing to: ['preprocessed_signals']
23-06-23 12:16:49.924 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.
23-06-23 12:16:49.924 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...
23-06-23 12:16:49.925 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'auto_offset_reset': 'earliest', 'max_poll_records': 100, 'bootstrap_servers': 'server_1:9092'}
23-06-23 12:16:49.925 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched start() called()
23-06-23 12:16:49.926 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.
23-06-23 12:16:49.926 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched subscribe() called
23-06-23 12:16:49.926 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer.subscribe(), subscribing to: ['predictions']
23-06-23 12:16:49.927 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.
23-06-23 12:16:49.927 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...
23-06-23 12:16:49.928 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'auto_offset_reset': 'earliest', 'max_poll_records': 100, 'bootstrap_servers': 'server_2:9092'}
23-06-23 12:16:49.928 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched start() called()
23-06-23 12:16:49.928 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.
23-06-23 12:16:49.929 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched subscribe() called
23-06-23 12:16:49.929 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer.subscribe(), subscribing to: ['predictions']
23-06-23 12:16:49.929 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.
msg=TestMsg(msg='signal')
Sending prediction to s1: msg='prediction'
Sending prediction to s2: msg='prediction'
23-06-23 12:16:53.922 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched stop() called
23-06-23 12:16:53.922 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer stopped.
23-06-23 12:16:53.923 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.
23-06-23 12:16:53.923 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched stop() called
23-06-23 12:16:53.923 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer stopped.
23-06-23 12:16:53.924 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.
23-06-23 12:16:53.924 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched stop() called
23-06-23 12:16:53.925 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched stop() called
23-06-23 12:16:53.925 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer stopped.
23-06-23 12:16:53.925 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.
23-06-23 12:16:53.926 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched stop() called
23-06-23 12:16:53.926 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched stop() called
23-06-23 12:16:53.926 [INFO] fastkafka._testing.in_memory_broker: InMemoryBroker stopping

Here’s how you can perform the necessary tests:

  1. Within an async with block, create an instance of the Tester by passing in your app object, representing your FastKafka application.

  2. The tester.to_preprocessed_signals method sends a TestMsg message with the content "signal" to the "preprocessed_signals" topic.

  3. After sending the message, you can perform assertions on the to_predictions_1 and to_predictions_2 functions using tester.mirrors[to_predictions_1].assert_called(timeout=5) and tester.mirrors[to_predictions_2].assert_called(timeout=5). These assertions ensure that the respective producer functions have produced data to their respective topic/broker combinations.

This test verifies that each producer function sends its messages to the intended cluster. As the log shows, the Tester patches the consumers and producers with an in-memory broker, so no running Kafka cluster is needed.
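Conceptually, a call such as tester.mirrors[to_predictions_1].assert_called(timeout=5) waits up to five seconds for the mirror to be invoked and fails otherwise. The sketch below reproduces that wait-with-timeout pattern using only the standard library (unittest.mock.AsyncMock and asyncio.wait_for). It is an illustration under that assumption, not FastKafka's implementation; assert_called_within and deliver_later are hypothetical helpers.

```python
import asyncio
from unittest.mock import AsyncMock


async def assert_called_within(mock: AsyncMock, timeout: float, poll: float = 0.01) -> None:
    """Hypothetical helper: wait until `mock` is called, or fail after `timeout` seconds."""

    async def wait_for_call() -> None:
        while not mock.called:
            await asyncio.sleep(poll)

    try:
        await asyncio.wait_for(wait_for_call(), timeout=timeout)
    except asyncio.TimeoutError:
        raise AssertionError(f"mock was not called within {timeout} s") from None


async def deliver_later(mirror: AsyncMock, payload: str, delay: float) -> None:
    # Simulate a message reaching the mirrored topic after a short delay
    await asyncio.sleep(delay)
    await mirror(payload)


async def main() -> bool:
    mirror = AsyncMock()  # stands in for a mirror such as tester.mirrors[to_predictions_1]
    delivery = asyncio.create_task(deliver_later(mirror, "prediction", 0.05))
    await assert_called_within(mirror, timeout=5)  # passes once the message arrives
    await delivery
    mirror.assert_awaited_once_with("prediction")

    silent = AsyncMock()  # never called
    try:
        await assert_called_within(silent, timeout=0.1)
    except AssertionError:
        return True  # timed out as expected
    return False


timed_out_as_expected = asyncio.run(main())
print(timed_out_as_expected)  # True
```

Polling with a short sleep keeps the sketch simple; an event-based variant (for example, setting an asyncio.Event from the mock's side_effect) would avoid polling.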