Using Tester to test FastKafka
In order to speed up development and make testing easier, we have implemented the Tester class.
The Tester instance starts
in-memory implementation of Kafka broker i.e. there is no need for
starting localhost Kafka service for testing FastKafka apps. The
Tester will redirect consumes
and produces
decorated functions to the in-memory Kafka broker so that
you can quickly test FasKafka apps without the need of a running Kafka
broker and all its dependencies. Also, for each FastKafka consumes
and
produces
function, Tester will
create it’s mirrored fuction i.e. if the consumes
function is
implemented, the Tester will
create the produces
function (and the other way - if the produces
function is implemented, Tester
will create consumes
function).
Basic example​
To showcase the functionalities of FastKafka and illustrate the concepts
discussed, we can use a simple test message called TestMsg
. Here’s the
definition of the TestMsg
class:
class TestMsg(BaseModel):
msg: str = Field(...)
test_msg = TestMsg(msg="signal")
In this example we have implemented
FastKafka
app with one consumes
and one produces
function. on_input
function
consumes messages from the input
topic and to_output
function
produces messages to the output
topic.
Note: it is necessary to define parameter and return types in the produces and consumes functions
from pydantic import BaseModel, Field
app = FastKafka()
@app.consumes()
async def on_input(msg: TestMsg):
await to_output(TestMsg(msg=f"Hello {msg.msg}"))
@app.produces()
async def to_output(msg: TestMsg) -> TestMsg:
return msg
Testing the application​
In this example app
has imlemented on_input
and to_output
functions. We can now use Tester
to create their mirrored functions: to_input
and on_output
.
Testing process for this example could look like this:
tester
produces the message to theinput
topicAssert that the
app
consumed the message by callingon_input
with the accurate argumentWithin
on_input
function,to_output
function is called - and message is produced to theoutput
topicAssert that the
tester
consumed the message by callingon_output
with the accurate argument
async with Tester(app).using_inmemory_broker() as tester:
input_msg = TestMsg(msg="Mickey")
# tester produces message to the input topic
await tester.to_input(input_msg)
# previous line is equal to
# await tester.mirrors[app.on_input](input_msg)
# assert that app consumed from the input topic and it was called with the accurate argument
await app.awaited_mocks.on_input.assert_called_with(
TestMsg(msg="Mickey"), timeout=5
)
# assert that tester consumed from the output topic and it was called with the accurate argument
await tester.on_output.assert_called_with(TestMsg(msg="Hello Mickey"), timeout=5)
print("ok")
23-07-31 10:38:30.810 [INFO] fastkafka._testing.in_memory_broker: InMemoryBroker._patch_consumers_and_producers(): Patching consumers and producers!
23-07-31 10:38:30.811 [INFO] fastkafka._testing.in_memory_broker: InMemoryBroker starting
23-07-31 10:38:30.812 [INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': 'localhost:9092'}'
23-07-31 10:38:30.812 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched start() called()
23-07-31 10:38:30.826 [INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': 'localhost:9092'}'
23-07-31 10:38:30.827 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched start() called()
23-07-31 10:38:30.827 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...
23-07-31 10:38:30.828 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'auto_offset_reset': 'earliest', 'max_poll_records': 100, 'bootstrap_servers': 'localhost:9092'}
23-07-31 10:38:30.828 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched start() called()
23-07-31 10:38:30.829 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.
23-07-31 10:38:30.829 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched subscribe() called
23-07-31 10:38:30.830 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer.subscribe(), subscribing to: ['input']
23-07-31 10:38:30.830 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.
23-07-31 10:38:30.835 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...
23-07-31 10:38:30.835 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'auto_offset_reset': 'earliest', 'max_poll_records': 100, 'bootstrap_servers': 'localhost:9092'}
23-07-31 10:38:30.836 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched start() called()
23-07-31 10:38:30.836 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.
23-07-31 10:38:30.836 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched subscribe() called
23-07-31 10:38:30.837 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer.subscribe(), subscribing to: ['output']
23-07-31 10:38:30.837 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.
23-07-31 10:38:34.828 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched stop() called
23-07-31 10:38:34.828 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer stopped.
23-07-31 10:38:34.829 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.
23-07-31 10:38:34.829 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched stop() called
23-07-31 10:38:34.830 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched stop() called
23-07-31 10:38:34.831 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer stopped.
23-07-31 10:38:34.831 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.
23-07-31 10:38:34.832 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched stop() called
23-07-31 10:38:34.832 [INFO] fastkafka._testing.in_memory_broker: InMemoryBroker stopping
ok
Final script​
import asyncio
from fastkafka._application.app import FastKafka
from fastkafka._application.tester import Tester
from pydantic import BaseModel, Field
class TestMsg(BaseModel):
msg: str = Field(...)
app = FastKafka()
@app.consumes()
async def on_input(msg: TestMsg):
await to_output(TestMsg(msg=f"Hello {msg.msg}"))
@app.produces()
async def to_output(msg: TestMsg) -> TestMsg:
return msg
async def async_tests():
async with Tester(app).using_inmemory_broker() as tester:
input_msg = TestMsg(msg="Mickey")
# tester produces message to the input topic
await tester.to_input(input_msg)
# assert that app consumed from the input topic and it was called with the accurate argument
await app.awaited_mocks.on_input.assert_called_with(
TestMsg(msg="Mickey"), timeout=5
)
# assert that tester consumed from the output topic and it was called with the accurate argument
await tester.awaited_mocks.on_output.assert_called_with(
TestMsg(msg="Hello Mickey"), timeout=5
)
print("ok")
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(async_tests())
23-07-31 10:38:34.855 [INFO] fastkafka._testing.in_memory_broker: InMemoryBroker._patch_consumers_and_producers(): Patching consumers and producers!
23-07-31 10:38:34.856 [INFO] fastkafka._testing.in_memory_broker: InMemoryBroker starting
23-07-31 10:38:34.856 [INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': 'localhost:9092'}'
23-07-31 10:38:34.857 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched start() called()
23-07-31 10:38:34.871 [INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': 'localhost:9092'}'
23-07-31 10:38:34.872 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched start() called()
23-07-31 10:38:34.872 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...
23-07-31 10:38:34.873 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'auto_offset_reset': 'earliest', 'max_poll_records': 100, 'bootstrap_servers': 'localhost:9092'}
23-07-31 10:38:34.874 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched start() called()
23-07-31 10:38:34.875 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.
23-07-31 10:38:34.877 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched subscribe() called
23-07-31 10:38:34.877 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer.subscribe(), subscribing to: ['input']
23-07-31 10:38:34.878 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.
23-07-31 10:38:34.878 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...
23-07-31 10:38:34.879 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'auto_offset_reset': 'earliest', 'max_poll_records': 100, 'bootstrap_servers': 'localhost:9092'}
23-07-31 10:38:34.879 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched start() called()
23-07-31 10:38:34.879 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.
23-07-31 10:38:34.880 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched subscribe() called
23-07-31 10:38:34.880 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer.subscribe(), subscribing to: ['output']
23-07-31 10:38:34.881 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.
23-07-31 10:38:38.873 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched stop() called
23-07-31 10:38:38.873 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer stopped.
23-07-31 10:38:38.874 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.
23-07-31 10:38:38.874 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched stop() called
23-07-31 10:38:38.875 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched stop() called
23-07-31 10:38:38.876 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer stopped.
23-07-31 10:38:38.877 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.
23-07-31 10:38:38.877 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched stop() called
23-07-31 10:38:38.878 [INFO] fastkafka._testing.in_memory_broker: InMemoryBroker stopping
ok
Using external brokers​
If you have already running brokers e.g. kafka_brokers
, you can use
Tester method
using_external_broker
to set brokers which will be used in tests.
The same example as previous but with external kafka_brokers
:
# content of the "application_test.py" file
import asyncio
from fastkafka._application.app import FastKafka
from fastkafka._application.tester import Tester
from pydantic import BaseModel, Field
class TestMsg(BaseModel):
msg: str = Field(...)
kafka_brokers = {
"localhost": {
"url": "localhost",
"description": "local development kafka broker",
"port": 9092,
},
"production": {
"url": "kafka.airt.ai",
"description": "production kafka broker",
"port": 9092,
"protocol": "kafka-secure",
"security": {"type": "plain"},
},
}
app = FastKafka(
title="Demo Kafka app",
kafka_brokers=kafka_brokers,
)
@app.consumes()
async def on_input(msg: TestMsg):
await to_output(TestMsg(msg=f"Hello {msg.msg}"))
@app.produces()
async def to_output(msg: TestMsg) -> TestMsg:
return msg
async def async_tests():
async with Tester(app).using_external_broker(bootstrap_servers_id="production") as tester:
input_msg = TestMsg(msg="Mickey")
# tester produces message to the input topic
await tester.to_input(input_msg)
# assert that app consumed from the input topic and it was called with the accurate argument
await app.awaited_mocks.on_input.assert_called_with(
TestMsg(msg="Mickey"), timeout=5
)
# assert that tester consumed from the output topic and it was called with the accurate argument
await tester.awaited_mocks.on_output.assert_called_with(
TestMsg(msg="Hello Mickey"), timeout=5
)
print("ok")
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(async_tests())
Example: New Employee app​
In this example, our app
has one consumes and two produces functions.
Every time a company hires an Employee
, some employee data is sent to
the new_employee
topic.
That’s when our application comes into play! The app consumes this data
by calling on_new_employee
. Within this function, to_employee_email
and to_welcome_message
functions are called - and messages are
produced to the employee_email
and welcome_message
topic.
class Employee(BaseModel):
name: str
surname: str
email: Optional[str] = None
class EmaiMessage(BaseModel):
sender: str = "info@gmail.com"
receiver: str
subject: str
message: str
kafka_brokers = dict(localhost=[dict(url="server_1", port=9092)], production=[dict(url="production_server_1", port=9092)])
app = FastKafka(kafka_brokers=kafka_brokers)
@app.consumes()
async def on_new_employee(msg: Employee):
employee = await to_employee_email(msg)
await to_welcome_message(employee)
@app.produces()
async def to_employee_email(employee: Employee) -> Employee:
# generate new email
employee.email = employee.name + "." + employee.surname + "@gmail.com"
return employee
@app.produces()
async def to_welcome_message(employee: Employee) -> EmaiMessage:
message = f"Dear {employee.name},\nWelcome to the company"
return EmaiMessage(receiver=employee.email, subject="Welcome", message=message)
Testing the application​
In this example app
has imlemented on_new_employee
,
to_employee_email
and to_welcome_message
functions. We can now use
Tester to create their mirrored
functions: to_new_employee
, on_employee_email
and
on_welcome_message
.
Testing process:
tester
produces message to thenew_employee
topicAssert that the
app
consumed the message from thenew_employee
topic with the accurate argumentWithin
on_new_employee
function,to_employee_email
andto_welcome_message
functions are called - and messages are produced to theemployee_email
andwelcome_message
topicAssert that the
tester
consumed the message by callingon_employee_email
Assert that the
tester
consumed the message by callingon_welcome_message
assert app._kafka_config["bootstrap_servers_id"] == "localhost"
async with Tester(app).using_inmemory_broker(bootstrap_servers_id="production") as tester:
assert app._kafka_config["bootstrap_servers_id"] == "production"
assert tester._kafka_config["bootstrap_servers_id"] == "production"
# produce the message to new_employee topic
await tester.to_new_employee(Employee(name="Mickey", surname="Mouse"))
# previous line is equal to:
# await tester.mirrors[app.on_new_employee](Employee(name="Mickey", surname="Mouse"))
# Assert app consumed the message
await app.awaited_mocks.on_new_employee.assert_called_with(
Employee(name="Mickey", surname="Mouse"), timeout=5
)
# If the the previous assert is true (on_new_employee was called),
# to_employee_email and to_welcome_message were called inside on_new_employee function
# Now we can check if this two messages were consumed
await tester.awaited_mocks.on_employee_email.assert_called(timeout=5)
await tester.awaited_mocks.on_welcome_message.assert_called(timeout=5)
assert app._kafka_config["bootstrap_servers_id"] == "localhost"
print("ok")
23-07-31 10:38:40.069 [INFO] fastkafka._testing.in_memory_broker: InMemoryBroker._patch_consumers_and_producers(): Patching consumers and producers!
23-07-31 10:38:40.070 [INFO] fastkafka._testing.in_memory_broker: InMemoryBroker starting
23-07-31 10:38:40.070 [INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': ['production_server_1:9092']}'
23-07-31 10:38:40.071 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched start() called()
23-07-31 10:38:40.071 [INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': ['production_server_1:9092']}'
23-07-31 10:38:40.072 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched start() called()
23-07-31 10:38:40.091 [INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': ['production_server_1:9092']}'
23-07-31 10:38:40.091 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched start() called()
23-07-31 10:38:40.092 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...
23-07-31 10:38:40.092 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'auto_offset_reset': 'earliest', 'max_poll_records': 100, 'bootstrap_servers': ['production_server_1:9092']}
23-07-31 10:38:40.093 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched start() called()
23-07-31 10:38:40.093 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.
23-07-31 10:38:40.094 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched subscribe() called
23-07-31 10:38:40.094 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer.subscribe(), subscribing to: ['new_employee']
23-07-31 10:38:40.095 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.
23-07-31 10:38:40.096 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...
23-07-31 10:38:40.097 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'auto_offset_reset': 'earliest', 'max_poll_records': 100, 'bootstrap_servers': ['production_server_1:9092']}
23-07-31 10:38:40.098 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched start() called()
23-07-31 10:38:40.099 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.
23-07-31 10:38:40.099 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched subscribe() called
23-07-31 10:38:40.100 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer.subscribe(), subscribing to: ['employee_email']
23-07-31 10:38:40.100 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.
23-07-31 10:38:40.101 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...
23-07-31 10:38:40.101 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'auto_offset_reset': 'earliest', 'max_poll_records': 100, 'bootstrap_servers': ['production_server_1:9092']}
23-07-31 10:38:40.102 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched start() called()
23-07-31 10:38:40.103 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.
23-07-31 10:38:40.103 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched subscribe() called
23-07-31 10:38:40.103 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer.subscribe(), subscribing to: ['welcome_message']
23-07-31 10:38:40.104 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.
23-07-31 10:38:44.092 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched stop() called
23-07-31 10:38:44.093 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer stopped.
23-07-31 10:38:44.093 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.
23-07-31 10:38:44.094 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched stop() called
23-07-31 10:38:44.094 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer stopped.
23-07-31 10:38:44.095 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.
23-07-31 10:38:44.095 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched stop() called
23-07-31 10:38:44.096 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched stop() called
23-07-31 10:38:44.096 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer stopped.
23-07-31 10:38:44.096 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.
23-07-31 10:38:44.097 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched stop() called
23-07-31 10:38:44.097 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched stop() called
23-07-31 10:38:44.097 [INFO] fastkafka._testing.in_memory_broker: InMemoryBroker stopping
ok
Final script​
import asyncio
from fastkafka._application.app import FastKafka
from fastkafka._application.tester import Tester
from pydantic import BaseModel, Field
from typing import Optional
class Employee(BaseModel):
name: str
surname: str
email: Optional[str] = None
class EmaiMessage(BaseModel):
sender: str = "info@gmail.com"
receiver: str
subject: str
message: str
kafka_brokers = dict(localhost=[dict(url="server_1", port=9092)], production=[dict(url="production_server_1", port=9092)])
app = FastKafka(kafka_brokers=kafka_brokers)
@app.consumes()
async def on_new_employee(msg: Employee):
employee = await to_employee_email(msg)
await to_welcome_message(employee)
@app.produces()
async def to_employee_email(employee: Employee) -> Employee:
# generate new email
employee.email = employee.name + "." + employee.surname + "@gmail.com"
return employee
@app.produces()
async def to_welcome_message(employee: Employee) -> EmaiMessage:
message = f"Dear {employee.name},\nWelcome to the company"
return EmaiMessage(receiver=employee.email, subject="Welcome", message=message)
async def async_tests():
assert app._kafka_config["bootstrap_servers_id"] == "localhost"
async with Tester(app).using_inmemory_broker(bootstrap_servers_id="production") as tester:
assert app._kafka_config["bootstrap_servers_id"] == "production"
assert tester._kafka_config["bootstrap_servers_id"] == "production"
# produce the message to new_employee topic
await tester.to_new_employee(Employee(name="Mickey", surname="Mouse"))
# previous line is equal to:
# await tester.mirrors[app.on_new_employee](Employee(name="Mickey", surname="Mouse"))
# Assert app consumed the message
await app.awaited_mocks.on_new_employee.assert_called_with(
Employee(name="Mickey", surname="Mouse"), timeout=5
)
# If the the previous assert is true (on_new_employee was called),
# to_employee_email and to_welcome_message were called inside on_new_employee function
# Now we can check if this two messages were consumed
await tester.awaited_mocks.on_employee_email.assert_called(timeout=5)
await tester.awaited_mocks.on_welcome_message.assert_called(timeout=5)
assert app._kafka_config["bootstrap_servers_id"] == "localhost"
print("ok")
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(async_tests())
23-07-31 10:38:47.045 [INFO] fastkafka._testing.in_memory_broker: InMemoryBroker._patch_consumers_and_producers(): Patching consumers and producers!
23-07-31 10:38:47.046 [INFO] fastkafka._testing.in_memory_broker: InMemoryBroker starting
23-07-31 10:38:47.046 [INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': ['production_server_1:9092']}'
23-07-31 10:38:47.047 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched start() called()
23-07-31 10:38:47.048 [INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': ['production_server_1:9092']}'
23-07-31 10:38:47.048 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched start() called()
23-07-31 10:38:47.067 [INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': ['production_server_1:9092']}'
23-07-31 10:38:47.067 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched start() called()
23-07-31 10:38:47.068 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...
23-07-31 10:38:47.070 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'auto_offset_reset': 'earliest', 'max_poll_records': 100, 'bootstrap_servers': ['production_server_1:9092']}
23-07-31 10:38:47.070 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched start() called()
23-07-31 10:38:47.071 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.
23-07-31 10:38:47.071 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched subscribe() called
23-07-31 10:38:47.072 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer.subscribe(), subscribing to: ['new_employee']
23-07-31 10:38:47.072 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.
23-07-31 10:38:47.072 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...
23-07-31 10:38:47.073 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'auto_offset_reset': 'earliest', 'max_poll_records': 100, 'bootstrap_servers': ['production_server_1:9092']}
23-07-31 10:38:47.074 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched start() called()
23-07-31 10:38:47.074 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.
23-07-31 10:38:47.074 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched subscribe() called
23-07-31 10:38:47.075 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer.subscribe(), subscribing to: ['employee_email']
23-07-31 10:38:47.075 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.
23-07-31 10:38:47.076 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...
23-07-31 10:38:47.076 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'auto_offset_reset': 'earliest', 'max_poll_records': 100, 'bootstrap_servers': ['production_server_1:9092']}
23-07-31 10:38:47.076 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched start() called()
23-07-31 10:38:47.077 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.
23-07-31 10:38:47.077 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched subscribe() called
23-07-31 10:38:47.078 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer.subscribe(), subscribing to: ['welcome_message']
23-07-31 10:38:47.078 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.
23-07-31 10:38:51.068 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched stop() called
23-07-31 10:38:51.069 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer stopped.
23-07-31 10:38:51.069 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.
23-07-31 10:38:51.070 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched stop() called
23-07-31 10:38:51.070 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer stopped.
23-07-31 10:38:51.071 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.
23-07-31 10:38:51.071 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched stop() called
23-07-31 10:38:51.072 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched stop() called
23-07-31 10:38:51.072 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer stopped.
23-07-31 10:38:51.073 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.
23-07-31 10:38:51.073 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched stop() called
23-07-31 10:38:51.074 [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched stop() called
23-07-31 10:38:51.074 [INFO] fastkafka._testing.in_memory_broker: InMemoryBroker stopping
ok