Retrying failed futures in Python’s ThreadPoolExecutor

I want to implement retry logic with Python’s concurrent.futures.ThreadPoolExecutor. I would like the following properties:

  1. A failed job is resubmitted as a new future as soon as it fails.
  2. A retried future can be retried again, either indefinitely or up to a maximum retry count.

A lot of the existing code I found online basically operates in “rounds”: it calls as_completed on an initial list of futures, resubmits the failed ones, gathers the resubmitted futures in a new list, and goes back to calling as_completed on the new list if it’s not empty. Basically something like this:

with concurrent.futures.ThreadPoolExecutor(...) as executor:
    futures = {executor.submit(fn, job): job for job in jobs}
    while len(futures) > 0:
        new_futures = {}
        for fut in concurrent.futures.as_completed(futures):
            if fut.exception():
                job = futures[fut]
                new_futures[executor.submit(fn, job)] = job
            else:
                ...  # logic to handle successful job
        futures = new_futures

However, I think that doesn’t satisfy the first property, since it’s possible that a retried future completes before the initial futures, but we won’t process it until all the initial futures complete.

Here’s a hypothetical pathological case. Let’s say we have two jobs: the first runs for 1 second but has a 90% chance of failure, while the second runs for 100 seconds. If our executor has 2 workers and the first job fails after 1 second, we’ll retry it immediately. But if it fails again, we won’t be able to retry it until the second job completes.


So my question is, is it possible to implement retry logic with these desired properties, without using external libraries or rewriting low-level executor logic? One thing I tried is putting the retry logic in the code sent to the worker:

def worker_job(fn):
    try:
        return fn()
    except Exception:
        executor.submit(fn)

with concurrent.futures.ThreadPoolExecutor(...) as executor:
    jobs = [functools.partial(fn, arg) for arg in args]
    executor.map(worker_job, jobs)

But it seems like submitting new jobs from within a job doesn’t work.


Answer

Retry using as_completed

Simple way

Loop with wait(..., return_when=FIRST_COMPLETED) instead of as_completed(...).

Trade-offs:

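  1. Per-iteration overhead that grows with the number of pending futures: every wait(...) call re-installs a waiter on each pending future, and new_futures is rebuilt each time.
  2. An overall timeout is awkward to add, because the remaining time has to be recomputed and passed to every wait(...) call.

with concurrent.futures.ThreadPoolExecutor() as executor: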
    futures = {executor.submit(fn, job): job for job in jobs}
    while len(futures) > 0:
        new_futures = {}
        done, pending = concurrent.futures.wait(
            futures, return_when=concurrent.futures.FIRST_COMPLETED)
        for fut in done:
            if fut.exception():
                job = futures[fut]
                new_futures[executor.submit(fn, job)] = job
            else:
                ...  # logic to handle successful job
        for fut in pending:
            job = futures[fut]
            new_futures[fut] = job
        futures = new_futures
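
The loop above retries indefinitely, while the question also asks for an optional maximum retry count. As a minimal sketch, assuming jobs are hashable and that collecting outcomes in dictionaries is acceptable, the same wait(...) loop can carry a retry counter per future. The names run_with_retries, max_retries, and max_workers are hypothetical and not part of the original answer.

```python
import concurrent.futures


def run_with_retries(fn, jobs, max_retries=3, max_workers=None):
    """Run fn(job) for every job, resubmitting a failed job right away.

    Returns (results, errors), two dicts keyed by job.
    Assumption: jobs are hashable so they can be used as dict keys.
    """
    results, errors = {}, {}
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each in-flight future to (job, retries used so far).
        in_flight = {executor.submit(fn, job): (job, 0) for job in jobs}
        while in_flight:
            done, _ = concurrent.futures.wait(
                in_flight, return_when=concurrent.futures.FIRST_COMPLETED)
            for fut in done:
                job, retries = in_flight.pop(fut)
                exc = fut.exception()
                if exc is None:
                    results[job] = fut.result()
                elif retries < max_retries:
                    # Resubmit immediately, without waiting for other futures.
                    in_flight[executor.submit(fn, job)] = (job, retries + 1)
                else:
                    # Retry budget exhausted: keep the last exception.
                    errors[job] = exc
    return results, errors
```

Popping finished futures from in_flight avoids rebuilding the dictionary on every iteration, although each wait(...) call still re-installs its waiter on the remaining futures.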

Efficient way

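Copy the body of as_completed(...) and expose its internal fs, pending, and waiter, so that new futures can be registered with the same waiter while the generator is running.

Trade-off: Maintenance. The copy depends on private helpers from concurrent.futures._base, which may change between Python versions.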

Advantage: Ability to specify overall timeout if wanted.

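import time
from concurrent.futures._base import (
    _AS_COMPLETED, CANCELLED_AND_NOTIFIED, FINISHED,
    _AcquireFutures, _create_and_install_waiters, _yield_finished_futures)

class AsCompletedWaiterWrapper:
    def __init__(self):
        self.fs = None
        self.pending = None
        self.waiter = None

    def listen(self, fut):
        # Call only while iterating as_completed(), from the consuming thread.
        with fut._condition:
            self.fs.add(fut)
            self.pending.add(fut)
            fut._waiters.append(self.waiter)
            already_done = fut._state in [CANCELLED_AND_NOTIFIED, FINISHED]
        if already_done:
            # The future finished before the waiter was attached, so it will
            # never notify the waiter; hand it over manually.
            with self.waiter.lock:
                self.waiter.finished_futures.append(fut)
                self.waiter.event.set()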

    def as_completed(self, fs, timeout=None):
        """
        concurrent.futures.as_completed plus the 3 lines marked with +.
        """
        if timeout is not None:
            end_time = timeout + time.monotonic()

        fs = set(fs)
        total_futures = len(fs)
        with _AcquireFutures(fs):
            finished = set(
                    f for f in fs
                    if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
            pending = fs - finished
            waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
        self.fs = fs            # +
        self.pending = pending  # +
        self.waiter = waiter    # +
        finished = list(finished)
        try:
            yield from _yield_finished_futures(finished, waiter,
                                               ref_collect=(fs,))

            while pending:
                if timeout is None:
                    wait_timeout = None
                else:
                    wait_timeout = end_time - time.monotonic()
                    if wait_timeout < 0:
                        raise TimeoutError(
                                '%d (of %d) futures unfinished' % (
                                len(pending), total_futures))

                waiter.event.wait(wait_timeout)

                with waiter.lock:
                    finished = waiter.finished_futures
                    waiter.finished_futures = []
                    waiter.event.clear()

                # reverse to keep finishing order
                finished.reverse()
                yield from _yield_finished_futures(finished, waiter,
                                                   ref_collect=(fs, pending))

        finally:
            # Remove waiter from unfinished futures
            for f in fs:
                with f._condition:
                    f._waiters.remove(waiter)

Usage:

with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = {executor.submit(fn, job): job for job in jobs}
    w = AsCompletedWaiterWrapper()
    for fut in w.as_completed(futures):
        if fut.exception():
            job = futures[fut]
            new_fut = executor.submit(fn, job)
            futures[new_fut] = job
            w.listen(new_fut)
        else:
            ...  # logic to handle successful job

Retry from job helper

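Wait for the events inside the with ... executor: block. ThreadPoolExecutor.__exit__ shuts down the executor, and a shut-down executor cannot schedule new futures, so leaving the block before the jobs finish makes every retry submission fail.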
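
The shutdown behaviour can be checked directly. This small sketch (not from the original answer) submits one task inside the with block and one after it; the second call is expected to raise RuntimeError. In the question's attempt, the same error is most likely raised inside the worker thread and stored on a future that nobody inspects, which would explain why the retries seem to vanish silently.

```python
import concurrent.futures

shutdown_error = None

executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
with executor:
    # Accepted: the executor is still open.
    executor.submit(print, "submitted inside the with block")

# Leaving the with block called executor.shutdown(wait=True).
try:
    executor.submit(print, "submitted after shutdown")
except RuntimeError as exc:
    shutdown_error = exc
    print("submit failed:", exc)
```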

Trade-offs:

  1. It does not work with ProcessPoolExecutor, because the executor object lives in the main process and the worker processes cannot submit to it.
  2. An overall timeout is awkward to add, since each event is waited on separately.

def worker_job(fn, event):
    try:
        rv = fn()
        event.set()
        return rv
    except Exception:
        executor.submit(worker_job, fn, event)

with concurrent.futures.ThreadPoolExecutor() as executor:
    jobs = [functools.partial(fn, arg) for arg in args]
    events = [threading.Event() for _ in range(len(jobs))]
    executor.map(worker_job, jobs, events)
    for e in events:
        e.wait()
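
The helper above retries forever, so e.wait() never returns for a job that always fails. As a hedged sketch of a bounded variant, the helper below counts down the remaining attempts and sets the event on the final failure as well. The names run_all, outcome, attempts_left, and max_attempts are hypothetical, and passing the executor as an argument instead of using a global is a design choice of this sketch, not of the original answer.

```python
import concurrent.futures
import functools
import threading


def worker_job(executor, fn, done_event, outcome, attempts_left):
    """Run fn(); on failure, resubmit this helper until attempts run out."""
    try:
        outcome["result"] = fn()
    except Exception as exc:
        if attempts_left > 1:
            # Retry from inside the worker; the executor must still be open.
            executor.submit(worker_job, executor, fn, done_event,
                            outcome, attempts_left - 1)
            return
        outcome["error"] = exc  # out of attempts: record the last failure
    done_event.set()


def run_all(fn, args, max_attempts=3):
    jobs = [functools.partial(fn, arg) for arg in args]
    events = [threading.Event() for _ in jobs]
    outcomes = [{} for _ in jobs]
    with concurrent.futures.ThreadPoolExecutor() as executor:
        for job, event, outcome in zip(jobs, events, outcomes):
            executor.submit(worker_job, executor, job, event, outcome,
                            max_attempts)
        # Stay inside the with block until every job has either succeeded
        # or given up, so resubmissions are still accepted.
        for event in events:
            event.wait()
    return outcomes
```

Each entry of the returned list holds either a "result" key or an "error" key, in the same order as args.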