I’m looking for a way to rate limit a recursive algorithm. To demonstrate the problem I’ll use the example of exploring a directory tree:
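```python
import asyncio
from pathlib import Path


async def explore_tree(path: Path) -> ExploreResult:
    # ExploreResult, parse_file and combine_results are placeholders for application code.
    tasks = []
    for child in path.iterdir():
        if child.is_dir():
            tasks.append(explore_tree(child))
        elif child.is_file():
            tasks.append(parse_file(child))
    # Every child is awaited concurrently, so the task count grows with each level of the tree.
    results = await asyncio.gather(*tasks)
    return combine_results(results)
```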
The problem with this code is the way the number of active tasks explodes exponentially. Many of these tasks hold OS resources, so even though tasks themselves are theoretically cheap, running millions simultaneously is going to cause problems. Yet we don’t want to run one task at a time (without gather) because there is a significant performance boost from running these tasks concurrently.
Python’s async Lock is first-in-first-out, and its Semaphore is mostly first-in-first-out except for a race condition. This leads the exploration to be approximately breadth-first, and the number of tasks explodes into the millions even if most of them are waiting. As described above, that many tasks still cause problems, so a semaphore on its own does not fix the root cause.
One example error from this is IOError: [Errno 24] Too many open files. Obviously it’s possible to put limits around opening files via semaphores. But then this shifts the problem onto other resources and we end up playing “whack-a-mole” with individual resource limits.
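For illustration, here is a minimal sketch of the per-resource limit mentioned above: a plain asyncio.Semaphore wrapped around each file open. The names MAX_OPEN_FILES and read_limited are hypothetical, and the temporary files only exist to make the sketch runnable. It caps the number of open files, but every task is still created up front, which is exactly the part it does not solve.

```python
import asyncio
import tempfile
from pathlib import Path

MAX_OPEN_FILES = 4  # hypothetical limit; a real value depends on the process's ulimit


async def read_limited(path: Path, sem: asyncio.Semaphore, stats: dict) -> int:
    # Only MAX_OPEN_FILES coroutines can be inside this block at once.
    async with sem:
        stats["open"] += 1
        stats["peak"] = max(stats["peak"], stats["open"])
        try:
            with path.open("rb") as handle:
                await asyncio.sleep(0)  # stand-in for awaiting real work while the file is open
                return len(handle.read())
        finally:
            stats["open"] -= 1


async def main() -> dict:
    stats = {"open": 0, "peak": 0}
    sem = asyncio.Semaphore(MAX_OPEN_FILES)
    with tempfile.TemporaryDirectory() as tmp:
        paths = []
        for i in range(20):
            path = Path(tmp) / f"file_{i}.txt"
            path.write_text("x" * i)
            paths.append(path)
        # All 20 coroutines are scheduled immediately; the semaphore only gates the open() call.
        stats["sizes"] = await asyncio.gather(*(read_limited(p, sem, stats) for p in paths))
    return stats


stats = asyncio.run(main())
```

The file handles stay bounded, but any other resource held by the waiting tasks would need its own limit in the same style.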
What I’m hunting for is something like a semaphore that is deliberately unfair and subject to starvation. I want a semaphore that imposes last-in-first-out instead of first-in-first-out. The aim is to use the resource constraints to squeeze the exploration into a more depth-first pattern instead of breadth-first.
Answer
To my surprise, Python’s semaphore is not fair. There’s a race condition which gives priority to new calls to acquire() over tasks which have been released but not yet executed on the event loop. That is:
```python
sem.release()
await sem.acquire()
```
Irrespective of the number of waiting tasks, the above code will never block and will even re-order the queue of waiting tasks as a result. So sadly this object is useless for enforcing a strict order.
I wrote my own:
```python
import asyncio
import collections
from typing import Deque


class FairSemaphore:
    """
    Semaphore with strictly controlled order.

    By default this will be first-in-first-out but can be configured to be last-in-first-out.
    """

    _queue: Deque[asyncio.Future]
    _value: int
    _fifo: bool

    def __init__(self, value: int, fifo: bool = True):
        """
        :param value: Initial value for the semaphore
        :param fifo: If True (default) the first task to call acquire() will be the first to be released.
            If False the last task to call acquire() at the moment release() is called will be the first
            to be released.
        """
        self._value = value
        self._queue = collections.deque()
        self._fifo = fifo

    def locked(self) -> bool:
        """
        Return True if acquire() would have to wait because no slot is free.
        """
        return not self._value

    async def acquire(self):
        if self._value:
            self._value -= 1
        else:
            loop: asyncio.AbstractEventLoop = asyncio.get_running_loop()
            future = loop.create_future()
            self._queue.append(future)

            try:
                await future
            except asyncio.CancelledError:
                # This happens when the future's result was set but the task was cancelled.
                # In other words another task completed and released this one... but this one
                # got cancelled before it could do anything. As a result we need to release another.
                if not future.cancelled():
                    self.release()
                # Otherwise we were never released, so we have no right to release another.
                raise

    def release(self):
        # Tasks can get cancelled while in the queue.
        # Naively you would expect their acquire() code to remove them from the queue. But that
        # doesn't always work because the event loop might not have given them a chance to
        # execute the CancelledError except clause yet.
        # It's unavoidable that there could be cancelled tasks waiting on this queue.
        # When that happens the done() state of the future goes to True...
        while self._queue:
            future = self._queue.popleft() if self._fifo else self._queue.pop()
            if not future.done():
                future.set_result(None)
                break
            # ... so we discard any future which is already "done".
        else:
            # Nobody was waiting: return the slot to the pool.
            self._value += 1

    async def __aenter__(self):
        await self.acquire()

    async def __aexit__(self, exc_type, exc, tb):
        self.release()
```
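As a rough check of what the last-in-first-out mode does to wake-up order, here is a small self-contained sketch. To keep it short it uses a trimmed stand-in, TinyLifoSemaphore (a hypothetical name), which behaves like FairSemaphore with fifo=False but leaves out the cancellation handling; the worker and demo helpers are likewise only for illustration.

```python
import asyncio
import collections


class TinyLifoSemaphore:
    """Trimmed stand-in for FairSemaphore(fifo=False); no cancellation handling."""

    def __init__(self, value: int) -> None:
        self._value = value
        self._waiters = collections.deque()

    async def acquire(self) -> None:
        if self._value:
            self._value -= 1
            return
        future = asyncio.get_running_loop().create_future()
        self._waiters.append(future)
        await future

    def release(self) -> None:
        if self._waiters:
            # Hand the slot straight to the most recently queued waiter.
            self._waiters.pop().set_result(None)
        else:
            self._value += 1


async def worker(name: str, sem: TinyLifoSemaphore, order: list) -> None:
    await sem.acquire()
    try:
        order.append(name)  # record the moment this worker got the slot
        await asyncio.sleep(0)
    finally:
        sem.release()


async def demo() -> list:
    sem = TinyLifoSemaphore(1)
    order = []
    await sem.acquire()  # hold the only slot so every worker has to queue
    tasks = [asyncio.create_task(worker(name, sem, order)) for name in "abcd"]
    await asyncio.sleep(0)  # let all four workers reach acquire() and queue up
    sem.release()
    await asyncio.gather(*tasks)
    return order


wake_order = asyncio.run(demo())
```

The workers queue in the order a, b, c, d and wake in the order d, c, b, a. Applied to the tree exploration, the most recently queued work, which tends to be the deepest, would run first.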