Add unknown number of jobs to pool until a job returns None

Assume that we want to send a lot of web requests and do something with the data that we get back. The data crunching that we have to do on each response is quite heavy, so we want to parallelize this: a main process distributes query URLs to child processes, which then fetch the data and do some processing. Simple enough! But the issue that I am having is that it is not known in advance how many URLs there are. The only way that we’ll know is when one of the child processes returns an error (or rather, when it returns None).

I have two possible approaches in mind, but both have the same issue: I cannot figure out how to deal with the fact that the input list (URLs to query) is unknown in size (it’s dynamically generated).

Attempt #1: create a queue and set up a separate process that generates URLs (here it generates indices) and puts them on the queue. Then create child processes that listen to this queue and process the links (here as simple as getting the corresponding item of a list, but returning None when the index is out of range – the exit scenario). The problem is that I do not know how to tell the writer that it should stop putting things on the queue as soon as one of the processes yields None.

Note: this script will run indefinitely as there is no break condition in the writer.

from multiprocessing import Pool, Process, Queue

RESPONSES = ["hello", "this", "is", "a", "response", "to", "your", "request"]  # We do not know this length


def send_request(idx):
    try:
        return RESPONSES[idx]
    except IndexError:
        return None


def worker(q):
    while True:
        # Get work from the working queue
        idx = q.get()
        resp = send_request(idx)
        print(resp)


def writer(q):
    idx = 0
    while True:  # How can I stop this when any response is None
        q.put(idx)
        idx += 1


def main():
    work_q = Queue()
    writer_proc = Process(target=writer, args=(work_q,))
    writer_proc.start()

    pool = Pool(3, worker, (work_q,))
    pool.close()
    pool.join()

    writer_proc.join()
    writer_proc.terminate()


if __name__ == '__main__':
    main()

Attempt #2: this is a less involved approach (no queues); it uses apply_async on the pool to just keep adding new URLs to be checked. But here as well, I do not know how to break out of the while loop as soon as one of the workers sends back None.

from multiprocessing import Pool

RESPONSES = ["hello", "this", "is", "a", "response", "to", "your", "request"]


def send_request(idx):
    try:
        return RESPONSES[idx]
    except IndexError:
        return None


def main():
    with Pool(3) as pool:
        idx = 0
        while True:  # Can't make this loop within fixed range because we do not know how many responses there are
            pool.apply_async(send_request, (idx,))
            idx += 1
            # How do I break out here as soon as a request returns None


if __name__ == '__main__':
    main()

So in short: if I do not know in advance how many jobs there are to complete, or rather, if whether to add more jobs depends on the workers’ responses, how can I make this work with a pool of workers in Python?

Answer

You can use multiprocessing.Value.

This creates shared memory that can be accessed from child processes and reflects any changes made to it from any process. So, create a shared flag like this:

import multiprocessing
from ctypes import c_bool

.
.
.

if __name__ == "__main__":

    # Create flag with initial value True
    flag = multiprocessing.Value(c_bool, True)

And then pass this flag to each process you create. Then, when you no longer want to send requests, you can do the following from inside send_request:

flag.value = False
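
For instance, if send_request is given the flag as an extra argument (that extra parameter is my assumption; in the question send_request only takes idx), the out-of-range case could clear it:

def send_request(idx, flag):
    try:
        return RESPONSES[idx]
    except IndexError:
        # Assumed extra 'flag' argument: clear it so no new work gets queued
        flag.value = False
        return None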

Also, edit your writer so that the while statement checks whether the flag is still set to True (remember, the stored value needs to be accessed through the .value attribute!):

while flag.value:

Keep in mind that shared memory is not thread-safe. While that should not affect your case, since you are using it just as a flag, you can pass the lock=True keyword argument when creating the flag to use a lock internally (this is already the default).
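
Putting this together with the queue-based Attempt #1 from the question, a minimal sketch could look like the following. A few assumptions beyond the answer itself: send_request receives the flag as an extra argument, the workers poll the queue with a timeout and check the flag themselves so the example actually terminates, and the flag is handed to the pool workers through the initializer arguments, just like the queue.

from ctypes import c_bool
from multiprocessing import Pool, Process, Queue, Value
from queue import Empty

RESPONSES = ["hello", "this", "is", "a", "response", "to", "your", "request"]


def send_request(idx, flag):
    try:
        return RESPONSES[idx]
    except IndexError:
        flag.value = False  # tell the writer (and the other workers) to stop
        return None


def worker(q, flag):
    while True:
        try:
            # Timeout so the loop can end once the flag has been cleared
            idx = q.get(timeout=1)
        except Empty:
            if not flag.value:
                break
            continue
        resp = send_request(idx, flag)
        if resp is not None:
            print(resp)


def writer(q, flag):
    idx = 0
    while flag.value:  # stops as soon as any worker clears the flag
        q.put(idx)
        idx += 1


def main():
    work_q = Queue()
    # Shared flag with initial value True (lock=True is the default)
    flag = Value(c_bool, True)

    writer_proc = Process(target=writer, args=(work_q, flag))
    writer_proc.start()

    pool = Pool(3, worker, (work_q, flag))
    pool.close()
    pool.join()

    writer_proc.join()


if __name__ == "__main__":
    main()

Note that the writer may outrun the workers and leave extra indices on the queue after the flag is cleared; the workers simply drain those (send_request returns None for them) before exiting, so the program still terminates.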
