
Using concurrent.futures to call a function in parallel every second

I’ve been trying to get to grips with how I can use concurrent.futures to call a function 3 times every second, without waiting for it to return. I will collect the results after I’ve made all the calls I need to make.

Here is where I am at the moment, and I’m surprised that sleep() within this example function prevents my code from launching the next chunk of 3 function calls. I’m obviously not understanding the documentation well enough here :)

from concurrent.futures import ProcessPoolExecutor, as_completed
import time

def print_something(thing):
    print(thing)
    time.sleep(10)

# define a generator 
def chunks(l, n):
    """Yield successive n-sized chunks from l."""    
    for i in range(0, len(l), n):
        yield l[i:i + n]

def main():    
    chunk_number = 0
    alphabet = ['a','b','c','d','e','f','g','h','i','j','k','l','m','n','o','p','q','r','s','t','u','v','w','x','y','z']
    for current_chunk in chunks(alphabet, 3):  # Restrict to calling the function 3 times per second
        with ProcessPoolExecutor(max_workers=3) as executor:        
            futures = { executor.submit(print_something, thing): thing for thing in current_chunk }
            chunk_number += 1            
            print('chunk %s' % chunk_number)
            time.sleep(1)            
        
    for result in as_completed(futures): 
        print(result.result())

This code results in chunks of 3 being printed with a sleep time of 10s between each chunk. How can I change this so that I'm not waiting for the function to return before submitting the next batch?

Thanks


Answer

First, on each iteration of for current_chunk in chunks(alphabet, 3): you are creating a new ProcessPoolExecutor instance and a new futures dictionary, clobbering the previous one. So the final loop for result in as_completed(futures): would only print the results from the last chunk submitted. Second, and the reason I believe you are hanging: the block governed by with ProcessPoolExecutor(max_workers=3) as executor: will not exit until all the tasks submitted to the executor have completed, and that takes at least 10 seconds. So the next iteration of the for current_chunk in chunks(alphabet, 3): loop cannot run more frequently than once every 10 seconds.

Note also that the for result in as_completed(futures): loop needs to be moved inside the with ThreadPoolExecutor(max_workers=26) as executor: block for the same reason: if it is placed after the block, it will not run until all the tasks have completed, so you will not be able to get results "as they complete."
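A small sketch illustrates why the placement matters (the task names and delays below are made up for the example): with as_completed inside the with block, results arrive in completion order, not submission order.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def work(name, delay):
    time.sleep(delay)
    return name

completion_order = []
with ThreadPoolExecutor(max_workers=3) as executor:
    # Submit the slowest task first; all three start immediately
    futures = {executor.submit(work, name, delay): name
               for name, delay in [('slow', 0.3), ('fast', 0.1), ('medium', 0.2)]}
    # Collecting inside the with block lets results arrive as tasks finish
    for future in as_completed(futures):
        completion_order.append(future.result())

print(completion_order)  # fastest task first, regardless of submission order
```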

You need to do a bit of rearranging as shown below (I have also modified print_something to return something other than None). There should be no hang now, provided you have enough workers (26) to run the 26 tasks being submitted. Your desktop (if you are running this on your PC) probably does not have 26 cores to support 26 concurrently executing processes. But note that print_something only prints a short string and then sleeps for 10 seconds, which allows it to relinquish its processor to another process in the pool. So while for CPU-intensive tasks little is gained by specifying a max_workers value greater than the number of physical processors/cores on your computer, in this case it is fine. When your tasks spend little time executing actual Python byte code, it is more efficient to use threading instead of processes, since the cost of creating a thread is much less than the cost of creating a process. However, threading performs notoriously poorly when the tasks consist largely of Python byte code, because such code cannot execute concurrently: it is serialized by the Global Interpreter Lock (GIL).

Topic for you to research: The Global Interpreter Lock (GIL) and Python byte code execution
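As a starting point for that research, here is a quick experiment (the delays are arbitrary): time.sleep releases the GIL, so sleeping tasks overlap on threads, whereas tasks dominated by Python byte code would not benefit this way.

```python
from concurrent.futures import ThreadPoolExecutor
import time

def sleeper(delay):
    # time.sleep releases the GIL, so threads can overlap these waits
    time.sleep(delay)
    return delay

start = time.monotonic()
with ThreadPoolExecutor(max_workers=3) as executor:
    # 1.5 seconds of total sleeping, run on 3 threads
    results = list(executor.map(sleeper, [0.5, 0.5, 0.5]))
elapsed = time.monotonic() - start

print(f"elapsed ~{elapsed:.1f}s for 1.5s of total sleeping")
```

Run sequentially this would take about 1.5 seconds; with three threads it takes roughly 0.5 seconds, because the sleeps overlap. A CPU-bound function substituted for sleeper would show no such speedup under threads.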

Update to use threads:

So we should substitute a ThreadPoolExecutor, with 26 or more lightweight threads, for the ProcessPoolExecutor. The beauty of the concurrent.futures module is that no other code needs to change. But most important is the changed block structure: a single executor is created up front, and both the submissions and the result collection happen inside its with block.

from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def print_something(thing):
    # NOT cpu-intensive, so threads should work well here
    print(thing)
    time.sleep(10)
    return thing  # so there is a non-None result

# define a generator
def chunks(l, n):
    """Yield successive n-sized chunks from l."""
    for i in range(0, len(l), n):
        yield l[i:i + n]

def main():
    chunk_number = 0
    alphabet = ['a','b','c','d','e','f','g','h','i','j','k','l','m','n','o','p','q','r','s','t','u','v','w','x','y','z']
    futures = {}
    with ThreadPoolExecutor(max_workers=26) as executor:
        for current_chunk in chunks(alphabet, 3):  # Restrict to calling the function 3 times per second
            futures.update({executor.submit(print_something, thing): thing for thing in current_chunk })
            chunk_number += 1
            print('chunk %s' % chunk_number)
            time.sleep(1)

        # needs to be within the executor block else it won't run until all futures are complete    
        for result in as_completed(futures):
            print(result.result())

if __name__ == '__main__':
    main()