Skip to main content
Version: dev 🚧

Using Tester to test FastKafka

In order to speed up development and make testing easier, we have implemented the Tester class.

The Tester instance starts in-memory implementation of Kafka broker i.e. there is no need for starting localhost Kafka service for testing FastKafka apps. The Tester will redirect consumes and produces decorated functions to the in-memory Kafka broker so that you can quickly test FasKafka apps without the need of a running Kafka broker and all its dependencies. Also, for each FastKafka consumes and produces function, Tester will create it’s mirrored fuction i.e. if the consumes function is implemented, the Tester will create the produces function (and the other way - if the produces function is implemented, Tester will create consumes function).

Basic example​

To showcase the functionalities of FastKafka and illustrate the concepts discussed, we can use a simple test message called TestMsg. Here’s the definition of the TestMsg class:

class TestMsg(BaseModel):
msg: str = Field(...)


test_msg = TestMsg(msg="signal")

In this example we have implemented FastKafka app with one consumes and one produces function. on_input function consumes messages from the input topic and to_output function produces messages to the output topic.

Note: it is necessary to define parameter and return types in the produces and consumes functions

from pydantic import BaseModel, Field

app = FastKafka()


@app.consumes()
async def on_input(msg: TestMsg):
await to_output(TestMsg(msg=f"Hello {msg.msg}"))


@app.produces()
async def to_output(msg: TestMsg) -> TestMsg:
return msg

Testing the application​

In this example app has imlemented on_input and to_output functions. We can now use Tester to create their mirrored functions: to_input and on_output.

Testing process for this example could look like this:

  1. tester produces the message to the input topic

  2. Assert that the app consumed the message by calling on_input with the accurate argument

  3. Within on_input function, to_output function is called - and message is produced to the output topic

  4. Assert that the tester consumed the message by calling on_output with the accurate argument

async with Tester(app).using_inmemory_broker() as tester:
input_msg = TestMsg(msg="Mickey")

# tester produces message to the input topic
await tester.to_input(input_msg)
# previous line is equal to
# await tester.mirrors[app.on_input](input_msg)

# assert that app consumed from the input topic and it was called with the accurate argument
await app.awaited_mocks.on_input.assert_called_with(
TestMsg(msg="Mickey"), timeout=5
)
# assert that tester consumed from the output topic and it was called with the accurate argument
await tester.on_output.assert_called_with(TestMsg(msg="Hello Mickey"), timeout=5)
print("ok")
23-07-31 10:38:30.810 [INFO] fastkafka._testing.in_memory_broker: InMemoryBroker._patch_consumers_and_producers(): Patching consumers and producers!
23-07-31 10:38:30.811 [INFO] fastkafka._testing.in_memory_broker: InMemoryBroker starting
23-07-31 10:38:30.812 [INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': 'localhost:9092'}'
23-07-31 10:38:30.812 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched start() called()
23-07-31 10:38:30.826 [INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': 'localhost:9092'}'
23-07-31 10:38:30.827 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched start() called()
23-07-31 10:38:30.827 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...
23-07-31 10:38:30.828 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'auto_offset_reset': 'earliest', 'max_poll_records': 100, 'bootstrap_servers': 'localhost:9092'}
23-07-31 10:38:30.828 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched start() called()
23-07-31 10:38:30.829 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.
23-07-31 10:38:30.829 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched subscribe() called
23-07-31 10:38:30.830 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer.subscribe(), subscribing to: ['input']
23-07-31 10:38:30.830 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.
23-07-31 10:38:30.835 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...
23-07-31 10:38:30.835 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'auto_offset_reset': 'earliest', 'max_poll_records': 100, 'bootstrap_servers': 'localhost:9092'}
23-07-31 10:38:30.836 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched start() called()
23-07-31 10:38:30.836 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.
23-07-31 10:38:30.836 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched subscribe() called
23-07-31 10:38:30.837 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer.subscribe(), subscribing to: ['output']
23-07-31 10:38:30.837 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.
23-07-31 10:38:34.828 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched stop() called
23-07-31 10:38:34.828 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer stopped.
23-07-31 10:38:34.829 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.
23-07-31 10:38:34.829 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched stop() called
23-07-31 10:38:34.830 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched stop() called
23-07-31 10:38:34.831 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer stopped.
23-07-31 10:38:34.831 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.
23-07-31 10:38:34.832 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched stop() called
23-07-31 10:38:34.832 [INFO] fastkafka._testing.in_memory_broker: InMemoryBroker stopping
ok

