
Using a class decorator to automatically run a method with a child process

I was asked to develop a consistent way to run (train, make predictions, etc.) any ML model from the command line. I also need to periodically check the DB for training-related requests, such as abort requests. To minimize the effect this polling has on training, I want to fetch requests from the DB in a separate process.

So I created an abstract class RunnerBaseClass that requires its child classes to implement _train() for each ML model; calling run() starts _check_db() alongside _train() using the multiprocessing module.

I also want to get rid of the need for the boilerplate

if __name__ == '__main__':
   ...

code, and have argument parsing, instance creation, and the run() call happen automatically.

So I created a class decorator @autorun that calls the run() method of the class when the script is run directly from the command line. The decorator does call run(), but creating a subprocess with the class's method fails with the following error:

Traceback (most recent call last):
  File "run.py", line 4, in <module>
    class Runner(RunnerBaseClass):
  File "/Users/yongsinp/Downloads/runner_base.py", line 27, in class_decorator
    instance.run()
  File "/Users/yongsinp/Downloads/runner_base.py", line 16, in run
    db_check_process.start()
  File "/Users/yongsinp/miniforge3/envs/py3.8/lib/python3.8/multiprocessing/process.py", line 121, in start
    self._popen = self._Popen(self)
  File "/Users/yongsinp/miniforge3/envs/py3.8/lib/python3.8/multiprocessing/context.py", line 224, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "/Users/yongsinp/miniforge3/envs/py3.8/lib/python3.8/multiprocessing/context.py", line 284, in _Popen
    return Popen(process_obj)
  File "/Users/yongsinp/miniforge3/envs/py3.8/lib/python3.8/multiprocessing/popen_spawn_posix.py", line 32, in __init__
    super().__init__(process_obj)
  File "/Users/yongsinp/miniforge3/envs/py3.8/lib/python3.8/multiprocessing/popen_fork.py", line 19, in __init__
    self._launch(process_obj)
  File "/Users/yongsinp/miniforge3/envs/py3.8/lib/python3.8/multiprocessing/popen_spawn_posix.py", line 47, in _launch
    reduction.dump(process_obj, fp)
  File "/Users/yongsinp/miniforge3/envs/py3.8/lib/python3.8/multiprocessing/reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <class '__main__.Runner'>: attribute lookup Runner on __main__ failed

Here's a minimal example that reproduces the error.

runner_base.py:

from abc import ABC, abstractmethod
from multiprocessing import Process


class RunnerBaseClass(ABC):
    @abstractmethod
    def _train(self) -> None:
        ...

    def _check_db(self):
        print("Checking DB")

    def run(self) -> None:
        db_check_process = Process(target=self._check_db)

        db_check_process.start()
        self._train()

        db_check_process.join()


def autorun(env_name: str):
    def class_decorator(class_):
        instance = class_()

        if env_name == '__main__':
            instance.run()

        return instance

    return class_decorator

run.py:

from runner_base import RunnerBaseClass, autorun

@autorun(__name__)
class Runner(RunnerBaseClass):
    def _train(self) -> None:
        print("Training")

I have looked into the cause of this error and can fix it simply by not using the decorator, or by turning the method into a function.

runner_base.py:

from abc import ABC, abstractmethod
from multiprocessing import Process


class RunnerBaseClass(ABC):
    @abstractmethod
    def _train(self) -> None:
        ...

    def run(self) -> None:
        db_check_process = Process(target=check_db)

        db_check_process.start()
        self._train()

        db_check_process.join()


def autorun(env_name: str):
    def class_decorator(class_):
        instance = class_()

        if env_name == '__main__':
            instance.run()

        return instance

    return class_decorator


def check_db():
    print("Checking DB")

I could just use the function instead of the method and be done with it, but I don't like the idea of passing configurations and objects for inter-process communication (like Queue) to the function, which I don't have to do when using a method. So, is there a way for me to keep _check_db() a method and still use the @autorun decorator?

(I am aware of using dill and other modules, but I’d like to stick with the builtin ones if possible.)


Answer

There might be a couple of misunderstandings here.

I can just use the function instead of the method and be done with it, but I don’t like the idea of passing configurations and an object for communication in between processes to the function which I don’t have to when using a method

It's understandable why you might think this, but your logic for using a method rather than a function is flawed if you plan to modify Runner objects in either the child or the parent process. When you spawn processes using the start method "spawn" (the default on Windows and macOS), child processes don't have access to the parent's memory space. If you create a Runner object and pass it to a process, that process receives a duplicate of the object at a different memory address, and modifications made to either copy are not propagated to the other process. The same goes for the start method "fork" (the default on Unix); the only difference is that fork uses copy-on-write, so the duplicate is created not at start, but when the child process attempts to modify the object.
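A quick way to see this copy semantics without starting a process is a pickle round-trip, which is essentially what "spawn" does to every argument (a minimal sketch; the config dict is made up for illustration):

```python
import pickle

config = {"epochs": 10, "aborted": False}

# "spawn" serializes each argument with pickle and rebuilds it in the
# child, so the child works on an independent copy.
child_copy = pickle.loads(pickle.dumps(config))
child_copy["aborted"] = True

print(config["aborted"])  # still False: the parent's copy is untouched
```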

So just keep in mind that sharing objects like you are trying to do only makes sense if you use them as read-only (like passing configurations and data from one process to another) and don't need the changes made to them to be reflected in the other processes. If you also want them to be writable, you can use managers, like this answer mentions. Keep in mind that managers will negatively impact your code's performance, since all communication requires the data to be serialized.
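For completeness, a minimal sketch of the manager approach (the watch_db function and the dict key are made up; the "fork" context is used here only so the sketch runs as a single snippet, since "spawn" would require the target to live in an importable module):

```python
from multiprocessing import get_context


def watch_db(shared):
    # Runs in the child process; writes go through the manager's
    # server process, so the parent sees them too.
    shared["abort"] = True


# "fork" lets this sketch run as-is (Unix only); on macOS/Windows the
# default is "spawn" and watch_db would need to be importable.
ctx = get_context("fork")
with ctx.Manager() as manager:
    shared = manager.dict(abort=False)
    p = ctx.Process(target=watch_db, args=(shared,))
    p.start()
    p.join()
    result = shared["abort"]

print(result)  # True: the change made in the child is visible here
```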

This brings us to the next question: can you even pass complex objects to another process?

Multiprocessing uses pickle to transfer data from one process to another. This means that any object passed as an argument must be picklable. Whether pickle can serialize complex objects like instances of Runner then depends very much on the instance attributes the object has. In your case, the problem isn't with pickling your instance; it's that you are attempting to do so before the class Runner has even been added to the top-level module namespace. To check this, change your decorator to print whether the class exists in the module's globals before it creates an instance:

def autorun(env_name: str):
    def class_decorator(class_):
        print(class_.__name__ in globals())
        instance = class_()

        if env_name == '__main__':
            instance.run()

        return instance

    return class_decorator

Output:

False

In general, attributes not defined at the top level of a module are not picklable with pickle, and this is why your code fails with a pickling error. Additionally, you also won't be able to use ABC, since that can't be pickled either.
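You can reproduce the same failure without any decorator: a class whose name is not bound at the top level of its module cannot be pickled by reference (the make_class helper is made up for illustration):

```python
import pickle


def make_class():
    # The class statement runs fine, but the resulting class is only
    # bound to a local name; it never appears at module top level, so
    # pickle's lookup by module and qualified name fails.
    class Local:
        pass
    return Local


cls = make_class()
try:
    pickle.dumps(cls)
except (pickle.PicklingError, AttributeError) as err:
    print(f"Can't pickle: {err}")
```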

So what’s the solution?

I recommend looking outside the builtins to achieve what you want, or, as you mentioned, changing the _check_db() method into a function. Apart from that, there is also a rather unintuitive workaround you can use.

Method 1

If you do decide to use something better, like multiprocess, which uses dill rather than pickle, your code will look like this:

from multiprocess import Process


class RunnerBaseClass:

    def _train(self) -> None:
        ...

    def _check_db(self):
        print("Checking DB")

    def run(self) -> None:
        db_check_process = Process(target=self._check_db)

        db_check_process.start()
        self._train()

        db_check_process.join()


def autorun(env_name: str):
    def class_decorator(class_):
        instance = class_()

        if env_name == '__main__':
            instance.run()

        return instance

    return class_decorator


@autorun(__name__)
class Runner(RunnerBaseClass):
    def _train(self) -> None:
       print("Training")

Output

Training
Checking DB

Method 2

The second method relies on changing the decorator to create an instance of the decorated class's parent class instead, and attaching the decorator to a child of Runner. Consider this code:

from multiprocessing import Process

class RunnerBaseClass:

    def _train(self) -> None:
        ...

    def _check_db(self):
        print("Checking DB")

    def run(self) -> None:
        db_check_process = Process(target=self._check_db)

        db_check_process.start()
        self._train()

        db_check_process.join()


def autorun(env_name: str):
    def class_decorator(class_):

        # Create instance of parent class
        instance = class_.__bases__[0]()

        if env_name == '__main__':
            instance.run()

        return instance

    return class_decorator


class Runner(RunnerBaseClass):
    def _train(self) -> None:
        print("Training")


@autorun(__name__)
class RunnerChild(Runner):
    pass

Here, we attach the decorator to RunnerChild, a child of class Runner. The decorator then creates an instance of RunnerChild's parent class and executes run(). By the time this happens, the Runner class has already been added to the top-level module namespace and can therefore be pickled.
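If you want to convince yourself of the ordering, the earlier globals() check can be repeated against the parent class (all names here are hypothetical):

```python
class Parent:
    def run(self):
        return "ran"


def autorun_parent(class_):
    # By the time the *child* class reaches the decorator, the parent's
    # class statement has long finished, so its name is already bound
    # at module top level and can be pickled by reference.
    parent = class_.__bases__[0]
    print(parent.__name__ in globals())  # True
    return parent()


@autorun_parent
class Child(Parent):
    pass
```

As in the answer's decorator, the name Child ends up bound to the returned Parent instance, not to a class.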

Output

Training
Checking DB