Multiprocessing where a new process starts halfway through another process

I have a Python script that does two things: 1) it downloads a large file by making an API call, and 2) it preprocesses that large file. I want to use multiprocessing to run the script. Each individual part (1 and 2) takes quite a long time. Everything happens in-memory due to the large size of the files, so ideally a single process would do both (1) and (2) consecutively. I have a large number of cores available (100+), but I can only have 4 API calls running at the same time (a limitation set by the API developers). So what I want to do is spawn 4 processes that start downloading by making an API call, and as soon as one of those processes finishes downloading and starts preprocessing, I want a new process to start the whole pipeline as well. That way there are always 4 processes downloading, and as many processes as needed doing the preprocessing. However, I do not know how to have a new process spawn as soon as another process has finished the first part of the script.

My actual code is way too complex to just dump here, but let’s say I have the following two functions:

import requests

def make_api_call(val):
    """Function that does part 1): makes an API call, stores the response in memory
       and returns a large satellite GeoTIFF.
    """
    large_image = requests.get(val).content  # raw bytes of the downloaded file
    return large_image

def preprocess_large_image(large_image):
    """Function that does part 2): preprocesses a large image and returns the relevant data."""
    results = preprocess(large_image)  # placeholder for the actual preprocessing logic
    return results

How can I make sure that as soon as a single process finishes 'make_api_call' and starts 'preprocess_large_image', another process spawns and starts the entire pipeline as well, so that there are always 4 images downloading side by side? Thank you in advance for the help!


Answer

This is a perfect application for a multiprocessing.Semaphore (or, for safety, a BoundedSemaphore)! Basically, you put a lock around the API-call part of the process, but let up to 4 worker processes hold the lock at any given time. For various reasons, objects like Lock, Semaphore, Queue, etc. all need to be passed to worker processes at the creation of a Pool rather than when a method like map or imap is called. This is done by specifying an initialization function in the Pool constructor.

import multiprocessing as mp

def api_call(arg):
    foo = f"downloaded {arg}"  # stand-in for the rate-limited download (part 1)
    return foo

def process_data(foo):
    return "done"  # stand-in for the preprocessing (part 2)

def map_func(arg):
    global semaphore
    with semaphore:  # at most 4 workers can hold the semaphore, i.e. be downloading
        foo = api_call(arg)
    return process_data(foo)  # preprocessing happens outside the semaphore

def init_pool(s):
    # runs once in each worker process, making the semaphore visible there
    global semaphore
    semaphore = s

if __name__ == "__main__":
    arglist = range(20)  # whatever inputs you map over
    n_workers = 8  # should be large enough that there is always a free worker waiting on semaphore.acquire()
    s = mp.BoundedSemaphore(4)  # max concurrent API calls
    with mp.Pool(n_workers, init_pool, (s,)) as p:
        for result in p.imap(map_func, arglist):
            print(result)
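
Tying this back to the question's functions, the worker might look like the following (a minimal sketch: 'download_and_preprocess' and 'url_list' are assumed names, and 'make_api_call' / 'preprocess_large_image' are the placeholders defined in the question):

import multiprocessing as mp

def init_pool(s):
    global semaphore
    semaphore = s

def download_and_preprocess(url):
    # Hypothetical helper combining parts 1) and 2). Only the download is
    # rate-limited; preprocessing runs after the semaphore has been released,
    # so a waiting worker can start its download immediately.
    global semaphore
    with semaphore:
        large_image = make_api_call(url)
    return preprocess_large_image(large_image)

if __name__ == "__main__":
    url_list = []  # fill in with the API URLs to download (assumed)
    s = mp.BoundedSemaphore(4)  # the API's limit of 4 concurrent calls
    with mp.Pool(8, init_pool, (s,)) as p:
        for result in p.imap_unordered(download_and_preprocess, url_list):
            print(result)  # handle each preprocessed result as it arrives

A BoundedSemaphore is preferred over a plain Semaphore because it raises a ValueError if released more times than it was acquired, which would otherwise silently raise the concurrency cap; imap_unordered yields results as workers finish rather than in input order.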