Final script​

import asyncio
from fastkafka._application.app import FastKafka
from fastkafka._application.tester import Tester
from pydantic import BaseModel, Field


class TestMsg(BaseModel):
msg: str = Field(...)


app = FastKafka()


@app.consumes()
async def on_input(msg: TestMsg):
await to_output(TestMsg(msg=f"Hello {msg.msg}"))


@app.produces()
async def to_output(msg: TestMsg) -> TestMsg:
return msg


async def async_tests():
async with Tester(app).using_inmemory_broker() as tester:
input_msg = TestMsg(msg="Mickey")

# tester produces message to the input topic
await tester.to_input(input_msg)

# assert that app consumed from the input topic and it was called with the accurate argument
await app.awaited_mocks.on_input.assert_called_with(
TestMsg(msg="Mickey"), timeout=5
)
# assert that tester consumed from the output topic and it was called with the accurate argument
await tester.awaited_mocks.on_output.assert_called_with(
TestMsg(msg="Hello Mickey"), timeout=5
)
print("ok")


if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(async_tests())
23-07-31 10:38:34.855 [INFO] fastkafka._testing.in_memory_broker: InMemoryBroker._patch_consumers_and_producers(): Patching consumers and producers!
23-07-31 10:38:34.856 [INFO] fastkafka._testing.in_memory_broker: InMemoryBroker starting
23-07-31 10:38:34.856 [INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': 'localhost:9092'}'
23-07-31 10:38:34.857 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched start() called()
23-07-31 10:38:34.871 [INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': 'localhost:9092'}'
23-07-31 10:38:34.872 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched start() called()
23-07-31 10:38:34.872 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...
23-07-31 10:38:34.873 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'auto_offset_reset': 'earliest', 'max_poll_records': 100, 'bootstrap_servers': 'localhost:9092'}
23-07-31 10:38:34.874 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched start() called()
23-07-31 10:38:34.875 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.
23-07-31 10:38:34.877 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched subscribe() called
23-07-31 10:38:34.877 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer.subscribe(), subscribing to: ['input']
23-07-31 10:38:34.878 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.
23-07-31 10:38:34.878 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...
23-07-31 10:38:34.879 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'auto_offset_reset': 'earliest', 'max_poll_records': 100, 'bootstrap_servers': 'localhost:9092'}
23-07-31 10:38:34.879 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched start() called()
23-07-31 10:38:34.879 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.
23-07-31 10:38:34.880 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched subscribe() called
23-07-31 10:38:34.880 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer.subscribe(), subscribing to: ['output']
23-07-31 10:38:34.881 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.
23-07-31 10:38:38.873 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched stop() called
23-07-31 10:38:38.873 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer stopped.
23-07-31 10:38:38.874 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.
23-07-31 10:38:38.874 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched stop() called
23-07-31 10:38:38.875 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched stop() called
23-07-31 10:38:38.876 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer stopped.
23-07-31 10:38:38.877 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.
23-07-31 10:38:38.877 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched stop() called
23-07-31 10:38:38.878 [INFO] fastkafka._testing.in_memory_broker: InMemoryBroker stopping
ok

Using external brokers​

If you have already running brokers e.g. kafka_brokers, you can use Tester method using_external_broker to set brokers which will be used in tests.

The same example as previous but with external kafka_brokers:

# content of the "application_test.py" file

import asyncio
from fastkafka._application.app import FastKafka
from fastkafka._application.tester import Tester
from pydantic import BaseModel, Field


class TestMsg(BaseModel):
msg: str = Field(...)


