Set a time limit on the Pool map() operation when using multiprocessing?

Is it possible to set a time limit on the Pool.map() operation when using multiprocessing in Python? When the time limit is reached, all child processes should stop and return the results they have already computed.

import multiprocessing as mp

def task(v):
    result = ...  # do something with v
    return result

if __name__ == '__main__':
    vs = [...]  # a very large list of inputs
    p = mp.Pool()
    results = p.map(task, vs)

In the example above, I have a very large list vs. Ideally, every element of vs will be passed to the function task(), and all results will be saved in the results list.

However, the list vs is very large, and I only have a limited time to run this process (say, 5 minutes). What I need is to stop the map operation once the 5 minutes are up and save the results calculated so far in the results list.

EDIT1:

I’m not trying to kill a task that needs more than 5 minutes to finish. Say there are 1000 tasks in the list vs, and only 600 of them have finished after 5 minutes. What I need is to kill all child processes and save the results of those 600 tasks to the results list.

Answer

I looked at the answer referred to by ForceBru, which uses something called Pebble. First, I don’t understand the comment that “Python standard Pool does not support timeouts.” It does, in a fashion: you can wait a specified amount of time for a result to be returned and be notified via an exception if it has not arrived, or you can simply issue a wait on a result object specifying a timeout. No exception is raised in that case, but you can test whether the “job” producing the result has completed or not. It is, however, true that you cannot terminate individual timed-out jobs. But once you have processed all the results that have not timed out, you can call terminate on the pool itself, which will terminate all the processes in the pool whether they are idle or not.

This leads to the second comment made in that answer: “terminating processes abruptly might lead to weird behaviour within your applications.” This is true, depending on what the job that timed out was doing. So we agree that we should not be timing out jobs and terminating them prematurely if doing so could lead to weird behavior. But I don’t see how Pebble can deal with that issue any better.
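
For reference, here is a minimal sketch of what that timeout support looks like with the standard Pool; the slow_task function and the 5-second timeout are illustrative assumptions, not part of the original question:

import multiprocessing as mp
import time

def slow_task(v):
    time.sleep(v)
    return v ** 2

if __name__ == '__main__':
    pool = mp.Pool()
    result = pool.apply_async(slow_task, args=(10,))
    result.wait(timeout=5)   # returns after at most 5 seconds; raises no exception
    if result.ready():       # did the job finish within the timeout?
        print(result.get())
    else:
        print('job timed out')
    pool.terminate()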

The question to which that answer was the response actually has one technique for doing what you wish buried within it. You need to give up on using the map function and switch to apply_async, specifying a callback function so that results can be saved as they become available. In the example below, I am using a TIMEOUT value of 5 seconds just for demo purposes and have arranged for approximately half of the 10 jobs I am submitting to time out. I have pre-allocated a results list named squares that will hold the 10 results, initialized with 10 None values. If the ith value is still None when we are all done, it is because the job that was processing value i timed out. My worker function also returns its argument, v, as well as its computed value, v ** 2, so that the callback function knows into which slot of the squares list the computed result should go:

import multiprocessing as mp
import time

def my_task(v):
    time.sleep(v)  # simulate work; jobs with v >= 5 will not finish in time
    return v, v ** 2

# pre-allocated results list; a None entry means that job timed out
squares = [None] * 10

def my_callback(t):
    # runs in the main process as each result becomes available
    i, s = t
    squares[i] = s


TIMEOUT = 5

if __name__ == '__main__':
    vs = range(10)
    pool = mp.Pool()
    results = [pool.apply_async(my_task, args=(v,), callback=my_callback) for v in vs]
    time.sleep(TIMEOUT) # give the jobs up to TIMEOUT seconds to complete
    pool.terminate() # all processes, busy or idle, will be terminated
    print(squares)

Prints:

[0, 1, 4, 9, 16, None, None, None, None, None]

A second, more complicated method does not use a callback function. Instead, it calls get on each AsyncResult instance returned by the calls to pool.apply_async, specifying a timeout value. The tricky bit here is that for the initial call you have to use the full timeout value. But by the time that result has been returned, or you have gotten a timeout exception, you have already waited some amount of time, t. That means that the next time you get a result with a timeout, the timeout value you specify should be reduced by t:

import multiprocessing as mp
import time

def my_task(v):
    time.sleep(6 if v == 0 else v)  # job 0 deliberately takes longer than TIMEOUT
    return v ** 2


TIMEOUT = 5

if __name__ == '__main__':
    vs = range(mp.cpu_count() - 1) # 7 on my desktop
    pool = mp.Pool() # poolsize is 8
    results = [pool.apply_async(my_task, args=(v,)) for v in vs]
    time_to_wait = TIMEOUT # initial time to wait
    start_time = time.time()
    for i, result in enumerate(results):
        try:
            return_value = result.get(time_to_wait) # wait for up to time_to_wait seconds
        except mp.TimeoutError:
            print('Timeout for v = ', i)
        else:
            print(f'Return value for v = {i} is {return_value}')
        # how much time has expired since we began waiting?
        t = time.time() - start_time
        time_to_wait = TIMEOUT - t
        if time_to_wait < 0:
            time_to_wait = 0
    pool.terminate() # all processes, busy or idle, will be terminated

Prints:

Timeout for v =  0
Return value for v = 1 is 1
Return value for v = 2 is 4
Return value for v = 3 is 9
Return value for v = 4 is 16
Timeout for v =  5
Timeout for v =  6

Note

By using apply_async rather than map, jobs are effectively submitted with a chunksize of 1 (see the chunksize parameter of map, which determines how the iterable argument is broken up into “chunks” to be put on each process’s input queue, minimizing the number of shared-memory transfers). For large iterables, apply_async can therefore be inefficient compared to map, which computes a “reasonable” default chunksize based on the size of your pool and the number of jobs to be processed.
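
If chunking matters for your workload, one possible middle ground is imap_unordered, which chunks the iterable the way map does, while the next method of the iterator it returns accepts a timeout. Here is a minimal sketch reusing the my_task and squares scheme from the first example; the chunksize of 2 is an arbitrary illustrative choice:

import multiprocessing as mp
import time

def my_task(v):
    time.sleep(v)
    return v, v ** 2

TIMEOUT = 5

if __name__ == '__main__':
    squares = [None] * 10
    pool = mp.Pool()
    # imap_unordered chunks the iterable the way map() does
    it = pool.imap_unordered(my_task, range(10), chunksize=2)
    deadline = time.time() + TIMEOUT
    try:
        while True:
            # wait only as long as remains until the overall deadline
            i, s = it.next(timeout=max(0, deadline - time.time()))
            squares[i] = s
    except (mp.TimeoutError, StopIteration):
        pass
    pool.terminate() # all processes, busy or idle, will be terminated
    print(squares)

Be aware that with chunking, all results of a chunk come back together, so one slow task can cause the already-completed results of its chunk-mates to be lost at the deadline; that is the trade-off against the per-job granularity of apply_async.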
