
Pickle/unpickle only once per worker

I am using the Python multiprocessing module to spread, say, 10000 steps of a given task across 4 workers using a Pool. The task sent to the workers is a method of a complex object. If I understand the documentation correctly, pickle is used to dump and load the object at each step, which means 10000 pickling/unpickling calls. My problem is that the pickled object is quite complex (it contains many aggregations of nested complex objects), so pickling and unpickling take some time. As a result, my job runs much slower with multiprocessing than in a single process. My question is: is there a way to pickle/unpickle only once per worker instead of once per step?

EDIT: The code I am trying to parallelize has the following (simplified) structure:

import multiprocessing

class Analysis:

    def run_step(self):

        print('run_step')

    def __getstate__(self):
        print('I dump')
        return self.__dict__

    def __setstate__(self,state):
        print('I load')
        self.__dict__ = state

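if __name__ == "__main__":  # guard needed where workers are spawned (e.g. Windows)
    a = Analysis()

    pool = multiprocessing.Pool(4)

    # Each call pickles the bound method, and with it the whole object `a`
    for i in range(10):
        pool.apply_async(a.run_step)
    pool.close()
    pool.join()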


Answer

How about using Processes instead?

If such a structure is feasible for your use case, you can write a worker function that runs whatever target function you need a given number of times, then start one multiprocessing.Process per worker, as below:

import math
import multiprocessing

class Analysis:

    def run_step(self):

        print('run_step')

    def __getstate__(self):
        print('I dump')
        return self.__dict__

    def __setstate__(self,state):
        print('I load')
        self.__dict__ = state

def worker(target, num):
    # Runs in the child process: calls target num times on the copy it received at start
    for _ in range(num):
        target()

if __name__ == "__main__":
    a = Analysis()
    proc = []
    proc_num = 4
    runs = 10
    per_proc_run = math.ceil(runs / proc_num)  # rounds up: 4 workers x 3 steps = 12 steps here, not 10

    for _ in range(proc_num):
        # The bound method (and with it `a`) is handed to the child once, when it starts
        proc.append(multiprocessing.Process(target=worker, args=(a.run_step, per_proc_run)))
        proc[-1].start()

    for process in proc:
        process.join()

Output:

I dump
I dump
I dump
I dump
I load
run_step
run_step
run_step
I load
run_step
run_step
run_step
I load
run_step
run_step
run_step
I load
run_step
run_step
run_step
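
Note that the `math.ceil` split runs 12 steps for `runs = 10` (three per worker, as the output shows). If the exact count matters, a minimal sketch of an even split could look like this; `split_runs` is a hypothetical helper, not part of the original answer:

```python
def split_runs(runs, proc_num):
    """Return one step count per worker; the counts sum to exactly `runs`."""
    # base steps for everyone, plus one extra for the first `extra` workers
    base, extra = divmod(runs, proc_num)
    return [base + 1 if i < extra else base for i in range(proc_num)]


if __name__ == "__main__":
    print(split_runs(10, 4))  # [3, 3, 2, 2]
```

Each worker would then be started with its own entry of that list as `num`.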

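The object is pickled and unpickled only once per worker. (This output comes from a platform where child processes are spawned, such as Windows; where the start method is fork, the children inherit the object and no pickling happens at all.) You could probably replicate the same thing with a Pool, but I find this more straightforward.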
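
For reference, one way to get the same effect with a Pool is to pass the object through the pool's `initializer`, so each worker receives it once at startup and the individual tasks carry only a small argument. This is a sketch under assumptions, not the original code: the names `init_worker`, `run_step_in_worker`, `run_all`, and the `offset` attribute are made up for illustration, and `run_step` takes a step index here so that there is a result to return.

```python
import multiprocessing


class Analysis:
    """Stand-in for the complex object; `offset` is hypothetical state."""

    def __init__(self):
        self.offset = 100

    def run_step(self, i):
        return self.offset + i


# One copy per worker process, set by init_worker (hypothetical name).
_analysis = None


def init_worker(analysis):
    # Runs once in each worker when it starts; the object arrives here.
    global _analysis
    _analysis = analysis


def run_step_in_worker(i):
    # Module-level function: a task carries only its name and the small argument i.
    return _analysis.run_step(i)


def run_all(steps, workers=4):
    a = Analysis()
    # initargs is delivered to each worker once, not once per task.
    with multiprocessing.Pool(workers, initializer=init_worker, initargs=(a,)) as pool:
        return pool.map(run_step_in_worker, range(steps))


if __name__ == "__main__":
    print(run_all(10))
```

The trade-off is that each worker holds its own copy, so changes made to the object inside a worker are not visible to the parent or to the other workers; results have to come back through return values.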
