
How to collect wait()’d co-routines in a set?

I have been trying to write a ping scan that limits the number of concurrent ping processes. I tried as_completed without success and switched to asyncio.wait with asyncio.FIRST_COMPLETED.

The following complete script works if the offending line is commented out. I’d like to collect the tasks in a set so that I can get rid of pending = list(pending), but pending_set.union(task) raises RuntimeError: await wasn't used with future.

"""Test simultaneous pings, limiting processes."""
import asyncio
from time import asctime

pinglist = [
    '127.0.0.1', '192.168.1.10', '192.168.1.20', '192.168.1.254',
    '192.168.177.20', '192.168.177.100', '172.17.1.1'
]


async def ping(ip):
    """Run external ping."""
    p = await asyncio.create_subprocess_exec(
        'ping', '-n', '-c', '1', ip,
        stdout=asyncio.subprocess.DEVNULL,
        stderr=asyncio.subprocess.DEVNULL
    )
    return await p.wait()


async def run():
    """Run the test, uses some processes and will take a while."""
    iplist = pinglist[:]
    pending = []
    pending_set = set()
    tasks = {}
    while len(pending) or len(iplist):
        while len(pending) < 3 and len(iplist):
            ip = iplist.pop()
            print(f"{asctime()} adding {ip}")
            task = asyncio.create_task(ping(ip))
            tasks[task] = ip
            pending.append(task)
            pending_set.union(task)  # comment this line and no error
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED
        )
        pending = list(pending)
        for taskdone in done:
            print(' '.join([
                asctime(),
                ('BAD' if taskdone.result() else 'good'),
                tasks[taskdone]
            ]))

if __name__ == '__main__':
    asyncio.run(run())


Answer

There are two problems with pending_set.union(task):

  • union doesn’t update the set in place; it returns a new set containing the elements of the original set plus those of its argument.

  • It accepts an iterable (such as another set), not a single element, so union attempts to iterate over task, which doesn’t make sense. To make things more confusing, task objects are technically iterable so that they can be used in yield from expressions (a holdover from generator-based coroutines), but iterating a task that hasn't finished yet from ordinary code, rather than awaiting it, raises the RuntimeError you’ve observed.
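Both points can be checked in isolation. The sketch below uses illustrative function names that are not from the question; it shows that union() leaves the original set alone and iterates its argument, that add() mutates the set with a single element, and that passing a still-pending task to union() reproduces the error:

```python
import asyncio


def union_vs_add():
    s = {"x"}
    t = s.union(["y"])        # returns a new set; s itself is unchanged
    assert s == {"x"} and t == {"x", "y"}

    # union() iterates its argument: a string contributes its characters.
    assert s.union("ab") == {"x", "a", "b"}

    s.add("ab")               # add() mutates s and takes exactly one element
    assert s == {"x", "ab"}


async def union_with_task():
    # The new task is still pending here: it only starts running once this
    # coroutine yields control back to the event loop.
    task = asyncio.create_task(asyncio.sleep(0))
    try:
        set().union(task)     # iterates the task instead of awaiting it
    except RuntimeError as exc:
        message = str(exc)
    else:
        message = None
    task.cancel()             # clean up the demo task
    return message
```

Calling asyncio.run(union_with_task()) should return the same message quoted in the question.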

To fix both issues, use the add method instead, which modifies the set in place and accepts a single element to add:

pending_set.add(task)
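Since asyncio.wait() accepts a set and returns its done and pending tasks as sets, keeping the tasks in a set from the start makes the pending = list(pending) conversion unnecessary. Below is a sketch of the question's loop rewritten that way. It assumes a hypothetical fake_ping() that sleeps instead of spawning ping, so it runs anywhere, and it returns results in a dict instead of printing them:

```python
import asyncio

# Hypothetical stand-in for the question's ping(): it tracks how many
# "pings" are in flight so the concurrency limit can be checked.
in_flight = 0
max_in_flight = 0


async def fake_ping(ip):
    global in_flight, max_in_flight
    in_flight += 1
    max_in_flight = max(max_in_flight, in_flight)
    await asyncio.sleep(0.01)
    in_flight -= 1
    return 0 if ip.startswith("127.") else 1  # nonzero mimics a failed ping


async def run(iplist, limit=3):
    iplist = list(iplist)
    pending = set()   # asyncio.wait() accepts and returns sets
    tasks = {}        # task -> ip, to report which address finished
    results = {}
    while pending or iplist:
        while len(pending) < limit and iplist:
            ip = iplist.pop()
            task = asyncio.create_task(fake_ping(ip))
            tasks[task] = ip
            pending.add(task)  # add() mutates the set in place
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED
        )
        for task in done:
            results[tasks.pop(task)] = 'BAD' if task.result() else 'good'
    return results
```

Replacing fake_ping with the question's ping() should give the original behaviour, minus the list conversion.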

Note that a more idiomatic way to limit concurrency in asyncio is to use an asyncio.Semaphore. For example (untested):

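# Drop-in replacement for run() in the question; relies on its imports,
# ping() and pinglist.
async def run():
    # At most 3 coroutines can hold the semaphore, so at most 3 pings run at once.
    limit = asyncio.Semaphore(3)

    async def wait_and_ping(ip):
        async with limit:
            print(f"{asctime()} adding {ip}")
            result = await ping(ip)
        # The slot is released before printing, letting the next ping start.
        print(asctime(), ip, ('BAD' if result else 'good'))

    # Start one coroutine per address; the semaphore does the throttling.
    await asyncio.gather(*[wait_and_ping(ip) for ip in pinglist])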
User contributions licensed under: CC BY-SA