
Python – getting lost around async

As the title says – I’m having a problem with async. What I’m trying to achieve is described under each function. Sadly, in the current state of the code I’m getting an error:

TypeError: object StreamReader can't be used in 'await' expression and, at the end, RuntimeError: Event loop is closed

I was googling for a while and didn’t really find a solution to my problem. Can anyone help me out and clarify what I’m doing wrong? Am I allowed to have two async with blocks in one async function?

Thanks!

 
import asyncio
from io import BytesIO

import aiohttp
import click

def load_file(file_path):
    with open(file_path, "r") as f:
        content = f.readlines()
        content = [a.strip() for a in content]
    return content

### --> Getting list of urls

async def task(session, item, urls):
    async with session.get(item) as resp:
        image_bytes = BytesIO(await resp.content)

### --> Downloading the image and getting image bytes

    async with session.post(
        TORCH_URL, data=image_bytes, headers={"authorization": TOKEN}
    ) as resp:
        response = await resp.json()
        print(response)

### --> Sending the image bytes to an API and getting a little json file as a response 

async def asyncmain(urls, path, content):

    tasks = []
    async with aiohttp.ClientSession() as session:
        tasks = [task(session, url, urls) for url in content]
        await asyncio.gather(*tasks)

### --> Gathering the tasks with .gather()

@click.command()
@click.option("--urls", "-u", is_flag=True, help="Use this if you have urls")
@click.option(
    "--path",
    "-p",
    help="Path to file with variant IDs, can be combined with -u (having urls in file)",
)
def main(urls, path):
    tasks = []
    content = load_file(path)
    asyncio.run(asyncmain(urls, path, content), debug=True)

### --> Fire asyncio.run with some params

if __name__ == "__main__":
    main()


Answer

Your issue is the improper use of the resp.content attribute below.

async def task(session, item, urls):
    async with session.get(item) as resp:
        image_bytes = BytesIO(await resp.content)

See aiohttp’s streaming response content documentation.

While methods read(), json() and text() are very convenient you should use them carefully. All these methods load the whole response in memory. For example if you want to download several gigabyte sized files, these methods will load all the data in memory. Instead you can use the content attribute. It is an instance of the aiohttp.StreamReader class. The gzip and deflate transfer-encodings are automatically decoded for you:

async with session.get('https://api.github.com/events') as resp:
    await resp.content.read(10)

You can either

  • (a) download the response in chunks and stream them to disk, or
  • (b) if the binary is not large and fits in memory — which seems to be the case given your use of BytesIO — use io.BytesIO(await resp.read()) (see binary response content).
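To illustrate the fix without a live server, here is a minimal sketch using a stand-in response object (FakeResponse is hypothetical and exists only for this example; with aiohttp, the same await resp.read() call goes on the real ClientResponse inside the async with session.get(...) block):

```python
import asyncio
from io import BytesIO

class FakeResponse:
    """Hypothetical stand-in for aiohttp.ClientResponse, for illustration only."""
    def __init__(self, body: bytes):
        self._body = body

    async def read(self) -> bytes:
        # Like aiohttp's ClientResponse.read(): a coroutine that
        # returns the entire response body as bytes.
        return self._body

async def fetch_image_bytes(resp) -> BytesIO:
    # The fix: await resp.read() (a coroutine returning bytes) instead of
    # `await resp.content` (a StreamReader, which is not awaitable).
    return BytesIO(await resp.read())

image = asyncio.run(fetch_image_bytes(FakeResponse(b"\x89PNG fake image data")))
```

Applied to the question’s task(), the change is just image_bytes = BytesIO(await resp.read()) — the BytesIO wrapper and the rest of the coroutine, including the second async with block for the POST, can stay as they are.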