I am trying to write unit tests using pytest for my Faust application. I have referred to the documentation here but it does not mention what to do when my Faust agent is sending data to a sink.
Without a sink, my tests work fine, but when I use a sink, I get this error:
    RuntimeError: Task <Task pending name='Task-2' coro=<Agent._execute_actor() running at /Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site-packages/faust/agents/agent.py:647> cb=[<TaskWakeupMethWrapper object at 0x7fc28967c5b0>()]> got Future <Future pending> attached to a different loop
    INFO faust.agents.agent:logging.py:265 [^-AgentTestWrapper: ml_exporter.processDetections]: Stopping...
I have tried various approaches, such as patching out the decorator in my Faust app that sends the data to the sink, testing my function without the decorator (bypassing it), and patching the sink parameter in my Faust app to None (so it doesn’t send my data to the sink). None of these worked.
Here is my Faust agent:
    app = faust.App('ml-exporter', broker=dx_broker, value_serializer='json')
    detection_topic = app.topic(dx_topic)
    graph_topic = app.topic(gwh_topic)

    @app.agent(detection_topic, sink=[graph_topic])
    async def processDetections(detections):
        detection_count = 0
        async for detection in detections:
            detection_count += 1
            # r.set("detection_count", detection_count)
            yield detection
Here is my current testing code:
    import ml_exporter

    patch('ml_exporter.graph_topic', None)

    def create_app():
        return faust.App('ml-exporter', value_serializer='json')

    @pytest.fixture()
    def test_app(event_loop):
        app = create_app()
        app.finalize()
        app.flow_control.resume()
        return app

    @pytest.mark.asyncio()
    async def test_processDetections(test_app):
        async with ml_exporter.processDetections.test_context() as agent:
            event = await agent.put('hey')
            assert agent.results[event.message.offset] == 'hey'
I get the same error as mentioned above when I run this test. Is there any way to test my Faust app successfully?
Thank you!
Answer
Force pytest to use Faust’s asyncio event loop as the default global loop. Add the following fixture to your test code:
    @pytest.mark.asyncio()
    @pytest.fixture()
    def event_loop():
        yield app.loop
As described in the pytest documentation:
The event_loop fixture can be easily overridden in any of the standard pytest locations (e.g. directly in the test file, or in conftest.py) to use a non-default event loop. If the pytest.mark.asyncio marker is applied, a pytest hook will ensure the produced loop is set as the default global loop.
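To see why sharing one loop matters, here is a minimal sketch that uses only the standard library, not Faust or pytest. It reproduces the same "attached to a different loop" message by awaiting a Future that belongs to one event loop from a task running on another. The function names are hypothetical and only for illustration. The assumption is that something similar happens in the failing test: the agent's sink work is tied to the Faust app's loop while pytest runs the test on its own loop.

```python
import asyncio


def await_foreign_future() -> str:
    """Await a Future bound to loop_a from a task that runs on loop_b."""
    loop_a = asyncio.new_event_loop()
    loop_b = asyncio.new_event_loop()
    try:
        foreign = loop_a.create_future()  # this Future belongs to loop_a

        async def waiter():
            await foreign  # awaited by a task that loop_b is running

        try:
            loop_b.run_until_complete(waiter())
        except RuntimeError as exc:
            return str(exc)
        return "no error"
    finally:
        loop_a.close()
        loop_b.close()


def await_same_loop_future() -> str:
    """Same pattern, but the Future and the task share a single loop."""
    loop = asyncio.new_event_loop()
    try:
        fut = loop.create_future()
        loop.call_soon(fut.set_result, "ok")  # resolve once the loop runs

        async def waiter():
            return await fut

        return loop.run_until_complete(waiter())
    finally:
        loop.close()


if __name__ == "__main__":
    print(await_foreign_future())
    print(await_same_loop_future())
```

The first function returns the RuntimeError text and the second returns normally. Overriding event_loop to yield the app's loop puts the test in the second situation.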