How to test a Faust agent that sends data to a sink?

I am trying to write unit tests with pytest for my Faust application. I have referred to the documentation, but it does not mention what to do when a Faust agent sends data to a sink.

Without a sink my tests work fine, but as soon as I use a sink I get this error:

RuntimeError: Task <Task pending name='Task-2' coro=<Agent._execute_actor() running at /Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site-packages/faust/agents/agent.py:647> cb=[<TaskWakeupMethWrapper object at 0x7fc28967c5b0>()]> got Future <Future pending> attached to a different loop
INFO     faust.agents.agent:logging.py:265 [^-AgentTestWrapper: ml_exporter.processDetections]: Stopping...

I have tried several approaches: patching out the decorator in my Faust app that sends the data to the sink, testing my function without the decorator (by trying to bypass it), and patching the sink parameter in my Faust app to None so that it doesn’t send my data to the sink. None of these worked.

Here is my Faust agent:

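import faust

# dx_broker, dx_topic and gwh_topic are configuration values defined elsewhere (not shown).
app = faust.App('ml-exporter', broker=dx_broker, value_serializer='json')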

detection_topic = app.topic(dx_topic)
graph_topic = app.topic(gwh_topic)

@app.agent(detection_topic, sink=[graph_topic])
async def processDetections(detections):
    detection_count = 0
    async for detection in detections:
        detection_count += 1
        # r.set("detection_count", detection_count)
        yield detection

Here is my current testing code:

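from unittest.mock import patch

import faust
import pytest

import ml_exporter

# Has no effect as written: the patcher is created but never started.
patch('ml_exporter.graph_topic', None)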

def create_app():
    return faust.App('ml-exporter', value_serializer='json')

@pytest.fixture()
def test_app(event_loop):
    app = create_app()
    app.finalize()
    app.flow_control.resume()
    return app

@pytest.mark.asyncio()
async def test_processDetections(test_app):
    async with ml_exporter.processDetections.test_context() as agent:
        event = await agent.put('hey')
        assert agent.results[event.message.offset] == 'hey'

I get the same error as mentioned above when I run this test. Is there any way to test my Faust app successfully?

Thank you!

Answer

Force pytest to use Faust’s asyncio event loop as the default global loop. Add the following fixture to your test code:

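# The asyncio marker belongs on the test functions, not on the fixture.
@pytest.fixture()
def event_loop():
    # Reuse the loop the Faust app is bound to, so the agent and the test share it.
    yield ml_exporter.app.loop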

As described in the pytest-asyncio documentation:

The event_loop fixture can be easily overridden in any of the standard pytest locations (e.g. directly in the test file, or in conftest.py) to use a non-default event loop. If the pytest.mark.asyncio marker is applied, a pytest hook will ensure the produced loop is set as the default global loop.
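
To see why sharing one loop helps, here is a minimal standard-library sketch. It is not Faust code. It assumes the error in the question arises because the agent's futures belong to the Faust app's loop while the test coroutine runs on a separate loop created by pytest-asyncio. The names await_future_on, app_loop and test_loop are illustrative only and do not come from Faust or pytest-asyncio.

```python
import asyncio


def await_future_on(run_loop: asyncio.AbstractEventLoop,
                    owner_loop: asyncio.AbstractEventLoop) -> str:
    """Await a future owned by owner_loop from a task running on run_loop."""
    future = owner_loop.create_future()

    async def waiter() -> str:
        # Ask the owning loop to resolve the future, then wait for it.
        owner_loop.call_soon(future.set_result, "ok")
        return await future

    try:
        return run_loop.run_until_complete(waiter())
    except RuntimeError as exc:
        # asyncio refuses to let a task wait on another loop's future.
        return f"RuntimeError: {exc}"


app_loop = asyncio.new_event_loop()   # stands in for the Faust app's loop
test_loop = asyncio.new_event_loop()  # stands in for the test runner's own loop

mismatch = await_future_on(test_loop, app_loop)  # task and future on different loops
shared = await_future_on(app_loop, app_loop)     # task and future on the same loop

app_loop.close()
test_loop.close()

print(mismatch)
print(shared)
```

The first call should report a message containing "attached to a different loop", like the traceback in the question, while the second returns "ok". Overriding event_loop to yield the app's loop puts the test in the second situation.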

User contributions licensed under: CC BY-SA