I have overridden `multiprocess.Process` (a fork of the `multiprocessing` library) like so:
```python
# source/process.py
import time
import traceback

import multiprocess

# S_Logger is my own logger class (import not shown)


class Process(multiprocess.Process):
    def __init__(self, test_name: str, *args, **kwargs) -> None:
        multiprocess.Process.__init__(self, *args, **kwargs)
        self._parent_conn, self._child_conn = multiprocess.Pipe()
        self._exception = None
        self._test_name = test_name

    def run(self) -> None:
        # Created before `try` so that `finally` can always use them
        start = time.perf_counter()
        logger = S_Logger(self._test_name).get_logger()
        try:
            logger.info('EXECUTION OF %s HAS STARTED.', self._test_name)
            multiprocess.Process.run(self)
            self._child_conn.send(None)
        except Exception as e:
            tb = traceback.format_exc()
            logger.error(f'EXCEPTION OCCURRED: {tb}')
            self._child_conn.send((e, tb))
        finally:
            logger.info('EXECUTION OF %s HAS ENDED.', self._test_name)
            end = time.perf_counter()
            logger.info(f'FINISHED in {round(end - start, 2)} second(s)')
```
When I create a normal process using this class, everything works perfectly, including the logging.
Now I want to create a pool of these customized processes, but I ran into a problem with respawning them once their life comes to an end. Here is how I create the pool, with the additional `maxtasksperchild=1` argument:
```python
import time

import multiprocess

from source.process import Process

ctx = multiprocess.get_context()


def run_tests(self):
    def worker(x):
        print(x**2)
        time.sleep(1)

    with ctx.Pool(processes=4, maxtasksperchild=1) as pool:
        nums = range(10)
        ctx.Process = Process(test_name='test_name')
        pool.map(worker, nums)
```
This gives me the following output:
```
0
1
4
9
Exception in thread Thread-1 (_handle_workers):
Traceback (most recent call last):
  File "C:\Program Files\WindowsApps\PythonSoftwareFoundation.Python.3.10_3.10.2032.0_x64__qbz5n2kfra8p0\lib\threading.py", line 1016, in _bootstrap_inner
    self.run()
  File "C:\Program Files\WindowsApps\PythonSoftwareFoundation.Python.3.10_3.10.2032.0_x64__qbz5n2kfra8p0\lib\threading.py", line 953, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\<user>\Documents\Projects\sprinter\.venv\lib\site-packages\multiprocess\pool.py", line 513, in _handle_workers
    cls._maintain_pool(ctx, Process, processes, pool, inqueue,
  File "C:\Users\<user>\Documents\Projects\sprinter\.venv\lib\site-packages\multiprocess\pool.py", line 337, in _maintain_pool
    Pool._repopulate_pool_static(ctx, Process, processes, pool,
  File "C:\Users\<user>\Documents\Projects\sprinter\.venv\lib\site-packages\multiprocess\pool.py", line 319, in _repopulate_pool_static
    w = Process(ctx, target=worker,
  File "C:\Users\<user>\Documents\Projects\sprinter\.venv\lib\site-packages\multiprocess\pool.py", line 181, in Process
    return ctx.Process(*args, **kwds)
TypeError: 'Process' object is not callable
```
This raises two questions:
- Why is there no logging? If I don’t use a pool, the logs appear correctly.
- Why, once the first four processes have finished, do the replacement processes fail to be created (the "not callable" error)? If I remove the `maxtasksperchild` argument, it works perfectly (0, 1, 4, 9, 16, 25…).
Answer
> This gives me the following output
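The error occurs because you are replacing `ctx.Process` (a class) with an instance of your own subclass. An instance is not callable unless its class defines a `__call__` method, so when the pool later calls `ctx.Process(...)` to create a new worker, you get `TypeError: 'Process' object is not callable`. But even if you replaced it with the subclass itself, it wouldn’t work: you would get a recursion or attribute error, because you are replacing a class with a subclass of that same class (the base `multiprocess.Process` hands the actual start-up off to whatever `ctx.Process` is, so your subclass would end up delegating to itself).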
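Both failure modes can be reproduced in plain Python, without `multiprocess`. In the sketch below, the first part shows that calling an instance of a class without `__call__` raises the same `TypeError` as in the traceback. The second part is a toy model with hypothetical names (`ConcreteProcess`, `Context`, `BaseProcess`, `MyProcess`) of the recursion case. It assumes, based on my reading of the standard library's `multiprocessing/context.py` (which `multiprocess` mirrors), that the generic `Process._Popen` simply delegates to whatever class the context exposes as `Process`; it is a simplification, not the library's actual code.

```python
# Part 1: calling an instance only works if its class defines __call__.
class Process:
    """Stand-in for the question's Process subclass (no __call__)."""


class CallableProcess:
    def __call__(self, *args, **kwargs):
        return "called"


try:
    Process()()  # what the pool effectively did: ctx.Process(...) on an instance
except TypeError as exc:
    not_callable_message = str(exc)

print(not_callable_message)                              # 'Process' object is not callable
print(callable(Process()), callable(CallableProcess()))  # False True


# Part 2: toy model (hypothetical names) of the recursion. The base class's
# _Popen delegates to "whatever class the context currently calls Process".
class ConcreteProcess:
    @staticmethod
    def _Popen(process_obj):
        return "started"


class Context:
    Process = ConcreteProcess


ctx = Context()


class BaseProcess:
    @staticmethod
    def _Popen(process_obj):
        return ctx.Process._Popen(process_obj)


class MyProcess(BaseProcess):
    pass


print(MyProcess._Popen(None))  # started

ctx.Process = MyProcess  # "replace it with your subclass"
recursed = False
try:
    MyProcess._Popen(None)  # now MyProcess._Popen -> ctx.Process._Popen -> itself ...
except RecursionError:
    recursed = True
print(recursed)  # True
```

This is why the improved patch further down stores the subclass under a separate attribute instead of overwriting `ctx.Process`.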
> Why is there no logging? If I don’t use a pool, the logs appear correctly.
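This is because you never actually managed to patch the pool to use your `Process` subclass: the workers that ran were ordinary processes created before your patch line, so your overridden `run()` (and the logging in it) never executed. This also ties into your second question (read on).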
> Why, once the first four processes have finished, do the replacement processes fail to be created (the "not callable" error)? If I remove the `maxtasksperchild` argument, it works perfectly (0, 1, 4, 9, 16, 25…)
This happens because the pool creates its worker processes as soon as it is constructed, i.e. on the line `with ctx.Pool(processes=4, maxtasksperchild=1) as pool:`. Since you apply your patch after the workers have started, it has no effect unless the pool has to start new processes, and that is exactly what `maxtasksperchild` causes. With `maxtasksperchild=1`, each worker exits after completing one task, so once the first four results (0, 1, 4, 9) are in, the pool tries to start replacement workers, hits the faulty patch, and raises the `TypeError`. If you don’t set `maxtasksperchild`, the pool never needs to start another process, so it never touches the attribute you patched.
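To make the timing argument concrete, here is a toy pool with hypothetical names (`MiniPool`, `run_demo`). It is not how `multiprocess.pool.Pool` is implemented, only a sequential model of the behaviour described above: all workers are built at construction time, and `ctx.Process` is looked up again only when a retired worker has to be replaced. Workers take tasks in rounds of four as a crude stand-in for parallelism.

```python
from types import SimpleNamespace


class Process:
    """Stand-in for the context's real Process class (hypothetical)."""


class MiniPool:
    """Toy pool (hypothetical): creates its workers up front and, when
    maxtasksperchild is set, replaces retired workers between rounds."""

    def __init__(self, ctx, processes, maxtasksperchild=None):
        self._ctx = ctx
        self._maxtasks = maxtasksperchild
        # All initial workers are created here, when the pool is constructed
        self._workers = [self._spawn() for _ in range(processes)]

    def _spawn(self):
        # ctx.Process is looked up at spawn time, so a later patch only
        # affects workers spawned after it
        return [self._ctx.Process(), 0]  # [worker, tasks completed]

    def _retired(self, worker):
        return self._maxtasks is not None and worker[1] >= self._maxtasks

    def map(self, func, items):
        results, pending = [], list(items)
        while pending:
            # Each worker takes at most one item per round
            for worker in self._workers:
                if not pending:
                    break
                results.append(func(pending.pop(0)))
                worker[1] += 1
            # Between rounds, respawn retired workers (cf. _maintain_pool in the traceback)
            self._workers = [self._spawn() if self._retired(w) else w for w in self._workers]
        return results


def run_demo(maxtasksperchild):
    ctx = SimpleNamespace(Process=Process)
    pool = MiniPool(ctx, processes=4, maxtasksperchild=maxtasksperchild)
    ctx.Process = Process()  # the faulty patch, applied after the workers exist
    done = []
    try:
        pool.map(lambda x: done.append(x ** 2), range(10))
    except TypeError as exc:
        return done, str(exc)
    return done, None


print(run_demo(None))  # ([0, 1, 4, 9, 16, 25, 36, 49, 64, 81], None)
print(run_demo(1))     # ([0, 1, 4, 9], "'Process' object is not callable")
```

Without `maxtasksperchild` the patched attribute is never consulted and all ten squares come back; with `maxtasksperchild=1` the first respawn calls the instance and fails after 0, 1, 4, 9, which matches the pattern in the question's output.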
Regardless, here’s a better way to patch the pool to do what you want:
```python
import time
from functools import partial

import multiprocess
from multiprocess.pool import Pool


class Process(multiprocess.Process):
    def __init__(self, *args, test_name='', **kwargs) -> None:
        multiprocess.Process.__init__(self, *args, **kwargs)
        self._parent_conn, self._child_conn = multiprocess.Pipe()
        self._exception = None
        self._test_name = test_name

    def run(self) -> None:
        # Have your own implementation here (logging, timing, ...), but keep
        # calling the parent run(): in a pool worker, that is what runs the
        # pool's task loop.
        multiprocess.Process.run(self)


def _Process(ctx, *args, **kwds):
    # Replacement for Pool.Process: build our subclass instead of ctx.Process
    return ctx.MyProcess(*args, **kwds)


def worker(x):
    print(x ** 2)
    time.sleep(1)


if __name__ == "__main__":
    ctx = multiprocess.get_context()
    # Some patching: we add our subclass as an attribute of the context
    ctx.MyProcess = Process
    # Fix test_name to be passed as a kwarg whenever the pool starts a process.
    # Pretty lazy, but it gets the job done.
    test_name = 'test_name'
    Pool.Process = partial(_Process, test_name=test_name)
    with ctx.Pool(processes=4, maxtasksperchild=1) as pool:
        nums = range(10)
        pool.map(worker, nums)
```
Note that `test_name` is now an optional keyword-only argument. This lets `functools.partial` supply it as a keyword whenever the pool creates a worker. You will probably want to add checks that the value is actually passed and is valid.
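One way to do those checks, sketched under the assumption that you keep the `Pool.Process = partial(...)` approach above, is to validate `test_name` once in a hypothetical helper (`make_process_factory`) before building the partial. The example uses a fake context (`types.SimpleNamespace`) in place of the real one so that it runs without starting any processes.

```python
from functools import partial
from types import SimpleNamespace


def _Process(ctx, *args, **kwds):
    # Same shape as the helper above: build the subclass stored on the context
    return ctx.MyProcess(*args, **kwds)


def make_process_factory(test_name):
    """Hypothetical helper: validate test_name once, then return the partial
    that would be assigned to Pool.Process."""
    if not isinstance(test_name, str):
        raise TypeError(f"test_name must be a str, got {type(test_name).__name__}")
    if not test_name.strip():
        raise ValueError("test_name must be a non-empty string")
    return partial(_Process, test_name=test_name)


# In the real script this would be: Pool.Process = make_process_factory('test_name')
# Here a fake context just returns the keyword arguments it receives.
fake_ctx = SimpleNamespace(MyProcess=lambda *args, **kwds: kwds)
factory = make_process_factory("test_name")
print(factory(fake_ctx, target=None)["test_name"])  # test_name
```

Validating up front means a bad value fails immediately in the main process, instead of failing in whichever thread happens to create a worker (for respawned workers, that is the pool's `_handle_workers` thread, as in the traceback above).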