Skip to content
Advertisement

asyncio.sleep required after cancelling tasks?

To test a man-in-the-middle tcp proxy I have coded an echo tcp server and a tcp client. After each one of the tests I want the proxy and the server to go down, to make sure each test starts on a clean environment, so I have coded:

class TestConnections(TestCase):

    loop = None

    @classmethod
    def setUpClass(cls) -> None:
        TestConnections.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(TestConnections.loop)

    def setUp(self) -> None:
        EnergyAgent.GP = GrowattParser.GrowattParser()
        self.server = self.loop.create_task(tcp_server(1235))
        self.gp = self.loop.create_task(EnergyAgentProxy(1234, 1235))

    def tearDown(self) -> None:
        logger.debug('Cancelling the tasks')
        self.server.cancel()
        self.gp.cancel()
        self.loop.run_until_complete(asyncio.sleep(0.5))

    @classmethod
    def tearDownClass(cls) -> None:
        logger.debug('Closing the event loop')
        TestConnections.loop.close()

    def test_client2server(self):
        # start the client process, and wait until done
        result = self.loop.run_until_complete(asyncio.wait_for(tcp_client(1235, message), timeout=2))
        self.assertEqual(message, result)

Now, the problem is: unless I add in the method tearDown the last line

self.loop.run_until_complete(asyncio.sleep(0.5))

I get an error about tasks on the proxy not having finished:

ERROR:asyncio:Task was destroyed but it is pending!

Is there any way to just for all the tasks to finish? I have tried running

self.loop.run_until_complete(asyncio.wait([self.server, self.gp]))

and asyncio.gather… unsuccessfully.

Advertisement

Answer

You’re manually managing your event loop, but not waiting for the scheduled tasks to finish before it’s exited and the synchronous teardown never calls into the event loop by itself. Because of this the tasks are still running when Python exits and forces them to be deleted.

I would recommend using unittest.IsolatedAsyncioTestCase instead of managing this yourself, which will appropriately cancel and await left-over tasks when exiting a test case.

Your above code would look something like this

class TestConnections(unittest.IsolatedAsyncioTestCase):

    async def asyncSetUp(self) -> None:
        EnergyAgent.GP = GrowattParser.GrowattParser()
        self.server = asyncio.create_task(tcp_server(1235))
        self.gp = asyncio.create_task(EnergyAgentProxy(1234, 1235))

    async def test_client2server(self):
        # start the client process, and wait until done
        result = await asyncio.wait_for(tcp_client(1235, message), timeout=2)
        self.assertEqual(message, result)

If creating new event loops for each test case is an issue, you can clean up the event loop yourself before the closing by getting all the tasks through asyncio.all_tasks, cancelling them, waiting for them to finish their execution (getting cancelled or raising an exception) through run_until_complete, and then running loop.shutdown_asyncgens.

User contributions licensed under: CC BY-SA
6 People found this is helpful
Advertisement