I have a Python script that does two things: 1) it downloads a large file by making an API call, and 2) it preprocesses that large file. I want to use multiprocessing to run my script. Each individual part (1 and 2) takes quite long. Everything happens in-memory due to the large size of the files, so ideally a single core would do both (1) and (2) consecutively. I have a large number of cores available (100+), but I can only have 4 API calls running at the same time (a limitation set by the API developers). So what I want to do is spawn 4 cores that start downloading by making an API call, and as soon as one of those cores finishes downloading and starts preprocessing, I want a new core to start the whole process as well, so that there are always 4 cores downloading and as many cores as needed doing the preprocessing. What I don’t know is how to have a new core spawn as soon as another core has finished the first part of the script.
My actual code is way too complex to just dump here, but let’s say I have the following two functions:
```python
import requests

def make_api_call(val):
    """Function that does part 1): makes an API call, stores the result
    in memory and returns a large satellite GeoTIFF."""
    large_image = requests.get(val)
    return large_image

def preprocess_large_image(large_image):
    """Function that does part 2): preprocesses a large image and
    returns the relevant data."""
    results = preprocess(large_image)
    return results
```
How can I then make sure that as soon as a single core/process is finished with `make_api_call` and starts `preprocess_large_image`, another core spawns and starts the entire process as well, so that there are always 4 images downloading side by side? Thank you in advance for the help!
Answer
This is a perfect application for a `multiprocessing.Semaphore` (or, for safety, a `BoundedSemaphore`)! Basically you put a lock around the API-call part of the process, but let up to 4 worker processes hold the lock at any given time. For various reasons, objects like `Lock`, `Semaphore`, `Queue`, etc. all need to be passed at the creation of a `Pool`, rather than when a method like `map` or `imap` is called. This is done by specifying an initialization function in the pool constructor.
```python
import multiprocessing as mp

def api_call(arg):
    # placeholder: download the large file here
    foo = f"large file for {arg}"
    return foo

def process_data(foo):
    # placeholder: preprocess the large file here
    return "done"

def map_func(arg):
    global semaphore
    with semaphore:          # at most 4 workers can hold the semaphore,
        foo = api_call(arg)  # so at most 4 downloads run at once
    # the semaphore is released before preprocessing starts, freeing a
    # download slot for the next waiting worker
    return process_data(foo)

def init_pool(s):
    # runs once in every worker process: store the shared semaphore as a global
    global semaphore
    semaphore = s

if __name__ == "__main__":
    s = mp.BoundedSemaphore(4)  # max concurrent API calls
    n_workers = 16   # should be large enough that there is always a free
                     # worker waiting on semaphore.acquire()
    arglist = range(100)  # the inputs to download and preprocess
    with mp.Pool(n_workers, init_pool, (s,)) as p:
        for result in p.imap(map_func, arglist):
            print(result)
```
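If you want to convince yourself that the cap on concurrent downloads works, here is a small self-contained sketch of the same pattern. The `time.sleep` calls, the worker count of 12, and the 12 dummy tasks are made-up stand-ins for illustration, not part of the real script:

```python
import multiprocessing as mp
import time

def map_func(arg):
    global semaphore
    with semaphore:
        start = time.time()  # record when this "download" began
        time.sleep(1)        # stand-in for the API download
    time.sleep(2)            # stand-in for preprocessing, outside the semaphore
    return arg, start

def init_pool(s):
    global semaphore
    semaphore = s

if __name__ == "__main__":
    t0 = time.time()
    s = mp.BoundedSemaphore(4)
    with mp.Pool(12, init_pool, (s,)) as p:
        for arg, start in p.imap(map_func, range(12)):
            print(f"task {arg} started downloading at t = {start - t0:.1f}s")
```

The start times should come out in waves of four, roughly one second apart, because a fifth download can only begin once one of the first four workers releases the semaphore and moves on to preprocessing. Note that `imap` yields results in input order; if you would rather handle each result as soon as it is ready, use `imap_unordered` instead.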