I want to implement retry logic with Python’s concurrent.futures.ThreadPoolExecutor. I would like the following properties:
- As soon as a future fails, a new future retrying its job is added to the work queue.
- A retried future can be retried again, either indefinitely or up to a maximum retry count.
A lot of the existing code I found online basically operates in “rounds”: it calls as_completed on an initial list of futures, resubmits failed futures, gathers those retries in a new list, and goes back to calling as_completed on the new list if it’s not empty. Basically something like this:
```python
with concurrent.futures.ThreadPoolExecutor(...) as executor:
    futures = {executor.submit(fn, job): job for job in jobs}
    while len(futures) > 0:
        new_futures = {}
        for fut in concurrent.futures.as_completed(futures):
            if fut.exception():
                job = futures[fut]
                new_futures[executor.submit(fn, job)] = job
            else:
                ...  # logic to handle successful job
        futures = new_futures
```
However, I think that doesn’t satisfy the first property, since it’s possible that a retried future completes before the initial futures, but we won’t process it until all the initial futures complete.
Here’s a hypothetical pathological case. Say we have two jobs: the first runs for 1 second but has a 90% chance of failure, while the second runs for 100 seconds. If our executor has 2 workers and the first job fails after 1 second, we retry it immediately. But if it fails again, we won’t be able to retry it until the second job completes.
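To make the delay concrete, here is a minimal, self-contained sketch of that scenario (my own illustrative code, not from the question: timings are shortened to 1 s and 10 s, and the flaky job always fails so the effect is deterministic):

```python
import concurrent.futures
import time

start = time.monotonic()

def log(msg):
    print(f"t={time.monotonic() - start:4.1f}s  {msg}")

def flaky():  # stands in for the 1-second job; always fails here
    time.sleep(1)
    log("flaky finished (failing)")
    raise RuntimeError("flaky failed")

def slow():   # stands in for the 100-second job, shortened to 10 s
    time.sleep(10)
    log("slow finished")

retries = 0
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    futures = {executor.submit(fn): fn for fn in (flaky, slow)}
    while futures and retries < 2:  # cap retries so the demo terminates
        new_futures = {}
        for fut in concurrent.futures.as_completed(futures):
            fn = futures[fut]
            if fut.exception():
                retries += 1
                log(f"resubmitting {fn.__name__}")
                new_futures[executor.submit(fn)] = fn
        futures = new_futures
# The first retry of flaky finishes around t=2s, but its failure is only
# resubmitted at t=10s, after slow completes and the round ends.
```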
So my question is, is it possible to implement retry logic with these desired properties, without using external libraries or rewriting low-level executor logic? One thing I tried is putting the retry logic in the code sent to the worker:
```python
def worker_job(fn):
    try:
        return fn()
    except Exception:
        executor.submit(fn)

with concurrent.futures.ThreadPoolExecutor(...) as executor:
    jobs = [functools.partial(fn, arg) for arg in args]
    executor.map(worker_job, jobs)
```
But it seems like submitting new jobs from within a job doesn’t work.
Answer
Retry using as_completed
Simple way
Loop with wait(..., return_when=FIRST_COMPLETED) instead of as_completed(...).
Trade-offs:
- Overhead of pending futures (re-adding the waiter, building new_futures).
- Troublesome if you want to specify an overall timeout.
```python
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = {executor.submit(fn, job): job for job in jobs}
    while len(futures) > 0:
        new_futures = {}
        done, pending = concurrent.futures.wait(
            futures, return_when=concurrent.futures.FIRST_COMPLETED)
        for fut in done:
            if fut.exception():
                job = futures[fut]
                new_futures[executor.submit(fn, job)] = job
            else:
                ...  # logic to handle successful job
        for fut in pending:
            job = futures[fut]
            new_futures[fut] = job
        futures = new_futures
```
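If you also want the question’s maximum-retry-count property, the same loop can carry a per-job attempt counter. A minimal sketch, assuming fn and jobs as in the question; MAX_RETRIES = 3 is an arbitrary illustrative cap:

```python
import concurrent.futures

MAX_RETRIES = 3  # illustrative cap

with concurrent.futures.ThreadPoolExecutor() as executor:
    # map each future to (job, attempts so far)
    futures = {executor.submit(fn, job): (job, 0) for job in jobs}
    while futures:
        new_futures = {}
        done, pending = concurrent.futures.wait(
            futures, return_when=concurrent.futures.FIRST_COMPLETED)
        for fut in done:
            job, attempts = futures[fut]
            if fut.exception() and attempts < MAX_RETRIES:
                new_futures[executor.submit(fn, job)] = (job, attempts + 1)
            elif fut.exception():
                ...  # give up on this job
            else:
                ...  # logic to handle successful job
        for fut in pending:
            new_futures[fut] = futures[fut]
        futures = new_futures
```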
Efficient way
Tweak as_completed(...) to add to fs and pending, and use the waiter.
Trade-off: Maintenance.
Advantage: Ability to specify an overall timeout if wanted.
```python
import time
# Private CPython internals used by as_completed; copying them is the
# maintenance trade-off mentioned above.
from concurrent.futures._base import (
    _AcquireFutures, _AS_COMPLETED, _create_and_install_waiters,
    _yield_finished_futures, CANCELLED_AND_NOTIFIED, FINISHED)

class AsCompletedWaiterWrapper:
    def __init__(self):
        self.fs = None
        self.pending = None
        self.waiter = None

    def listen(self, fut):
        # Register a newly submitted future with the running as_completed.
        with self.waiter.lock:
            self.fs.add(fut)
            self.pending.add(fut)
            fut._waiters.append(self.waiter)

    def as_completed(self, fs, timeout=None):
        """
        concurrent.futures.as_completed plus the 3 lines marked with +.
        """
        if timeout is not None:
            end_time = timeout + time.monotonic()

        fs = set(fs)
        total_futures = len(fs)
        with _AcquireFutures(fs):
            finished = set(
                    f for f in fs
                    if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
            pending = fs - finished
            waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
            self.fs = fs            # +
            self.pending = pending  # +
            self.waiter = waiter    # +
        finished = list(finished)
        try:
            yield from _yield_finished_futures(finished, waiter,
                                               ref_collect=(fs,))
            while pending:
                if timeout is None:
                    wait_timeout = None
                else:
                    wait_timeout = end_time - time.monotonic()
                    if wait_timeout < 0:
                        raise TimeoutError(
                                '%d (of %d) futures unfinished' % (
                                len(pending), total_futures))
                waiter.event.wait(wait_timeout)

                with waiter.lock:
                    finished = waiter.finished_futures
                    waiter.finished_futures = []
                    waiter.event.clear()

                # reverse to keep finishing order
                finished.reverse()
                yield from _yield_finished_futures(finished, waiter,
                                                   ref_collect=(fs, pending))
        finally:
            # Remove waiter from unfinished futures
            for f in fs:
                with f._condition:
                    f._waiters.remove(waiter)
```
Usage:
```python
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = {executor.submit(fn, job): job for job in jobs}
    w = AsCompletedWaiterWrapper()
    for fut in w.as_completed(futures):
        if fut.exception():
            job = futures[fut]
            new_fut = executor.submit(fn, job)
            futures[new_fut] = job
            w.listen(new_fut)
        else:
            ...  # logic to handle successful job
```
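Since the wrapper keeps the upstream timeout bookkeeping, the overall-timeout advantage comes for free. A sketch of the same usage with a deadline (the 60-second figure is arbitrary):

```python
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = {executor.submit(fn, job): job for job in jobs}
    w = AsCompletedWaiterWrapper()
    try:
        for fut in w.as_completed(futures, timeout=60):  # overall deadline
            if fut.exception():
                job = futures[fut]
                new_fut = executor.submit(fn, job)
                futures[new_fut] = job
                w.listen(new_fut)
            else:
                ...  # logic to handle successful job
    except TimeoutError:
        ...  # deadline hit with futures (possibly retries) still unfinished
```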
Retry from job helper
Wait for the events inside the with ... executor: block, as ThreadPoolExecutor.__exit__ shuts down the executor, after which it cannot schedule new futures.
Trade-offs:
- Would not work with ProcessPoolExecutor, due to the executor reference in the main process.
- Troublesome if you want to specify an overall timeout.
```python
def worker_job(fn, event):
    try:
        rv = fn()
        event.set()
        return rv
    except Exception:
        executor.submit(worker_job, fn, event)

with concurrent.futures.ThreadPoolExecutor() as executor:
    jobs = [functools.partial(fn, arg) for arg in args]
    events = [threading.Event() for _ in range(len(jobs))]
    executor.map(worker_job, jobs, events)
    for e in events:  # wait inside the with block, before __exit__ shuts down
        e.wait()
```
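To bound retries in this variant too, the helper could pass down a remaining-attempts counter. A sketch of one possible shape (retries_left=3 is an arbitrary default; setting the event on giving up keeps the main thread from waiting forever, though it then cannot tell success from exhaustion without extra bookkeeping):

```python
def worker_job(fn, event, retries_left=3):
    try:
        rv = fn()
        event.set()
        return rv
    except Exception:
        if retries_left > 0:
            executor.submit(worker_job, fn, event, retries_left - 1)
        else:
            event.set()  # give up, but still unblock the waiting main thread
```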