I am able to submit batches of concurrent.futures.ProcessPoolExecutor.submit() calls, where each batch may contain several submit() calls. However, I noticed that if each batch of submits consumes a significant amount of RAM, memory is used quite inefficiently: I need to wait for all futures in the batch to complete before another batch of submit() calls can be submitted.
How does one create a continuous stream of Python's concurrent.futures.ProcessPoolExecutor.submit() calls until some condition is satisfied?
Test Script:
#!/usr/bin/env python3
"""Batch-wise use of ProcessPoolExecutor.submit().

Each pass of the ``while`` loop submits one batch of ``cpus`` jobs and then
waits for the whole batch to finish before the next batch is submitted.
"""
import concurrent.futures as cf
from itertools import count

import numpy as np
from numpy.random import SeedSequence, default_rng


def dojob(process, iterations, samples, rg):
    """Run ``iterations`` rounds of a small random computation.

    Each round draws ``samples`` standard-normal values and ``samples``
    integers from ``rg.integers(-3, 3, samples)``, adds them element-wise and
    records the mean of the sum.

    Args:
        process: Identifier of this job; used as the key of the returned dict.
        iterations: Number of rounds to run.
        samples: Number of values drawn per round.
        rg: NumPy random ``Generator`` used for all draws.

    Returns:
        ``{process: [(round_index, mean), ...]}`` with one tuple per round.
    """
    result = []
    for i in range(iterations):
        a = rg.standard_normal(samples)
        b = rg.integers(-3, 3, samples)
        mean = np.mean(a + b)
        result.append((i, mean))
    return {process: result}


if __name__ == '__main__':
    cpus = 2
    iterations = 10000
    samples = 1000

    # Setup NumPy Random Generator: one independent stream per worker.
    ss = SeedSequence(1234567890)
    child_seeds = ss.spawn(cpus)
    rg_streams = [default_rng(s) for s in child_seeds]

    # Perform concurrent analysis by batches
    counter = count(start=0, step=1)

    # Serial Run of dojob
    for cpu in range(cpus):
        process = next(counter)
        rg = rg_streams[cpu]
        rdict = dojob(process, iterations, samples, rg)
        print('rdict', rdict)

    # Concurrent Run of dojob
    futures = []
    results = []
    with cf.ProcessPoolExecutor(max_workers=cpus) as executor:
        while True:
            # Submit one batch: one job per worker.
            for cpu in range(cpus):
                process = next(counter)
                rg = rg_streams[cpu]
                futures.append(
                    executor.submit(dojob, process, iterations, samples, rg)
                )

            # The whole batch must finish before the next one is submitted.
            for future in cf.as_completed(futures):
                # Do some post processing
                r = future.result()
                for k, v in r.items():
                    if len(results) < 5000:
                        # v is a list of (index, mean) pairs; np.std reduces
                        # over all of its elements.
                        results.append(np.std(v))
                    print(k, len(results))

            # Put a huge number to simulate continuous streaming
            if len(results) <= 100:
                futures = []
                # Fresh generator streams for the next batch.
                child_seeds = child_seeds[0].spawn(cpus)
                rg_streams = [default_rng(s) for s in child_seeds]
            else:
                break
    print('\n*** Concurrent Analyses Ended ***')
Advertisement
Answer
To expand on my comment, how about something like this, using the completion callback and a threading.Condition? I took the liberty of adding a progress indicator too.
EDIT: I refactored this into a neat function to which you pass your desired concurrency and queue depth, a function that generates new jobs, and another function that processes a result and lets the executor know whether you’ve had enough.
"""Keep a ProcessPoolExecutor continuously fed with jobs until told to stop."""
import concurrent.futures as cf
import threading
import time
from itertools import count

import numpy as np
from numpy.random import SeedSequence, default_rng


def dojob(process, iterations, samples, rg):
    """Run ``iterations`` rounds of a small random computation.

    Each round draws ``samples`` standard-normal values and ``samples``
    integers from ``rg.integers(-3, 3, samples)``, adds them element-wise and
    records the mean of the sum.

    Args:
        process: Identifier of this job; used as the key of the returned dict.
        iterations: Number of rounds to run.
        samples: Number of values drawn per round.
        rg: NumPy random ``Generator`` used for all draws.

    Returns:
        ``{process: [(round_index, mean), ...]}`` with one tuple per round.
    """
    result = []
    for i in range(iterations):
        a = rg.standard_normal(samples)
        b = rg.integers(-3, 3, samples)
        mean = np.mean(a + b)
        result.append((i, mean))
    return {process: result}


