Python asyncio cancel unawaited coroutines

I have a somewhat complex setup that generates a list of queries to be run semi-parallel (using a semaphore to limit how many queries run at the same time, so as not to DDoS the server).

I have a function (itself async) that creates a number of queries:

async def run_query(self, url):
    async with self.semaphore:
        return await some_http_lib(url)

async def create_queries(self, base_url):
    # ...gathering logic is of course a bit more complex in the real setting
    urls = (await some_http_lib(base_url)).json()

    coros = [self.run_query(url) for url in urls]  # note: not executed just yet
    return coros

async def execute_queries(self):
    queries = await self.create_queries('/some/url')
    _logger.info(f'prepared {len(queries)} queries')
    results = []
    done = 0

    # note: of course, in this simple example the queries are not actually executed
    # concurrently. in the real case i'm using asyncio.gather; the plain loop just
    # makes for a slightly more understandable example.
    for query in queries:
        # at this point, the request is actually triggered
        result = await query

        # ...some postprocessing

        if not result['success']:
            raise QueryException(result['message'])  # ...internal exception

        done += 1
        _logger.info(f'{done} of {len(queries)} queries done')
        results.append(result)

    return results

Now this works very nicely, executing exactly as I planned, and I can handle an exception in one of the queries by aborting the whole operation.

async def run():
    try:
        return await QueryRunner.execute_queries()
    except QueryException:
        _logger.error('something went horribly wrong')
        return None

The only problem is that when the program terminates, it leaves me with the usual RuntimeWarning: coroutine QueryRunner.run_query was never awaited, because the queries later in the queue are (rightfully) not executed and therefore never awaited.

Is there any way to cancel these unawaited coroutines? Otherwise, would it be possible to suppress this warning?

[Edit] A bit more context on how the queries are executed outside this simple example: the queries are usually grouped together, so there are multiple calls to create_queries() with different parameters. Then all collected groups are looped over and the queries of each group are executed using asyncio.gather(*group). This awaits all the queries of one group, but if one fails, the other groups are canceled as well, so their coroutines are never awaited and the warning is emitted.

Answer

So you are asking how to cancel a coroutine that has not yet been either awaited or passed to gather. There are two options:

  • you can call asyncio.create_task(c).cancel()
  • you can directly call c.close() on the coroutine object

The first option is a bit more heavyweight (it creates a task only to immediately cancel it), but it uses the documented asyncio functionality. The second option is more lightweight, but also more low-level.
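
As a minimal sketch of the second option applied to the loop from the question: the unawaited coroutines can be closed in a finally block. The run_query below is a hypothetical stand-in that makes no HTTP call, RuntimeError takes the place of QueryException, and demo() is an illustrative helper that is not part of the original code.

```python
import asyncio
import inspect


async def run_query(n):
    # Hypothetical stand-in for the question's run_query (no real HTTP call).
    await asyncio.sleep(0)
    return {"success": n != 1, "message": f"query {n} failed"}


async def execute_queries(queries):
    results = []
    try:
        for query in queries:
            result = await query
            if not result["success"]:
                raise RuntimeError(result["message"])
            results.append(result)
    finally:
        # Closing an already finished coroutine is a no-op; closing one that
        # never started marks it as closed, so the "never awaited" warning
        # is not emitted for it.
        for query in queries:
            query.close()
    return results


def demo():
    queries = [run_query(n) for n in range(4)]  # created, not yet started
    try:
        asyncio.run(execute_queries(queries))
        error = None
    except RuntimeError as exc:
        error = str(exc)
    states = [inspect.getcoroutinestate(q) for q in queries]
    return error, states
```

Calling demo() aborts on the second query, and every coroutine in the list, including the two that never ran, ends up in the closed state.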

The above applies to coroutine objects that have never been converted to tasks (by passing them to gather or wait, for example). If they have been, for example because you called asyncio.gather(*coros), one of them raised, and you want to cancel the rest, you should change the code to first convert them to tasks using asyncio.create_task(), then call gather, and use finally to cancel the unfinished ones:

tasks = list(map(asyncio.create_task, coros))
try:
    results = await asyncio.gather(*tasks)
finally:
    # if there are unfinished tasks, that is because one of them
    # raised - cancel the rest
    for t in tasks:
        if not t.done():
            t.cancel()
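
The snippet above can be tried out with a self-contained sketch like the following. The names run_query, gather_or_cancel and demo are hypothetical, the sleeps only simulate one fast failing query next to slow ones, and the extra gather with return_exceptions=True in the finally block is an addition that waits for the cancelled tasks to finish unwinding.

```python
import asyncio


async def run_query(n):
    # Hypothetical stand-in for a real query: query 1 fails quickly,
    # the others would take much longer if they were not cancelled.
    if n == 1:
        await asyncio.sleep(0.01)
        raise RuntimeError("query 1 failed")
    await asyncio.sleep(10)
    return n


async def gather_or_cancel(tasks):
    try:
        return await asyncio.gather(*tasks)
    finally:
        # If there are unfinished tasks, one of the others raised: cancel them.
        for t in tasks:
            if not t.done():
                t.cancel()
        # Wait until the cancelled tasks have actually finished.
        await asyncio.gather(*tasks, return_exceptions=True)


async def main():
    tasks = [asyncio.create_task(run_query(n)) for n in range(3)]
    try:
        await gather_or_cancel(tasks)
        error = None
    except RuntimeError as exc:
        error = str(exc)
    return error, [t.cancelled() for t in tasks]


def demo():
    return asyncio.run(main())
```

Here demo() returns quickly instead of waiting for the slow queries, because the two slow tasks are cancelled as soon as the failing one raises.
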
User contributions licensed under: CC BY-SA