kafka_brokers = {
"localhost": {
"url": "localhost",
"description": "local development kafka broker",
"port": 9092,
},
"production": {
"url": "kafka.airt.ai",
"description": "production kafka broker",
"port": 9092,
"protocol": "kafka-secure",
"security": {"type": "plain"},
},
}

app = FastKafka(
title="Demo Kafka app",
kafka_brokers=kafka_brokers,
)

@app.consumes()
async def on_input(msg: TestMsg):
await to_output(TestMsg(msg=f"Hello {msg.msg}"))


@app.produces()
async def to_output(msg: TestMsg) -> TestMsg:
return msg


async def async_tests():
async with Tester(app).using_external_broker(bootstrap_servers_id="production") as tester:
input_msg = TestMsg(msg="Mickey")

# tester produces message to the input topic
await tester.to_input(input_msg)

# assert that app consumed from the input topic and it was called with the accurate argument
await app.awaited_mocks.on_input.assert_called_with(
TestMsg(msg="Mickey"), timeout=5
)
# assert that tester consumed from the output topic and it was called with the accurate argument
await tester.awaited_mocks.on_output.assert_called_with(
TestMsg(msg="Hello Mickey"), timeout=5
)
print("ok")


if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(async_tests())

Example: New Employee app​

In this example, our app has one consumes and two produces functions.

Every time a company hires an Employee, some employee data is sent to the new_employee topic.

That’s when our application comes into play! The app consumes this data by calling on_new_employee. Within this function, to_employee_email and to_welcome_message functions are called - and messages are produced to the employee_email and welcome_message topic.

class Employee(BaseModel):
name: str
surname: str
email: Optional[str] = None


class EmaiMessage(BaseModel):
sender: str = "info@gmail.com"
receiver: str
subject: str
message: str

kafka_brokers = dict(localhost=[dict(url="server_1", port=9092)], production=[dict(url="production_server_1", port=9092)])
app = FastKafka(kafka_brokers=kafka_brokers)


@app.consumes()
async def on_new_employee(msg: Employee):
employee = await to_employee_email(msg)
await to_welcome_message(employee)


@app.produces()
async def to_employee_email(employee: Employee) -> Employee:
# generate new email
employee.email = employee.name + "." + employee.surname + "@gmail.com"
return employee


@app.produces()
async def to_welcome_message(employee: Employee) -> EmaiMessage:
message = f"Dear {employee.name},\nWelcome to the company"
return EmaiMessage(receiver=employee.email, subject="Welcome", message=message)

Testing the application​

In this example app has imlemented on_new_employee, to_employee_email and to_welcome_message functions. We can now use Tester to create their mirrored functions: to_new_employee, on_employee_email and on_welcome_message.

Testing process:

  1. tester produces message to the new_employee topic

  2. Assert that the app consumed the message from the new_employee topic with the accurate argument

  3. Within on_new_employee function, to_employee_email and to_welcome_message functions are called - and messages are produced to the employee_email and welcome_message topic

  4. Assert that the tester consumed the message by calling on_employee_email

  5. Assert that the tester consumed the message by calling on_welcome_message

assert app._kafka_config["bootstrap_servers_id"] == "localhost"

async with Tester(app).using_inmemory_broker(bootstrap_servers_id="production") as tester:
assert app._kafka_config["bootstrap_servers_id"] == "production"
assert tester._kafka_config["bootstrap_servers_id"] == "production"

# produce the message to new_employee topic
await tester.to_new_employee(Employee(name="Mickey", surname="Mouse"))
# previous line is equal to:
# await tester.mirrors[app.on_new_employee](Employee(name="Mickey", surname="Mouse"))

# Assert app consumed the message
await app.awaited_mocks.on_new_employee.assert_called_with(
Employee(name="Mickey", surname="Mouse"), timeout=5
)

# If the the previous assert is true (on_new_employee was called),
# to_employee_email and to_welcome_message were called inside on_new_employee function

# Now we can check if this two messages were consumed
await tester.awaited_mocks.on_employee_email.assert_called(timeout=5)
await tester.awaited_mocks.on_welcome_message.assert_called(timeout=5)

assert app._kafka_config["bootstrap_servers_id"] == "localhost"

