I have a simple (but large) task graph in Dask. Here is a code example:
```python
import dask

results = []
for params in SomeIterable:
    a = dask.delayed(my_function)(**params)
    b = dask.delayed(my_other_function)(a)
    results.append(b)

# Unpack the list of delayed objects as positional arguments
dask.compute(*results)
```
Here `SomeIterable` is a list of dicts, each holding the keyword arguments for `my_function`. In each iteration `b` depends on `a`, so if the task that produces `a` fails, `b` can't be computed. However, the elements of `results` are independent of each other, so I expect that if one fails, the others can continue running. In practice this does not happen: if one element of `results` fails, the execution of the whole script ends.
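A minimal reproduction of this behaviour might look like the following. It assumes hypothetical stand-ins for `my_function` and `my_other_function`, where the task for `x=2` raises, and uses the local synchronous scheduler so the outcome is deterministic. A single failing task makes `dask.compute` re-raise that task's exception, so no results are returned at all.

```python
import dask


def my_function(x):
    # Hypothetical stand-in: fails for one input to simulate a bad task
    if x == 2:
        raise ValueError(f"bad input: {x}")
    return x * 10


def my_other_function(a):
    # Hypothetical stand-in for the downstream step that depends on `a`
    return a + 1


SomeIterable = [{"x": 1}, {"x": 2}, {"x": 3}]

results = []
for params in SomeIterable:
    a = dask.delayed(my_function)(**params)
    b = dask.delayed(my_other_function)(a)
    results.append(b)

error = None
try:
    # The synchronous scheduler runs tasks in the current thread, one at a time
    dask.compute(*results, scheduler="synchronous")
except ValueError as exc:
    # The exception from the x=2 task aborts the whole compute() call
    error = exc
    print("compute() raised:", exc)
```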
EDIT:
This also happens when using the `submit` (or `map`) method of the client class `dask.distributed.Client`, for example:
```python
futures = [client.submit(my_other_function_2, **params) for params in MyOtherIterable]
results = [ft.result() for ft in futures]
```
In the code above, if one task fails, the whole script stops with an error as soon as I try to gather its result (`ft.result()` re-raises the task's exception), as described in the docs.
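The following sketch illustrates where the failure actually surfaces with futures. It assumes a hypothetical `my_other_function_2` that raises for `x=2` and an in-process `Client(processes=False)` cluster. Waiting on all futures with `dask.distributed.wait` shows that the independent tasks still finish; it is the `ft.result()` call on the failed future that raises and ends the list comprehension.

```python
from dask.distributed import Client, wait


def my_other_function_2(x):
    # Hypothetical stand-in: raises for one input
    if x == 2:
        raise ValueError(f"bad input: {x}")
    return x * 10


MyOtherIterable = [{"x": 1}, {"x": 2}, {"x": 3}]

with Client(processes=False) as client:
    futures = [client.submit(my_other_function_2, **params) for params in MyOtherIterable]

    # Block until every future is done, whether it succeeded or failed
    wait(futures)
    statuses = [ft.status for ft in futures]

    error = None
    try:
        # The pattern from the question: stops at the first failed future
        results = [ft.result() for ft in futures]
    except ValueError as exc:
        # result() on the failed future re-raises the task's exception
        error = exc

# The futures for x=1 and x=3 finished even though x=2 failed
print(statuses)
```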
Answer
An easy way out is to wrap your functions in `try`/`except`, something like this:
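```python
import dask


def try_f(params):
    try:
        a = my_function(**params)
        b = my_other_function(a)
    except Exception:
        # Record the failure instead of letting it abort the whole compute
        b = f"Failed for: {params}"
    return b


results = [dask.delayed(try_f)(params) for params in SomeIterable]
# dask.compute returns a tuple; computed[0] is the list of results
computed = dask.compute(results)
```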
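The wrapper above merges both steps into a single task. If you want to keep `a` and `b` as separate tasks in the graph, one possible variation (a sketch, not part of the original answer) is a hypothetical `safe` decorator that returns the exception object instead of raising it, and that passes an upstream exception straight through without calling the downstream function. `my_function`, `my_other_function` and `SomeIterable` below are stand-ins, and the synchronous scheduler is used only to keep the example deterministic.

```python
import functools

import dask


def my_function(x):
    # Hypothetical stand-in: fails for one input
    if x == 2:
        raise ValueError(f"bad input: {x}")
    return x * 10


def my_other_function(a):
    # Hypothetical stand-in for the dependent step
    return a + 1


def safe(func):
    """Hypothetical helper: return exceptions as values instead of raising."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # If an upstream task already failed, pass its exception along unchanged
        for arg in list(args) + list(kwargs.values()):
            if isinstance(arg, Exception):
                return arg
        try:
            return func(*args, **kwargs)
        except Exception as exc:
            return exc
    return wrapper


SomeIterable = [{"x": 1}, {"x": 2}, {"x": 3}]

results = []
for params in SomeIterable:
    a = dask.delayed(safe(my_function))(**params)
    b = dask.delayed(safe(my_other_function))(a)
    results.append(b)

computed = dask.compute(*results, scheduler="synchronous")
# Split the outcomes so failures can be inspected after the fact
successes = [r for r in computed if not isinstance(r, Exception)]
failures = [r for r in computed if isinstance(r, Exception)]
```

Returning the exception object (rather than a string) keeps its type and message available for logging or for deciding which inputs to re-run.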
However, depending on your case, you might want to use the `client.submit` API instead, since it gives you more flexibility, e.g. specifying how many times a failed task should be retried.
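A sketch of that approach, again with a hypothetical `my_other_function_2` that fails for one input and an in-process `Client(processes=False)`: `retries=2` asks the scheduler to re-run a failing task up to two more times, and `as_completed` hands back each future as it finishes, so a failure can be inspected through `Future.status` and `Future.exception()` instead of aborting the loop. The `succeeded`/`failed` dictionaries keyed by `x` are illustrative bookkeeping only.

```python
from dask.distributed import Client, as_completed


def my_other_function_2(x):
    # Hypothetical stand-in: fails for one input on every attempt
    if x == 2:
        raise ValueError(f"bad input: {x}")
    return x * 10


MyOtherIterable = [{"x": 1}, {"x": 2}, {"x": 3}]

succeeded = {}
failed = {}
with Client(processes=False) as client:
    # retries=2: a failing task is re-run up to two more times before it errs
    futures = [
        client.submit(my_other_function_2, retries=2, **params)
        for params in MyOtherIterable
    ]
    # Map each future's key back to the parameters that produced it
    params_by_key = {ft.key: params for ft, params in zip(futures, MyOtherIterable)}

    for ft in as_completed(futures):
        params = params_by_key[ft.key]
        if ft.status == "error":
            # exception() returns the remote exception without raising it
            failed[params["x"]] = ft.exception()
        else:
            succeeded[params["x"]] = ft.result()

print("succeeded:", succeeded)
print("failed:", failed)
```

Note that the parameter dicts must not contain keys that `client.submit` itself consumes (such as `key`, `pure` or `retries`). If keeping track of which input failed is not needed, `client.gather(futures, errors="skip")` is a shorter alternative that returns only the successful results.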