I have been attempting to write a ping scan that uses a limited number of processes. I tried `as_completed` without success and switched to `asyncio.wait` with `asyncio.FIRST_COMPLETED`.
The following complete script works if the offending line is commented out. I'd like to collect the tasks in a set in order to get rid of `pending = list(pending)`; however, `pending_set.union(task)` throws `await wasn't used with future`.
```python
"""Test simultaneous pings, limiting processes."""
import asyncio
from time import asctime

pinglist = [
    '127.0.0.1', '192.168.1.10', '192.168.1.20', '192.168.1.254',
    '192.168.177.20', '192.168.177.100', '172.17.1.1'
]


async def ping(ip):
    """Run external ping."""
    p = await asyncio.create_subprocess_exec(
        'ping', '-n', '-c', '1', ip,
        stdout=asyncio.subprocess.DEVNULL,
        stderr=asyncio.subprocess.DEVNULL
    )
    return await p.wait()


async def run():
    """Run the test, uses some processes and will take a while."""
    iplist = pinglist[:]
    pending = []

    pending_set = set()

    tasks = {}
    while len(pending) or len(iplist):
        while len(pending) < 3 and len(iplist):
            ip = iplist.pop()
            print(f"{asctime()} adding {ip}")
            task = asyncio.create_task(ping(ip))
            tasks[task] = ip
            pending.append(task)
            pending_set.union(task)  # comment this line and no error
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED
        )
        pending = list(pending)
        for taskdone in done:
            print(' '.join([
                asctime(),
                ('BAD' if taskdone.result() else 'good'),
                tasks[taskdone]
            ]))


if __name__ == '__main__':
    asyncio.run(run())
```
Answer
There are two problems with `pending_set.union(task)`:
1. `union` doesn't update the set in place; it returns a new set consisting of the original one and the one it receives as an argument.
2. It accepts an iterable collection (such as another set), not a single element. Thus `union` attempts to iterate over `task`, which doesn't make sense. To make things more confusing, task objects are technically iterable in order to be usable in `yield from` expressions, but iterating a still-pending task outside of `await`/`yield from` makes it raise the `RuntimeError` you've observed.
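Both points can be checked in isolation. The sketch below is a small demo (the function names are made up for illustration): it shows that `union` leaves the original set untouched, and that handing a single still-pending task to `union` raises the same `RuntimeError` as in the question. It relies on how CPython currently implements task iteration, so treat the exact error message as an implementation detail.

```python
import asyncio


def union_is_not_in_place():
    """Show that set.union returns a new set and leaves the receiver unchanged."""
    s = {1, 2}
    t = s.union({3})  # the argument must be an iterable, e.g. another set
    return s, t


async def union_with_pending_task():
    """Pass a single task to union and capture the resulting error message."""
    task = asyncio.create_task(asyncio.sleep(0))
    try:
        set().union(task)  # union iterates over the task outside of an await
        message = None
    except RuntimeError as exc:
        message = str(exc)
    # Let the task finish; asyncio.wait uses done-callbacks, not iteration.
    await asyncio.wait({task})
    return message


if __name__ == '__main__':
    print(union_is_not_in_place())  # ({1, 2}, {1, 2, 3})
    print(asyncio.run(union_with_pending_task()))
```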
To fix both issues, you should use the `add` method instead, which operates by side effect and accepts a single element to add to the set:

```python
pending_set.add(task)
```
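With `add`, `pending` can simply stay a set for the whole loop, because `asyncio.wait` already returns its `done` and `pending` results as sets; that removes the need for `pending = list(pending)`. Below is a minimal sketch of the question's loop written that way. To keep it runnable without spawning processes, `fake_ping` is a hypothetical stand-in for the question's `ping` that sleeps briefly and returns 0 (success) or 1 (failure); the "`.254` fails" rule is made up for illustration.

```python
import asyncio
import random

MAX_CONCURRENT = 3


async def fake_ping(ip):
    """Hypothetical stand-in for ping(): sleep briefly, then return an exit code."""
    await asyncio.sleep(random.uniform(0.01, 0.05))
    return 1 if ip.endswith('.254') else 0  # pretend .254 never answers


async def run(iplist, max_concurrent=MAX_CONCURRENT):
    """Ping every address, keeping at most max_concurrent pings in flight."""
    iplist = list(iplist)
    pending = set()  # stays a set throughout; no list() conversion needed
    tasks = {}       # task -> ip, to report which address finished
    results = {}
    while pending or iplist:
        while len(pending) < max_concurrent and iplist:
            ip = iplist.pop()
            task = asyncio.create_task(fake_ping(ip))
            tasks[task] = ip
            pending.add(task)  # adds a single element in place
        # asyncio.wait returns two sets, so pending can be reassigned directly
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED
        )
        for task in done:
            results[tasks[task]] = 'BAD' if task.result() else 'good'
    return results


if __name__ == '__main__':
    ips = ['127.0.0.1', '192.168.1.10', '192.168.1.20', '192.168.1.254']
    print(asyncio.run(run(ips)))
```

Swapping `fake_ping` back for the real `ping` coroutine should give the same structure with actual subprocesses.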
Note that a more idiomatic way to limit concurrency in asyncio is to use a `Semaphore`. For example (untested):
```python
async def run():
    limit = asyncio.Semaphore(3)

    async def wait_and_ping(ip):
        async with limit:
            print(f"{asctime()} adding {ip}")
            result = await ping(ip)
        print(asctime(), ip, ('BAD' if result else 'good'))

    await asyncio.gather(*[wait_and_ping(ip) for ip in pinglist])
```
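To check that the semaphore really caps concurrency without running real pings, the hypothetical harness below replaces `ping` with a short sleep and counts how many calls are inside the `async with` block at the same time. The counter, `run_limited`, and the `10.0.0.x` addresses are illustrative assumptions, not part of the answer above; the point is only that the peak never exceeds the semaphore's initial value.

```python
import asyncio


async def run_limited(ips, limit_value=3):
    """Run one fake ping per address under a semaphore; return results and peak concurrency."""
    limit = asyncio.Semaphore(limit_value)
    active = 0
    peak = 0

    async def wait_and_fake_ping(ip):
        nonlocal active, peak
        async with limit:
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)  # stand-in for the external ping
            active -= 1
        return ip

    # gather returns results in the same order as its arguments
    results = await asyncio.gather(*[wait_and_fake_ping(ip) for ip in ips])
    return results, peak


if __name__ == '__main__':
    ips = [f'10.0.0.{n}' for n in range(1, 11)]
    results, peak = asyncio.run(run_limited(ips))
    print(peak)  # 3
```

Compared with the manual `asyncio.wait` loop, the semaphore keeps the limiting logic in one place, and all coroutines can be submitted up front with `gather`.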