print("ok")
23-07-31 10:38:40.069 [INFO] fastkafka._testing.in_memory_broker: InMemoryBroker._patch_consumers_and_producers(): Patching consumers and producers!
23-07-31 10:38:40.070 [INFO] fastkafka._testing.in_memory_broker: InMemoryBroker starting
23-07-31 10:38:40.070 [INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': ['production_server_1:9092']}'
23-07-31 10:38:40.071 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched start() called()
23-07-31 10:38:40.071 [INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': ['production_server_1:9092']}'
23-07-31 10:38:40.072 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched start() called()
23-07-31 10:38:40.091 [INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': ['production_server_1:9092']}'
23-07-31 10:38:40.091 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched start() called()
23-07-31 10:38:40.092 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...
23-07-31 10:38:40.092 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'auto_offset_reset': 'earliest', 'max_poll_records': 100, 'bootstrap_servers': ['production_server_1:9092']}
23-07-31 10:38:40.093 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched start() called()
23-07-31 10:38:40.093 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.
23-07-31 10:38:40.094 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched subscribe() called
23-07-31 10:38:40.094 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer.subscribe(), subscribing to: ['new_employee']
23-07-31 10:38:40.095 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.
23-07-31 10:38:40.096 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...
23-07-31 10:38:40.097 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'auto_offset_reset': 'earliest', 'max_poll_records': 100, 'bootstrap_servers': ['production_server_1:9092']}
23-07-31 10:38:40.098 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched start() called()
23-07-31 10:38:40.099 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.
23-07-31 10:38:40.099 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched subscribe() called
23-07-31 10:38:40.100 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer.subscribe(), subscribing to: ['employee_email']
23-07-31 10:38:40.100 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.
23-07-31 10:38:40.101 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...
23-07-31 10:38:40.101 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'auto_offset_reset': 'earliest', 'max_poll_records': 100, 'bootstrap_servers': ['production_server_1:9092']}
23-07-31 10:38:40.102 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched start() called()
23-07-31 10:38:40.103 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.
23-07-31 10:38:40.103 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched subscribe() called
23-07-31 10:38:40.103 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer.subscribe(), subscribing to: ['welcome_message']
23-07-31 10:38:40.104 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.
23-07-31 10:38:44.092 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched stop() called
23-07-31 10:38:44.093 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer stopped.
23-07-31 10:38:44.093 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.
23-07-31 10:38:44.094 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched stop() called
23-07-31 10:38:44.094 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer stopped.
23-07-31 10:38:44.095 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.
23-07-31 10:38:44.095 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched stop() called
23-07-31 10:38:44.096 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched stop() called
23-07-31 10:38:44.096 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer stopped.
23-07-31 10:38:44.096 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.
23-07-31 10:38:44.097 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched stop() called
23-07-31 10:38:44.097 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched stop() called
23-07-31 10:38:44.097 [INFO] fastkafka._testing.in_memory_broker: InMemoryBroker stopping
ok

Final script​

import asyncio
from fastkafka._application.app import FastKafka
from fastkafka._application.tester import Tester
from pydantic import BaseModel, Field
from typing import Optional


class Employee(BaseModel):
name: str
surname: str
email: Optional[str] = None


class EmaiMessage(BaseModel):
sender: str = "info@gmail.com"
receiver: str
subject: str
message: str


kafka_brokers = dict(localhost=[dict(url="server_1", port=9092)], production=[dict(url="production_server_1", port=9092)])
app = FastKafka(kafka_brokers=kafka_brokers)


@app.consumes()
async def on_new_employee(msg: Employee):
employee = await to_employee_email(msg)
await to_welcome_message(employee)


@app.produces()
async def to_employee_email(employee: Employee) -> Employee:
# generate new email
employee.email = employee.name + "." + employee.surname + "@gmail.com"
return employee


@app.produces()
async def to_welcome_message(employee: Employee) -> EmaiMessage:
message = f"Dear {employee.name},\nWelcome to the company"
return EmaiMessage(receiver=employee.email, subject="Welcome", message=message)