def execute_concurrently(cpus, max_queue_length, get_job_fn, process_result_fn):
    """Stream jobs through a process pool until ``process_result_fn`` says stop.

    At most ``max_queue_length`` futures are outstanding at any time. Whenever
    one finishes, its result is handed to ``process_result_fn`` from the
    future's done-callback and a replacement job is submitted. A status line is
    printed roughly once per second.

    Args:
        cpus: Number of worker processes.
        max_queue_length: Maximum number of submitted-but-unfinished jobs.
        get_job_fn: Zero-argument callable returning ``(fn, args)`` for the
            next job; it is called from the calling thread.
        process_result_fn: Called with each job's return value. A truthy
            return value means "enough": no further jobs are submitted, jobs
            that have not started are cancelled, and ``process_result_fn`` is
            not called again.

    Raises:
        ValueError: If ``max_queue_length`` is less than 1.
        Exception: The first exception raised by a job or by
            ``process_result_fn`` is re-raised here after the pool shuts down.
    """
    if max_queue_length < 1:
        raise ValueError("max_queue_length must be at least 1")

    running_futures = set()
    jobs_complete = 0
    first_error = None
    # Guards running_futures, jobs_complete and first_error.
    job_cond = threading.Condition()
    all_complete_event = threading.Event()

    def on_complete(future):
        nonlocal jobs_complete, first_error
        stop = False
        error = None
        processed = False
        try:
            # Skip cancelled futures and anything finishing after the stop
            # signal, so the consumer never sees results it did not ask for.
            if not future.cancelled() and not all_complete_event.is_set():
                # result() re-raises whatever the job itself raised.
                stop = bool(process_result_fn(future.result()))
                processed = True
        except Exception as exc:
            # Exceptions escaping a done-callback are only logged by
            # concurrent.futures; capture it so the caller gets to see it.
            error = exc
            stop = True
        finally:
            # Always free the slot and wake the submitter, even on failure.
            with job_cond:
                running_futures.discard(future)
                if processed:
                    jobs_complete += 1
                if error is not None and first_error is None:
                    first_error = error
                if stop:
                    all_complete_event.set()
                job_cond.notify_all()

    def has_capacity():
        return (
            not all_complete_event.is_set()
            and len(running_futures) < max_queue_length
        )

    def should_wake():
        return (
            all_complete_event.is_set()
            or len(running_futures) < max_queue_length
        )

    last_status_time = None
    start_time = time.monotonic()
    with cf.ProcessPoolExecutor(cpus) as executor:
        try:
            while not all_complete_event.is_set():
                # Top the queue back up to max_queue_length.
                while True:
                    with job_cond:
                        if not has_capacity():
                            break
                    fn, args = get_job_fn()
                    fut = executor.submit(fn, *args)
                    # Register the future BEFORE attaching the callback: the
                    # callback may fire at once and must find it in the set.
                    with job_cond:
                        running_futures.add(fut)
                    fut.add_done_callback(on_complete)

                # Checking the predicate under the lock avoids a lost wakeup
                # when a job finishes before we start waiting; the timeout
                # keeps the status line alive while jobs are slow.
                with job_cond:
                    job_cond.wait_for(should_wake, timeout=1.0)
                if all_complete_event.is_set():
                    break

                now = time.monotonic()
                if last_status_time is None or now - last_status_time > 1.0:
                    with job_cond:
                        n_running = len(running_futures)
                        n_complete = jobs_complete
                    elapsed = now - start_time
                    rps = n_complete / elapsed if elapsed > 0 else 0.0
                    print(
                        f"{n_running} running futures on {cpus} CPUs, "
                        f"{n_complete} complete. RPS: {rps:.2f}"
                    )
                    last_status_time = now
        finally:
            # Stop consuming results and drop jobs that have not started, so
            # leaving the ``with`` block only waits for jobs already running.
            all_complete_event.set()
            with job_cond:
                pending = list(running_futures)
            for fut in pending:
                fut.cancel()

    if first_error is not None:
        raise first_error


def main():
    """Run ``dojob`` continuously until 10000 results have been collected."""
    ss = SeedSequence(1234567890)
    counter = count(start=0, step=1)
    iterations = 10000
    samples = 1000
    results = []

    def get_job():
        # Every job gets its own freshly spawned, independent random stream.
        seed = ss.spawn(1)[0]
        rg = default_rng(seed)
        process = next(counter)
        return dojob, (process, iterations, samples, rg)

    def process_result(result):
        for v in result.values():
            results.append(np.std(v))
        return len(results) >= 10000  # True signals we're complete

    execute_concurrently(
        cpus=16,
        max_queue_length=20,
        get_job_fn=get_job,
        process_result_fn=process_result,
    )


if __name__ == "__main__":
    main()