Skip to content
Advertisement

How to create a continuous stream of Python’s concurrent.futures.ProcessPoolExecutor.submits()?

I am able to submit batches of concurrent.futures.ProcessPoolExecutor.submits() where each batch may contain several submit(). However, I noticed that if each batch of submits consumes a significant about of RAM, there can be quite a bit of RAM usage inefficiencies; need to wait for all futures in the batch to be completed before another batch of submit() can be submitted.

How does one create a continuous stream of Python’s concurrent.futures.ProcessPoolExecutor.submit() until some condition is satisfied?

Test Script:

#!/usr/bin/env python3

import numpy as np
from numpy.random import default_rng, SeedSequence
import concurrent.futures as cf
from itertools import count


def dojob( process, iterations, samples, rg ):
    # Do some tasks
    result = []
    for i in range( iterations ):
        a = rg.standard_normal( samples )
        b = rg.integers( -3, 3, samples )
        mean = np.mean( a + b )
        result.append( ( i, mean ) )
        return { process : result }

if __name__ == '__main__':

    cpus = 2
    iterations = 10000
    samples = 1000

    # Setup NumPy Random Generator
    ss = SeedSequence( 1234567890 )
    child_seeds = ss.spawn( cpus )
    rg_streams = [ default_rng(s) for s in child_seeds ]

    # Peform concurrent analysis by batches
    counter = count( start=0, step=1 )

    # Serial Run of dojob
    process = next( counter )
    for cpu in range( cpus ):
        process = next( counter )
        rg = rg_streams[ cpu ]
        rdict = dojob( process, iterations, samples, rg )
    print( 'rdict', rdict )

    
    # Concurrent Run of dojob
    futures = []
    results = []
    with cf.ProcessPoolExecutor( max_workers=cpus ) as executor:

        while True:
            
            for cpu in range( cpus ):
                process = next( counter )
                rg = rg_streams[ cpu ]
                futures.append( executor.submit( dojob, process, iterations, samples, rg ) )
                
            for future in cf.as_completed( futures ):
                # Do some post processing
                r = future.result()
                for k, v in r.items():
                    if len( results ) < 5000:
                        results.append( np.std( v ) )
                        print( k, len(results) )

            if len(results) <= 100: #Put a huge number to simulate continuous streaming 
                futures = []
                child_seeds = child_seeds[0].spawn( cpus )
                rg_streams = [ default_rng(s) for s in child_seeds ]
            else:
                break

    print( 'n*** Concurrent Analyses Ended ***' )

Advertisement

Answer

To expand on my comment, how about something like this, using the completion callback and a threading.Condition? I took the liberty of adding a progress indicator too.

EDIT: I refactored this into a neat function you pass your desired concurrency and queue depth, as well as a function that generates new jobs, and another function that processes a result and lets the executor know whether you’ve had enough.

import concurrent.futures as cf
import threading
import time
from itertools import count

import numpy as np
from numpy.random import SeedSequence, default_rng


def dojob(process, iterations, samples, rg):
    # Do some tasks
    result = []
    for i in range(iterations):
        a = rg.standard_normal(samples)
        b = rg.integers(-3, 3, samples)
        mean = np.mean(a + b)
        result.append((i, mean))
    return {process: result}


def execute_concurrently(cpus, max_queue_length, get_job_fn, process_result_fn):
    running_futures = set()
    jobs_complete = 0
    job_cond = threading.Condition()
    all_complete_event = threading.Event()

    def on_complete(future):
        nonlocal jobs_complete
        if process_result_fn(future.result()):
            all_complete_event.set()
        running_futures.discard(future)
        jobs_complete += 1
        with job_cond:
            job_cond.notify_all()

    time_since_last_status = 0
    start_time = time.time()
    with cf.ProcessPoolExecutor(cpus) as executor:
        while True:
            while len(running_futures) < max_queue_length:
                fn, args = get_job_fn()
                fut = executor.submit(fn, *args)
                fut.add_done_callback(on_complete)
                running_futures.add(fut)

            with job_cond:
                job_cond.wait()

            if all_complete_event.is_set():
                break

            if time.time() - time_since_last_status > 1.0:
                rps = jobs_complete / (time.time() - start_time)
                print(
                    f"{len(running_futures)} running futures on {cpus} CPUs, "
                    f"{jobs_complete} complete. RPS: {rps:.2f}"
                )
                time_since_last_status = time.time()


def main():
    ss = SeedSequence(1234567890)
    counter = count(start=0, step=1)
    iterations = 10000
    samples = 1000
    results = []

    def get_job():
        seed = ss.spawn(1)[0]
        rg = default_rng(seed)
        process = next(counter)
        return dojob, (process, iterations, samples, rg)

    def process_result(result):
        for k, v in result.items():
            results.append(np.std(v))
        if len(results) >= 10000:
            return True  # signal we're complete

    execute_concurrently(
        cpus=16,
        max_queue_length=20,
        get_job_fn=get_job,
        process_result_fn=process_result,
    )


if __name__ == "__main__":
    main()
Advertisement