Is it possible to set a time limit on the pool's map()
operation when using multiprocessing
in Python? When the time limit is reached, all child processes should stop and return the results they already have.
import multiprocessing as mp

def task(v):
    # do something
    return result

if __name__ == '__main__':
    vs = [...]
    p = mp.Pool()
    results = p.map(task, vs)
In the example above, I have a very large list vs. Ideally, all elements in the list vs will be sent to the function task(), and all results will be saved in the results list.
However, the list vs is very large, and I only have a limited time to conduct this process (say 5 minutes). What I need is to stop the map operation when the 5 minutes are up and return the calculated results to the results list.
EDIT1:
I'm not trying to kill an individual task that needs more than 5 minutes to finish. Suppose I have 1000 tasks in the list vs, and only 600 of them have finished after 5 minutes. What I need is to kill all child processes and save the results of those 600 tasks to the results list.
Answer
I looked at the answer referred to by ForceBru, which uses something called Pebble. First, I don't understand the comment that "Python standard Pool does not support timeouts." It does, in a fashion: you can wait a specified amount of time for a result to be returned and be notified via an exception whether or not it has, or you can simply issue a wait on a result object specifying a timeout. No exception is raised in that case, but you can test whether the "job" producing the result has completed or not. It is, however, true that you cannot terminate individual timed-out jobs. But once you have processed all the results that have not timed out, you can call terminate on the pool itself, which will terminate all the processes in the pool whether they are idle or not. This leads to the second comment made in that answer, "and terminating processes abruptly might lead to weird behaviour within your applications." Whether that is true depends on what the job that timed out was doing. So we agree that we should not be timing out jobs and terminating them prematurely if doing so could lead to weird behavior. But I don't see how Pebble can deal with that issue any better.
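For what it's worth, here is a rough sketch of what I mean by waiting on the result objects with a timeout and then terminating the pool. The slow_square worker and the 5-second budget are made up purely for illustration; this is just a sketch of the idea, not code from the question:

import multiprocessing as mp
import time

def slow_square(v):
    # hypothetical worker: sleeps v seconds, then returns v squared
    time.sleep(v)
    return v * v

if __name__ == '__main__':
    pool = mp.Pool()
    async_results = [pool.apply_async(slow_square, args=(v,)) for v in range(10)]

    deadline = time.time() + 5                  # overall 5-second budget
    collected = []
    for r in async_results:
        r.wait(max(0, deadline - time.time()))  # wait, but never past the deadline
        if r.ready():                           # this job finished in time
            collected.append(r.get())
    pool.terminate()                            # kill busy and idle workers alike
    print(collected)

No exception handling is needed here because wait does not raise on timeout; ready() simply reports whether the job completed.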
The question to which that answer was the response actually has one technique buried within it for doing what you wish. You need to give up on using the map function and switch to apply_async, specifying a callback function so that results can be saved as they become available. In the example below, I am using a TIMEOUT value of 5 seconds just for demo purposes and have arranged for roughly half of the 10 jobs I am submitting to time out. I have pre-allocated a results list named squares that will hold the 10 results, initialized with 10 None values. If the ith value is still None when we are done, it is because the job that was processing value i timed out. My worker function also returns its argument, v, as well as its computed value, v ** 2, so that the callback function knows which slot in the squares list the computed result should go in:
import multiprocessing as mp
import time

def my_task(v):
    time.sleep(v)
    return v, v ** 2

squares = [None] * 10

def my_callback(t):
    i, s = t
    squares[i] = s

TIMEOUT = 5

if __name__ == '__main__':
    vs = range(10)
    pool = mp.Pool()
    results = [pool.apply_async(my_task, args=(v,), callback=my_callback) for v in vs]
    time.sleep(TIMEOUT)
    pool.terminate()  # all processes, busy or idle, will be terminated
    print(squares)
Prints:
[0, 1, 4, 9, 16, None, None, None, None, None]
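Note that the callback runs in the main process (on a thread the pool uses to handle results), which is why it can update the squares list directly; for the same reason, the callback should complete quickly, or it will hold up the handling of other results.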
A second, more complicated method does not use a callback function. Instead, it does a get call on each AsyncResult instance returned by the calls to pool.apply_async, specifying a timeout value. The tricky bit here is that for the initial call you have to use the full timeout value. But by the time that result has been returned, or you have gotten a timeout exception, you have already waited some amount of time, t. That means the next time you get a result with a timeout, the timeout value you specify should be reduced by t:
import multiprocessing as mp
import time

def my_task(v):
    time.sleep(6 if v == 0 else v)
    return v ** 2

TIMEOUT = 5

if __name__ == '__main__':
    vs = range(mp.cpu_count() - 1)  # 7 on my desktop
    pool = mp.Pool()  # poolsize is 8
    results = [pool.apply_async(my_task, args=(v,)) for v in vs]
    time_to_wait = TIMEOUT  # initial time to wait
    start_time = time.time()
    for i, result in enumerate(results):
        try:
            return_value = result.get(time_to_wait)  # wait for up to time_to_wait seconds
        except mp.TimeoutError:
            print('Timeout for v = ', i)
        else:
            print(f'Return value for v = {i} is {return_value}')
        # how much time has expired since we began waiting?
        t = time.time() - start_time
        time_to_wait = TIMEOUT - t
        if time_to_wait < 0:
            time_to_wait = 0
    pool.terminate()  # all processes, busy or idle, will be terminated
Prints:
Timeout for v = 0
Return value for v = 1 is 1
Return value for v = 2 is 4
Return value for v = 3 is 9
Return value for v = 4 is 16
Timeout for v = 5
Timeout for v = 6
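If you want the partial results collected in a list (as in your question) rather than just printed, the same loop can store them instead; here is one possible variation of the above, again just a sketch:

import multiprocessing as mp
import time

def my_task(v):
    time.sleep(6 if v == 0 else v)
    return v ** 2

TIMEOUT = 5

if __name__ == '__main__':
    vs = range(mp.cpu_count() - 1)
    pool = mp.Pool()
    async_results = [pool.apply_async(my_task, args=(v,)) for v in vs]
    squares = [None] * len(vs)  # None marks a job that timed out
    start_time = time.time()
    for i, result in enumerate(async_results):
        time_to_wait = max(0, TIMEOUT - (time.time() - start_time))
        try:
            squares[i] = result.get(time_to_wait)
        except mp.TimeoutError:
            pass  # leave squares[i] as None
    pool.terminate()
    print(squares)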
Note
By using apply_async rather than map, jobs are effectively being submitted with a chunksize of 1 (see the chunksize parameter of map, which determines how the iterable argument is broken up into "chunks" to be put on each process's input queue, minimizing the number of shared-memory transfers). For large iterables, apply_async can therefore be inefficient compared to map, which uses a "reasonable" default chunksize based on the size of your pool and the number of jobs to be processed.
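To give a rough idea of what that default looks like, here is approximately the heuristic CPython's Pool.map uses when you don't pass a chunksize (based on my reading of multiprocessing/pool.py, so treat it as an approximation rather than a guarantee):

import multiprocessing as mp

def approx_default_chunksize(n_jobs, pool_size):
    # Roughly what Pool.map does when chunksize is None:
    # split the work into about 4 chunks per worker process.
    chunksize, extra = divmod(n_jobs, pool_size * 4)
    if extra:
        chunksize += 1
    return chunksize

if __name__ == '__main__':
    print(approx_default_chunksize(1000, mp.cpu_count()))  # e.g. 32 with 8 workers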