I am trying to parallelize part of a huge project. I have a lattice consisting of points, and I perform a calculation at each individual point of the lattice. To speed up the calculation I want to subdivide the points of the lattice into sublists and run the calculations in separate processes via a ProcessPoolExecutor. However, when I pass a list to the ProcessPoolExecutor, it seems the list gets copied rather than passed by reference.
A minimal version of my code is as follows:
```python
from time import time
from concurrent.futures import ProcessPoolExecutor

class Lattice():
    def __init__(self, rndl):
        self.Points = []
        for val in rndl:
            P = Point(val)
            self.Points.append(P)

class Point():
    def __init__(self, val):
        self.val = val

    def calculate(self, it = 100):
        for i in range(it):
            some_tmp_value = self.val**(1.0/10)
            some_tmp_value = some_tmp_value**(1.0/10)
        self.val -= 1  # update values

def do_calculation_ser_2(lattice, it = 100):
    workers = 2
    splitted_list = [lattice.Points[i::workers] for i in range(workers)]
    for sublist in splitted_list:
        for P in sublist:
            P.calculate(it)

def par_calc_helper(sublist, it):
    for i, P in enumerate(sublist):
        P.calculate(it)
    return None

def do_calculation_par_partitioning(lattice, it):
    workers = 2  # define number of subprocesses
    # split list in chunks
    splitted_list = [lattice.Points[i::workers] for i in range(workers)]
    with ProcessPoolExecutor(max_workers = workers) as executor:
        for i, sublist in enumerate(splitted_list):
            future = executor.submit(par_calc_helper, sublist, it)

def check_calc(lattice, rndl):
    error = False
    for i, (P, val_rnd) in enumerate(zip(lattice.Points, rndl)):
        P.val += 1
        if P.val != val_rnd:
            print("ERROR - Calculation gone wrong")
            error = True
            break
    if not error:
        print("Calculation gone right")

if __name__ == '__main__':
    max_iter = 2*8
    rndl = [4, 4, 4, 4]
    lat = Lattice(rndl)
    do_calculation_ser_2(lat, max_iter)
    check_calc(lat, rndl)
    do_calculation_par_partitioning(lat, max_iter)
    check_calc(lat, rndl)
```
Output:
```
Calculation gone right
ERROR - Calculation gone wrong
```
The Lattice class is just a container for all points. The Point class has just a value and a calculate method. The for loop inside the calculate method exists only so the function takes some time to execute. The result is the point's value decremented by 1, so I can easily check later whether the calculation was done correctly.
The do_calculation_ser_2 method is the serial version of the calculation. I split the point list into a number of sublists, iterate over them, and perform the calculation. I know the splitting is useless here, but I wanted to keep it similar to the parallel version so I can track down errors more easily.
The do_calculation_par_partitioning method parallelizes my calculation. First, I subdivide the points of my lattice into sublists. Then I submit the helper function, which iterates over a sublist, together with that sublist to my ProcessPoolExecutor.
Lastly, the check_calc function checks whether the calculation was done correctly; it also increments the point values by one again to restore the lattice to its previous state.
As a test, I just initialize 4 points with the value 4 and perform the calculation. When I run it, the serial version works perfectly fine. However, the parallel version does not. The calculate methods are called correctly (I can see that by inserting print statements inside the methods), but the resulting value is not set correctly and stays 4 instead of the expected 3.
I assume that passing a list of objects to a ProcessPoolExecutor copies the objects in the list instead of just referencing them (as the serial version does). Is that the case? If so, how do I pass a list of objects without copying them each time? (That would be bad for really large calculations.) Would passing a copy of the lists and replacing the main lists with the results of the calculations work best, or is there a better way?
If my approach is completely wrong and you have better ideas for doing this in Python, please let me know.
Answer
You are knocking your head against a process boundary… More seriously: ProcessPoolExecutor starts a number of worker processes, serializes (pickles) the arguments that will be passed to them, and sends the serialized values over a pipe. Each worker then does its work in its own memory space, so the data in the master process is never updated.
One possible approach is for each worker to return its modified list (which will again be serialized) and for the parent to reassemble the results. Alternatively, the multiprocessing module provides shared types backed by shared memory, but that comes at the price of additional synchronization. In your use case, I would probably just return the modified lists.