I have been attempting to generate a ping scan that uses a limited number of processes. I tried as_completed without success and switched to asyncio.wait with asyncio.FIRST_COMPLETED.
The following complete script works if the offending line is commented out. I'd like to collect the tasks in a set so that I can get rid of `pending = list(pending)`; however, `pending_set.union(task)` throws `await wasn't used with future`.
```python
"""Test simultaneous pings, limiting processes."""
import asyncio
from time import asctime

pinglist = [
    '127.0.0.1', '192.168.1.10', '192.168.1.20', '192.168.1.254',
    '192.168.177.20', '192.168.177.100', '172.17.1.1'
]


async def ping(ip):
    """Run external ping."""
    p = await asyncio.create_subprocess_exec(
        'ping', '-n', '-c', '1', ip,
        stdout=asyncio.subprocess.DEVNULL,
        stderr=asyncio.subprocess.DEVNULL
    )
    return await p.wait()


async def run():
    """Run the test, uses some processes and will take a while."""
    iplist = pinglist[:]
    pending = []
    pending_set = set()
    tasks = {}
    while len(pending) or len(iplist):
        while len(pending) < 3 and len(iplist):
            ip = iplist.pop()
            print(f"{asctime()} adding {ip}")
            task = asyncio.create_task(ping(ip))
            tasks[task] = ip
            pending.append(task)
            pending_set.union(task)  # comment this line and no error
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED
        )
        pending = list(pending)
        for taskdone in done:
            print(' '.join([
                asctime(),
                ('BAD' if taskdone.result() else 'good'),
                tasks[taskdone]
            ]))


if __name__ == '__main__':
    asyncio.run(run())
```
Answer
There are two problems with `pending_set.union(task)`:

1. `union` doesn't update the set in place; it returns a new set consisting of the original one and the one it receives as an argument.
2. It accepts an iterable collection (such as another set), not a single element. Thus `union` attempts to iterate over `task`, which doesn't make sense. To make things more confusing, task objects are technically iterable in order to be usable in `yield from` expressions, but they detect iteration attempts in non-async contexts and report the error you've observed.
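Both points can be checked in isolation. This small sketch (the variable names and the `asyncio.sleep()` placeholder task are assumptions for illustration, not part of the question) shows that `union()` leaves the original set untouched, that `add()` mutates it, and that handing a still-pending task to `union()` raises the same `RuntimeError` as in the question.

```python
import asyncio

# Plain sets: union() returns a NEW set, add() mutates the set in place.
s = set()
merged = s.union({"x", "y"})  # s is still empty; merged is a separate set
s.add("z")                    # s now holds exactly one element


async def iterate_pending_task():
    """Return the message raised when set.union() iterates a pending task."""
    # Placeholder task (assumption): anything that is still running will do.
    task = asyncio.create_task(asyncio.sleep(0.01))
    try:
        set().union(task)  # union() calls iter(task) and steps through it
        error = None
    except RuntimeError as exc:
        error = str(exc)
    # asyncio.wait() registers done-callbacks instead of iterating the task,
    # so it is used here just to let the placeholder task finish cleanly.
    await asyncio.wait({task})
    return error


error = asyncio.run(iterate_pending_task())
print(sorted(merged), s)
print(error)
```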
To fix both issues, you should use the add method instead, which operates by side effect and accepts a single element to add to the set:
```python
pending_set.add(task)
```
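With `add()` in place, `pending` can stay a set from start to finish, because `asyncio.wait()` already returns `(done, pending)` as sets, so the `list(pending)` conversion is no longer needed. A minimal sketch of the question's loop written that way, assuming a hypothetical `fake_ping()` coroutine that sleeps instead of running the external `ping` (so it can execute anywhere); `run_limited()`, its `limit` parameter, and the `peak` counter are likewise illustrative names, not from the question.

```python
import asyncio


async def fake_ping(ip: str) -> int:
    """Hypothetical stand-in for ping(): sleeps instead of spawning a process.

    Returns 0 for loopback addresses and 1 otherwise, mimicking ping's exit code.
    """
    await asyncio.sleep(0.01)
    return 0 if ip.startswith("127.") else 1


async def run_limited(ips, limit=3):
    """Ping every address with at most `limit` tasks in flight (illustrative)."""
    queue = list(ips)
    pending = set()  # a set from the start; asyncio.wait() returns sets too
    names = {}       # task -> ip, like `tasks` in the question
    results = {}
    peak = 0         # largest number of tasks in flight at once
    while pending or queue:
        while len(pending) < limit and queue:
            ip = queue.pop()
            task = asyncio.create_task(fake_ping(ip))
            names[task] = ip
            pending.add(task)  # add() mutates in place and takes one element
        peak = max(peak, len(pending))
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED
        )
        for task in done:
            results[names.pop(task)] = task.result()
    return results, peak
```

Swapping `fake_ping` back for the question's `ping` should give the original behaviour, with the set used directly as the `pending` argument on every iteration.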
Note that a more idiomatic way to limit concurrency in asyncio is using a Semaphore. For example (untested):
```python
async def run():
    # Uses ping() and pinglist (and the imports) from the question's script.
    limit = asyncio.Semaphore(3)

    async def wait_and_ping(ip):
        async with limit:
            print(f"{asctime()} adding {ip}")
            result = await ping(ip)
        print(asctime(), ip, ('BAD' if result else 'good'))

    await asyncio.gather(*[wait_and_ping(ip) for ip in pinglist])
```
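To check that the Semaphore really bounds concurrency without touching the network, here is a self-contained variant of the same idea. It assumes a hypothetical `fake_ping()` placeholder (sleep instead of `ping`) and adds illustrative `active`/`peak` counters that are not part of the answer above; the structure (semaphore acquired inside each coroutine, all coroutines passed to `gather()`) is the same.

```python
import asyncio


async def fake_ping(ip: str) -> int:
    """Hypothetical stand-in for ping(): sleeps, then reports success (0)."""
    await asyncio.sleep(0.01)
    return 0


async def run(ips, limit=3):
    """Ping all addresses concurrently, but at most `limit` at a time."""
    sem = asyncio.Semaphore(limit)
    active = 0  # pings currently holding the semaphore
    peak = 0    # highest value `active` ever reached

    async def wait_and_ping(ip):
        nonlocal active, peak
        async with sem:  # waits here while `limit` pings are already running
            active += 1
            peak = max(peak, active)
            try:
                result = await fake_ping(ip)
            finally:
                active -= 1
        return ip, result

    # gather() starts every coroutine at once; the semaphore does the limiting.
    pairs = await asyncio.gather(*(wait_and_ping(ip) for ip in ips))
    return dict(pairs), peak
```

One design trade-off: `gather()` creates a task for every address up front, while the explicit `asyncio.wait()` loop only keeps `limit` tasks alive at a time. For a short list like `pinglist` that difference is negligible and the Semaphore version is simpler to read.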