
asyncio run_until_complete does not wait for all coroutines to finish

I am taking my first steps in Python and I am struggling to understand why I do not get the expected result with this one. Here is what I am trying to achieve:

I have a function that consumes an API. Since the API takes a while to answer, and since I am going through a proxy that adds additional lag, I thought that sending concurrent requests would speed up the process (I run 100 concurrent requests). It does. But asyncio run_until_complete always returns some unfinished coroutines.

Here is the code (simplified):

import aiohttp
import asyncio

error_count = 0  # shared across every call to consume_api

async def consume_api(parameter):
    url = "someurl"  # it is actually based on the parameter
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url, proxy="someproxy") as asyncresponse:
                r = await asyncresponse.read()
    except:
        global error_count
        error_count += 1
        if error_count > 50:
            return "Exceeded 50 try on same request"
        else:
            return consume_api(parameter)
    return r.decode("utf-8")

def loop_on_api(list_of_parameter):
    loop = asyncio.get_event_loop()

    coroutines = [consume_api(parameter) for parameter in list_of_parameter]
    results = loop.run_until_complete(asyncio.gather(*coroutines))
    return results

When I run the debugger, the results returned by the loop_on_api function include a list of strings corresponding to the results of consume_api, plus some occurrences of <coroutine object consume_api at 0x00...>. Those objects have a cr_running attribute set to False and a cr_frame. However, if I check the coroutines variable, I can find all 100 coroutines, but none of them seems to have a cr_frame.
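
For context on the cr_frame observation: a coroutine object keeps its frame until it has run to completion, after which its cr_frame attribute becomes None, while cr_running is only True while the coroutine is actively executing. A minimal, self-contained sketch using a simplified stand-in for consume_api:

import asyncio

async def consume_api(parameter):
    return str(parameter)

coro = consume_api(1)
print(coro.cr_running)        # False: created but not started
print(coro.cr_frame is None)  # False: a pending coroutine still holds its frame
asyncio.run(coro)             # drive it to completion
print(coro.cr_frame is None)  # True: a finished coroutine has dropped its frame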

Any idea what I am doing wrong?

I’m also thinking my way of counting the 50 errors will be shared by all coroutines.

Any idea how I can make it specific?
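
For illustration, one way to make the retry count specific to each request (instead of using the shared global) would be to carry it as a parameter of consume_api itself. This is only a sketch of that idea, not the original code, and it awaits the recursive retry so the caller gets a string rather than a coroutine object:

import aiohttp

async def consume_api(parameter, error_count=0):
    url = "someurl"  # actually based on the parameter
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url, proxy="someproxy") as asyncresponse:
                r = await asyncresponse.read()
    except Exception:
        if error_count >= 50:
            return "Exceeded 50 tries on same request"
        # The counter travels with this request only, so retries of other
        # requests do not affect it.
        return await consume_api(parameter, error_count + 1)
    return r.decode("utf-8")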


Answer

It seems the issue is coming from the proxy I am using, which sometimes does not carry the request or response. Forcing a rerun therefore seems to be the best answer, so I now check whether the returned results still contain coroutines and re-run loop_on_api() on them:

def loop_on_api(list_of_parameter):
    loop = asyncio.get_event_loop()

    coroutines = [consume_api(parameter) for parameter in list_of_parameter]
    results = loop.run_until_complete(asyncio.gather(*coroutines))

    undone = []
    rerun_list_of_parameter = []

    # Collect the positions that still hold coroutine objects instead of strings.
    for i, result in enumerate(results):
        if str(type(result)) == "<class 'coroutine'>": #not very elegant >> is there a better way?
            undone.append(i)
            rerun_list_of_parameter.append(list_of_parameter[i])

    # Re-run only the failed requests and splice their results back into place.
    if undone:
        undone_results = loop_on_api(rerun_list_of_parameter)
        for i, undone_result in enumerate(undone_results):
            results[undone[i]] = undone_result

    return results
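
As an aside on the inline comment above: rather than comparing the string form of the type, asyncio.iscoroutine() can be used to detect the leftover coroutine objects. A minimal sketch, with a mixed list standing in for the gather() output:

import asyncio

async def consume_api(parameter):
    return str(parameter)

# Mixed list, similar to a results list where some entries were never awaited.
results = ["ok", consume_api(42)]

undone = [i for i, result in enumerate(results) if asyncio.iscoroutine(result)]
print(undone)  # [1]

results[1].close()  # close the example's unawaited coroutine to avoid a warning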