async def async_tests():
assert app._kafka_config["bootstrap_servers_id"] == "localhost"

async with Tester(app).using_inmemory_broker(bootstrap_servers_id="production") as tester:
assert app._kafka_config["bootstrap_servers_id"] == "production"
assert tester._kafka_config["bootstrap_servers_id"] == "production"

# produce the message to new_employee topic
await tester.to_new_employee(Employee(name="Mickey", surname="Mouse"))
# previous line is equal to:
# await tester.mirrors[app.on_new_employee](Employee(name="Mickey", surname="Mouse"))

# Assert app consumed the message
await app.awaited_mocks.on_new_employee.assert_called_with(
Employee(name="Mickey", surname="Mouse"), timeout=5
)

# If the the previous assert is true (on_new_employee was called),
# to_employee_email and to_welcome_message were called inside on_new_employee function

# Now we can check if this two messages were consumed
await tester.awaited_mocks.on_employee_email.assert_called(timeout=5)
await tester.awaited_mocks.on_welcome_message.assert_called(timeout=5)

assert app._kafka_config["bootstrap_servers_id"] == "localhost"
print("ok")


if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(async_tests())
23-07-31 10:38:47.045 [INFO] fastkafka._testing.in_memory_broker: InMemoryBroker._patch_consumers_and_producers(): Patching consumers and producers!
23-07-31 10:38:47.046 [INFO] fastkafka._testing.in_memory_broker: InMemoryBroker starting
23-07-31 10:38:47.046 [INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': ['production_server_1:9092']}'
23-07-31 10:38:47.047 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched start() called()
23-07-31 10:38:47.048 [INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': ['production_server_1:9092']}'
23-07-31 10:38:47.048 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched start() called()
23-07-31 10:38:47.067 [INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': ['production_server_1:9092']}'
23-07-31 10:38:47.067 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched start() called()
23-07-31 10:38:47.068 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...
23-07-31 10:38:47.070 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'auto_offset_reset': 'earliest', 'max_poll_records': 100, 'bootstrap_servers': ['production_server_1:9092']}
23-07-31 10:38:47.070 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched start() called()
23-07-31 10:38:47.071 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.
23-07-31 10:38:47.071 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched subscribe() called
23-07-31 10:38:47.072 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer.subscribe(), subscribing to: ['new_employee']
23-07-31 10:38:47.072 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.
23-07-31 10:38:47.072 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...
23-07-31 10:38:47.073 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'auto_offset_reset': 'earliest', 'max_poll_records': 100, 'bootstrap_servers': ['production_server_1:9092']}
23-07-31 10:38:47.074 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched start() called()
23-07-31 10:38:47.074 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.
23-07-31 10:38:47.074 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched subscribe() called
23-07-31 10:38:47.075 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer.subscribe(), subscribing to: ['employee_email']
23-07-31 10:38:47.075 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.
23-07-31 10:38:47.076 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...
23-07-31 10:38:47.076 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'auto_offset_reset': 'earliest', 'max_poll_records': 100, 'bootstrap_servers': ['production_server_1:9092']}
23-07-31 10:38:47.076 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched start() called()
23-07-31 10:38:47.077 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.
23-07-31 10:38:47.077 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched subscribe() called
23-07-31 10:38:47.078 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer.subscribe(), subscribing to: ['welcome_message']
23-07-31 10:38:47.078 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.
23-07-31 10:38:51.068 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched stop() called
23-07-31 10:38:51.069 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer stopped.
23-07-31 10:38:51.069 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.
23-07-31 10:38:51.070 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched stop() called
23-07-31 10:38:51.070 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer stopped.
23-07-31 10:38:51.071 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.
23-07-31 10:38:51.071 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched stop() called
23-07-31 10:38:51.072 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched stop() called
23-07-31 10:38:51.072 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer stopped.
23-07-31 10:38:51.073 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.
23-07-31 10:38:51.073 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched stop() called
23-07-31 10:38:51.074 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched stop() called
23-07-31 10:38:51.074 [INFO] fastkafka._testing.in_memory_broker: InMemoryBroker stopping
ok