I am using the Python multiprocessing module to spread, say, 10000 steps of a given task over 4 workers using a Pool. The task that is sent to the workers is a method of a complex object. If I understand the documentation correctly, pickle is used to dump and load the object at each step, which means 10000 pickling/unpickling calls. My problem is that the pickled object is quite complex (it contains many aggregations of nested complex objects), so the pickling/unpickling process takes some time. As a result, my job runs much slower with multiprocessing than as a single-process call. My question is the following: is there a way to do the pickle/unpickle process only once per worker instead of once per step?
EDIT: The code I am trying to parallelize has the following (simplified) structure:
import multiprocessing

class Analysis:
    def run_step(self):
        print('run_step')

    def __getstate__(self):
        # Called every time the object is pickled
        print('I dump')
        return self.__dict__

    def __setstate__(self, state):
        # Called every time the object is unpickled
        print('I load')
        self.__dict__ = state

if __name__ == "__main__":
    a = Analysis()
    pool = multiprocessing.Pool(4)
    for i in range(10):
        pool.apply_async(a.run_step)  # the bound method (and thus 'a') is pickled per call
    pool.close()
    pool.join()
Answer
How about using Process instead of a Pool?
If such a structure is feasible for your use case, you can write a worker function that runs whatever target function you require the desired number of times. Then start the workers with multiprocessing.Process, like below:
import math
import multiprocessing

class Analysis:
    def run_step(self):
        print('run_step')

    def __getstate__(self):
        print('I dump')
        return self.__dict__

    def __setstate__(self, state):
        print('I load')
        self.__dict__ = state

def worker(target, num):
    # 'target' is unpickled once when the Process starts,
    # then called 'num' times with no further pickling
    for _ in range(num):
        target()

if __name__ == "__main__":
    a = Analysis()
    proc = []
    proc_num = 4
    runs = 10
    per_proc_run = math.ceil(runs / proc_num)  # A little inaccurate (4 * 3 = 12 runs here) but I am sure you can figure something out :)
    for _ in range(proc_num):
        proc.append(multiprocessing.Process(target=worker, args=(a.run_step, per_proc_run)))
        proc[-1].start()
    for process in proc:
        process.join()
Output:
I dump
I dump
I dump
I dump
I load
run_step
run_step
run_step
I load
run_step
run_step
run_step
I load
run_step
run_step
run_step
I load
run_step
run_step
run_step
This pickles/unpickles only once per worker. You could probably replicate the same thing with a Pool, but I find this more straightforward.
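For completeness, here is a minimal sketch of the Pool variant, assuming you are happy to stash the object in a module-level global inside each worker. Pool's initializer/initargs parameters are real API; the names init_worker, run_one_step and _worker_analysis are hypothetical helpers introduced for illustration. The object crosses the process boundary at most once per worker (when the worker starts), and every task after that reuses the worker-local copy:

import multiprocessing

class Analysis:
    def run_step(self):
        print('run_step')

    def __getstate__(self):
        print('I dump')
        return self.__dict__

    def __setstate__(self, state):
        print('I load')
        self.__dict__ = state

_worker_analysis = None  # hypothetical per-process slot for the object

def init_worker(analysis):
    # Runs once in each worker process when the Pool starts it;
    # this is the only point where 'analysis' is transferred.
    global _worker_analysis
    _worker_analysis = analysis

def run_one_step(_):
    # Each task reuses the worker-local copy; no pickling per step.
    _worker_analysis.run_step()

if __name__ == "__main__":
    a = Analysis()
    with multiprocessing.Pool(4, initializer=init_worker, initargs=(a,)) as pool:
        pool.map(run_one_step, range(10))

Depending on the start method you may see up to four 'I dump'/'I load' pairs (spawn, one per worker) or none at all (fork, where the object is inherited), but never one per step.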