Assume that we want to send a lot of web requests and do something with the data that we get back. The data crunching that we have to do on each response is quite heavy, so we want to parallelize this: a main process distributes query URLs to child processes, which then fetch the data and do some processing. Simple enough! The issue I am having is that the number of URLs is not known in advance. The only way we'll know is when one of the child processes returns an error (well, actually, when it returns None).
I have two possible approaches in mind, but both have the same issue: I cannot figure out how to deal with the fact that the input list (the URLs to query) is of unknown size (it's dynamically generated).
Attempt #1: create a queue and set up a separate process that generates URLs (here it generates indices) and puts them in the queue. Then create child processes that listen to this queue and process the links (here simply: get the corresponding item of a list, but return None when out of range, which is the exit scenario). The problem is that I do not know how to tell the writer that it should stop putting things on the queue as soon as one of the processes yields None.

Note: this script will run forever because there is no break condition on the writer.
from multiprocessing import Pool, Process, Queue

RESPONSES = ["hello", "this", "is", "a", "response", "to", "your", "request"]  # We do not know this length


def send_request(idx):
    try:
        return RESPONSES[idx]
    except IndexError:
        return None


def worker(q):
    while True:
        # Get work from the working queue
        idx = q.get()
        resp = send_request(idx)
        print(resp)


def writer(q):
    idx = 0
    while True:
        # How can I stop this when any response is None?
        q.put(idx)
        idx += 1


def main():
    work_q = Queue()
    writer_proc = Process(target=writer, args=(work_q,))
    writer_proc.start()

    pool = Pool(3, worker, (work_q,))
    pool.close()
    pool.join()

    writer_proc.join()
    writer_proc.terminate()


if __name__ == '__main__':
    main()
Attempt #2: this is a less involved approach (no queues). It uses apply_async on the pool to just keep adding new URLs to be checked. But here as well, I do not know how to break out of the while-loop as soon as one of the workers sends back None.
from multiprocessing import Pool

RESPONSES = ["hello", "this", "is", "a", "response", "to", "your", "request"]


def send_request(idx):
    try:
        return RESPONSES[idx]
    except IndexError:
        return None


def main():
    with Pool(3) as pool:
        idx = 0
        while True:
            # Can't make this loop within a fixed range because we do not know how many responses there are
            pool.apply_async(send_request, (idx,))
            idx += 1
            # How do I break out here as soon as a request returns None?


if __name__ == '__main__':
    main()
So in short: if I do not know in advance how many jobs there are to complete, or rather, if whether to add more jobs depends on the workers' responses, how can I make this work with a pool of workers in Python?
Answer
You can use multiprocessing.Value. A Value object lives in shared memory that child processes can access, and any change made from one process is visible to all of them. So, create a shared flag like this:
import multiprocessing
from ctypes import c_bool

. . .

if __name__ == "__main__":
    # Create the flag with initial value True
    flag = multiprocessing.Value(c_bool, True)
Then pass this flag to each process you create. When you no longer want to send requests, you can do the following from inside send_request:
flag.value = False
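
For illustration, send_request could then look roughly like this once it receives the flag (passing flag as an extra argument is my addition; the original function only takes idx):

def send_request(idx, flag):
    try:
        return RESPONSES[idx]
    except IndexError:
        # Out of data: clear the shared flag so the writer stops producing work
        flag.value = False
        return None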
Also, edit your writer so that the while statement checks whether the flag is still set to True (remember, the stored value has to be accessed through the .value attribute!):
while flag.value:
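
Applied to the writer from Attempt #1, the loop might then look like this (again, the extra flag argument is my addition):

def writer(q, flag):
    idx = 0
    while flag.value:  # stop producing indices once any worker has cleared the flag
        q.put(idx)
        idx += 1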
Keep in mind that shared memory is not thread-safe. While that should not affect your case, since you are only using it as a flag, you can pass the lock=True keyword argument when creating the flag so that it uses a lock internally.
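
For completeness, here is a minimal end-to-end sketch of how the pieces could fit together, based on Attempt #1. It uses plain Process workers instead of a Pool, plus a bounded queue so the writer cannot race too far ahead; both are choices of mine, not part of the original code:

import queue
from ctypes import c_bool
from multiprocessing import Process, Queue, Value

RESPONSES = ["hello", "this", "is", "a", "response", "to", "your", "request"]


def send_request(idx):
    try:
        return RESPONSES[idx]
    except IndexError:
        return None


def worker(q, flag):
    while True:
        try:
            # Do not block forever: once the writer has stopped, the queue runs dry
            idx = q.get(timeout=1)
        except queue.Empty:
            if not flag.value:
                break
            continue
        resp = send_request(idx)
        if resp is None:
            # Signal the writer (and the other workers) that we are out of data
            flag.value = False
        else:
            print(resp)


def writer(q, flag):
    idx = 0
    while flag.value:
        q.put(idx)  # blocks when the queue is full, so the writer never gets far ahead
        idx += 1


def main():
    work_q = Queue(maxsize=10)
    flag = Value(c_bool, True)

    writer_proc = Process(target=writer, args=(work_q, flag))
    writer_proc.start()

    workers = [Process(target=worker, args=(work_q, flag)) for _ in range(3)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    writer_proc.join()


if __name__ == "__main__":
    main()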