I’d like to define what essentially is an asynchronous __del__
that closes a resource. Here’s an example.
import asyncio class Async: async def close(self): print('closing') return self def __del__(self): print('destructing') asyncio.ensure_future(self.close()) async def amain(): Async() if __name__ == '__main__': asyncio.run(amain())
This works, printing destructing
and closing
as expected. However, if the resource is defined outside an asynchronous function, __del__
is called, but closing is never performed.
def main(): Async()
No warning is raised here, but the prints reveal that closing was not done. The warning is issued if an asynchronous function has been run, but any instance is created outside of it.
def main2(): Async() asyncio.run(amain())
RuntimeWarning: coroutine ‘Async.close’ was never awaited
This has been the subject in 1 and 2, but neither quite had what I was looking for, or maybe I didn’t know how to look. Particularly the first question was about deleting a resource, and its answer suggested using asyncio.ensure_future
, which was tested above. Python documentation suggests using the newer asyncio.create_task
, but it straight up raises an error in the non-async case, there being no current loop. My final, desperate attempt was to use asyncio.run
, which worked for the non-async case, but not for the asynchronous one, as calling run
is prohibited in a thread that already has a running loop. Additionally, the documentation states that it should only be called once in a program.
I’m still new to async things. How could this be achieved?
A word on the use case, since asynchronous context managers were mentioned as the preferred alternative in comments. I agree, using them for short-term resource management would be ideal. However, my use case is different for two reasons.
- Users of the class are not necessarily aware of the underlying resources. It is better user experience to hide closing the resource from a user who doesn’t fiddle with the resource itself.
- The class needs to be instantiated (or for it to be possible to instantiate it) in a synchronous context, and it is often created just once. For example, in a web server context the class would be instantiated in the global scope, after which its async functions would be used in the endpoint definitions.
For example:
asc = Async() server.route('/', 'GET') async def root(): return await asc.do_something(), 200
I’m open to other suggestions of implementing such a feature, but at this point even my curiosity for the possibility that this can be done is enough for me to want an answer to this specific question, not just the general problem.
Advertisement
Answer
Only thing that comes to mind is to run cleanup after the server shutdown. It’ll look something like this:
asc = Async() try: asyncio.run(run_server()) # You already do it now somewhere finally: asyncio.run(asc.close())
Since asyncio.run
creates new event loop each time, you may want to go even deeper and reuse the same event loop:
loop = asyncio.get_event_loop() asc = Async() try: loop.run_until_complete(run_server()) finally: loop.run_until_complete(asc.close())
It’s absolutely ok to call run_until_complete
multiple times as long as you know what you’re doing.
Full example with your snippet:
import asyncio class Async: async def close(self): print('closing') return self async def cleanup(self): print('destructing') await self.close() loop = asyncio.get_event_loop() asc = Async() async def amain(): await asyncio.sleep(1) # Do something if __name__ == '__main__': try: loop.run_until_complete(amain()) finally: loop.run_until_complete(asc.cleanup()) loop.close()