I’ve been trying to get to grips with how I can use concurrent.futures to call a function 3 times every second, without waiting for it to return. I will collect the results after I’ve made all the calls I need to make.
Here is where I am at the moment, and I’m surprised that sleep() within this example function prevents my code from launching the next chunk of 3 function calls. I’m obviously not understanding the documentation well enough here :)
from concurrent.futures import ProcessPoolExecutor, as_completed
import time

def print_something(thing):
    print(thing)
    time.sleep(10)

# define a generator
def chunks(l, n):
    """Yield successive n-sized chunks from l."""
    for i in range(0, len(l), n):
        yield l[i:i + n]

def main():
    chunk_number = 0
    alphabet = ['a','b','c','d','e','f','g','h','i','j','k','l','m','n','o','p','q','r','s','t','u','v','w','x','y','z']
    for current_chunk in chunks(alphabet, 3):  # Restrict to calling the function 3 times per second
        with ProcessPoolExecutor(max_workers=3) as executor:
            futures = {executor.submit(print_something, thing): thing for thing in current_chunk}
            chunk_number += 1
            print('chunk %s' % chunk_number)
            time.sleep(1)
    for result in as_completed(futures):
        print(result.result())
This code results in chunks of 3 being printed with a sleep time of 10s between each chunk. How can I change this so that I'm not waiting for the function to return before submitting the next batch?
Thanks
Answer
First, for each iteration of for current_chunk in chunks(alphabet, 3): you are creating a new ProcessPoolExecutor instance and a new futures dictionary instance, clobbering the previous one. So the final loop, for result in as_completed(futures):, would only print the results from the last chunk submitted (see the small illustration below).
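As a tiny illustration of the clobbering (my own snippet, not the poster's code), compare re-binding the futures name on each pass with calling update() on a single dictionary:

futures = {}
for chunk in (['a', 'b'], ['c', 'd']):
    futures = {item: None for item in chunk}  # re-binding: clobbers the previous dict
print(list(futures))  # ['c', 'd'] -- 'a' and 'b' are gone

futures = {}
for chunk in (['a', 'b'], ['c', 'd']):
    futures.update({item: None for item in chunk})  # accumulates across passes
print(list(futures))  # ['a', 'b', 'c', 'd']

This is why the corrected code below builds one futures dictionary with update().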
Second, and the reason, I believe, for the pauses you are seeing: the block governed by with ProcessPoolExecutor(max_workers=3) as executor: will not terminate until all the tasks submitted to the executor have completed, and that takes at least 10 seconds. So the next iteration of the for current_chunk in chunks(alphabet, 3): loop won't be executed more frequently than once every 10 seconds.
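This blocking behavior is easy to demonstrate in isolation. Here is a minimal sketch (my own, using a ThreadPoolExecutor for brevity), relying on the documented fact that exiting the with block calls executor.shutdown(wait=True):

from concurrent.futures import ThreadPoolExecutor
import time

def slow_task():
    time.sleep(2)

start = time.monotonic()
with ThreadPoolExecutor(max_workers=1) as executor:
    executor.submit(slow_task)
# Exiting the with block implicitly calls shutdown(wait=True),
# so we only get here after slow_task() has finished.
print('with block held for ~%.1f seconds' % (time.monotonic() - start))  # ~2.0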
Note also that the for result in as_completed(futures): loop needs to be moved within the with ThreadPoolExecutor(max_workers=26) as executor: block (used in the revised code below) for the same reason. That is, if it is placed after the block, it will not be executed until all the tasks have completed, and so you will not be able to get results "as they complete."
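To see what "as they complete" buys you, here is a small sketch (again my own, not the poster's code) in which results come back in completion order rather than submission order:

from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def work(delay):
    time.sleep(delay)
    return delay

with ThreadPoolExecutor(max_workers=3) as executor:
    futures = {executor.submit(work, d): d for d in (3, 1, 2)}
    # Inside the with block, each future is yielded as soon as it
    # finishes: prints 1, then 2, then 3 -- not the submission order.
    for future in as_completed(futures):
        print(future.result())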
You need to do a bit of rearranging, as shown below (I have also modified print_something to return something other than None). There should be no hangs now, provided you have enough workers (26) to run the 26 tasks being submitted. I doubt your desktop (if you are running this on your PC) has 26 cores to support 26 concurrently executing processes. But note that print_something only prints a short string and then sleeps for 10 seconds, which allows it to relinquish its processor to another process in the pool. So, while with cpu-intensive tasks little is gained by specifying a max_workers value greater than the number of physical processors/cores on your computer, in this case it is OK. When your tasks spend little time executing actual Python byte code, it is more efficient to use threading instead of processes, since the cost of creating threads is much less than the cost of creating processes. However, threading performs notoriously poorly when your tasks consist largely of Python byte code, since such code cannot be executed concurrently due to serialization by the Global Interpreter Lock (GIL).
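You can convince yourself that sleeping tasks cooperate nicely under the GIL with a quick sketch (mine, under the assumption that the per-task work is time.sleep, which releases the GIL while it waits):

from concurrent.futures import ThreadPoolExecutor
import time

def sleeper():
    time.sleep(10)  # releases the GIL while blocked

start = time.monotonic()
with ThreadPoolExecutor(max_workers=26) as executor:
    for _ in range(26):
        executor.submit(sleeper)
# All 26 threads sleep concurrently: ~10 seconds total, not 260.
print('elapsed: ~%.0f seconds' % (time.monotonic() - start))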
Topic for you to research: The Global Interpreter Lock (GIL) and Python byte code execution
Update to use threads:
So we should substitute a ThreadPoolExecutor with 26 or more light-weight threads for the ProcessPoolExecutor. The beauty of the concurrent.futures module is that no other code needs to change. But most important is to change the block structure and use a single executor.
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def print_something(thing):
    # NOT cpu-intensive, so threads should work well here
    print(thing)
    time.sleep(10)
    return thing  # so there is a non-None result

# define a generator
def chunks(l, n):
    """Yield successive n-sized chunks from l."""
    for i in range(0, len(l), n):
        yield l[i:i + n]

def main():
    chunk_number = 0
    alphabet = ['a','b','c','d','e','f','g','h','i','j','k','l','m','n','o','p','q','r','s','t','u','v','w','x','y','z']
    futures = {}
    with ThreadPoolExecutor(max_workers=26) as executor:
        for current_chunk in chunks(alphabet, 3):  # Restrict to calling the function 3 times per second
            futures.update({executor.submit(print_something, thing): thing for thing in current_chunk})
            chunk_number += 1
            print('chunk %s' % chunk_number)
            time.sleep(1)
        # needs to be within the executor block else it won't run until all futures are complete
        for result in as_completed(futures):
            print(result.result())

if __name__ == '__main__':
    main()
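If my reading of the timing is right, running this should print chunk 1 through chunk 9 roughly one second apart (26 letters in chunks of 3 gives eight full chunks plus one chunk of two), with the letters themselves printed as their tasks start, and the first results from as_completed arriving about 10 seconds after the first submission.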