So given a bit of a complex setup, which is used to generate a list of queries to be run semi-parallel (using a semaphore to not run too many queries at the same time, to not DDoS the server).
i have an (in itself async) function that creates a number of queries:
async def run_query(self, url): async with self.semaphore: return await some_http_lib(url) async def create_queries(self, base_url): # ...gathering logic is ofc a bit more complex in the real setting urls = await some_http_lib(base_url).json() coros = [self.run_query(url) for url in urls] # note: not executed just yet return coros async def execute_queries(self): queries = await self.create_queries('/seom/url') _logger.info(f'prepared {len(queries)} queries') results = [] done = 0 # note: ofc, in this simple example call these would not actually be asynchronously executed. # in the real case i'm using asyncio.gather, this just makes for a slightly better # understandable example. for query in queries: # at this point, the request is actually triggered result = await query # ...some postprocessing if not result['success']: raise QueryException(result['message']) # ...internal exception done += 1 _logger.info(f'{done} of {len(queries)} queries done') results.append(result) return results
Now this works very nicely, executing exactly as i planned and i can handle an exception in one of the queries by aborting the whole operation.
async def run(): try: return await QueryRunner.execute_queries() except QueryException: _logger.error('something went horribly wrong') return None
The only problem is that the program is terminated, but leaves me with the usual RuntimeWarning: coroutine QueryRunner.run_query was never awaited
, because the queries later in the queue are (rightfully) not executed and as such not awaited.
Is there any way to cancel these unawaited coroutines? Would it be otherwise possible to supress this warning?
[Edit] a bit more context as of how the queries are executed outside this simple example: the queries are usually grouped together, so there is multiple calls to create_queries() with different parameters. then all collected groups are looped and the queries are executed using asyncio.gather(group). This awaits all the queries of one group, but if one fails, the other groups are canceled aswell, which results in the error being thrown.
Advertisement
Answer
So you are asking how to cancel a coroutine that has not yet been either awaited or passed to gather
. There are two options:
- you can call
asyncio.create_task(c).cancel()
- you can directly call
c.close()
on the coroutine object
The first option is a bit more heavyweight (it creates a task only to immediately cancel it), but it uses the documented asyncio functionality. The second option is more lightweight, but also more low-level.
The above applies to coroutine objects that have never been converted to tasks (by passing them to gather
or wait
, for example). If they have, for example if you called asyncio.gather(*coros)
, one of them raised and you want to cancel the rest, you should change the code to first convert them to tasks using asyncio.create_task()
, then call gather
, and use finally
to cancel the unfinished ones:
tasks = list(map(asyncio.create_task, coros)) try: results = await asyncio.gather(*tasks) finally: # if there are unfinished tasks, that is because one of them # raised - cancel the rest for t in tasks: if not t.done(): t.cancel()