Graceful cleanup for a multiprocessing pool in Python

I am running a multiprocessing pool mapped over a number of inputs. My worker processes have an initialization step that opens a connection to Selenium and to a database. When the pool finishes its job, what is the graceful way to close these connections, rather than relying on Python’s memory management and __del__ definitions?

EDIT:

import multiprocessing

class WebDriver():
  def close(self):
    ...  # close logic

  def __del__(self):
    self.driver.close()

def init():
  global DRIVER
  DRIVER=WebDriver()

def shutdown():
  DRIVER.close()

if __name__=='__main__':
  with multiprocessing.Pool(initializer=init) as pool:
    pool.map(some_function, some_args)

Because some_args is large, I only want to call shutdown when the worker processes have no other jobs to do. I don’t want to close / reopen connections to my database until everything is done.

As of right now, I would expect the memory manager to call __del__ when a worker process shuts down, but I don’t know whether that actually happens. I’ve run into strange cases where it hasn’t been called. I’m hoping to better understand how to manage shutdown.

Answer

I think you have a good chance of closing your drivers if you first wait for your pool processes to terminate and then force a garbage collection:

if __name__=='__main__':
    with multiprocessing.Pool(initializer=init) as pool:
        try:
            pool.map(some_function, some_args)
        finally:
            # Wait for all tasks to complete and all processes to terminate:
            pool.close()
            pool.join()
            # Processes should be done now:
            import gc
            gc.collect() # ensure garbage collection

Solution With User-created Pool

import multiprocessing


class WebDriver():

    def close(self):
        ...
        print('driver is now closed')

    def do_something(self, i):
        import time
        time.sleep(.1)
        print(i, flush=True)

    def __enter__(self):
        self.driver = [] # this would be an actual driver
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()



def some_function(i):
    # Do something with DRIVER:
    ...
    DRIVER.do_something(i)

def worker(in_q):
    global DRIVER

    with WebDriver() as DRIVER:
        # Iterate until we get special None record and then cleanup:
        for i in iter(in_q.get, None):
            try:
                some_function(i)
            except Exception:
                # Ignore a failed item so that this worker keeps
                # processing the remaining items:
                pass

if __name__=='__main__':
    POOL_SIZE = multiprocessing.cpu_count()
    # Create pool:
    # Assumption is that we don't need an output queue for output
    in_q = multiprocessing.Queue()
    processes = [multiprocessing.Process(target=worker, args=(in_q,))
                 for _ in range(POOL_SIZE)
                 ]
    for p in processes:
        p.start()
    # Write arguments to input_queue:
    some_args = range(16)
    for arg in some_args:
        in_q.put(arg)
    # Now write POOL_SIZE "quit" messages:
    for _ in range(POOL_SIZE):
        in_q.put(None)
    # Wait for processes to terminate:
    for p in processes:
        p.join()

Prints (here with a POOL_SIZE of 8; the interleaving varies from run to run):

0
1
2
3
4
5
6
7
8
driver is now closed
9
driver is now closed
10
driver is now closed
11
driver is now closed
12
driver is now closed
14
13
driver is now closed
driver is now closed
15
driver is now closed
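
If results are needed after all, the same sentinel pattern should work with a second queue. This is a minimal sketch rather than a definitive implementation: the run helper, the (input, result, error) tuple layout and the squaring in do_something are hypothetical, chosen only to make the example self-contained:

```python
import multiprocessing


class WebDriver:
    """Stand-in for the real driver wrapper (hypothetical)."""

    def __enter__(self):
        self.driver = []  # this would be an actual driver
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.driver = None  # real close logic would go here

    def do_something(self, i):
        return i * i  # placeholder result


def worker(in_q, out_q):
    with WebDriver() as driver:
        # Stop on the None sentinel; the with block then closes the driver.
        for i in iter(in_q.get, None):
            try:
                out_q.put((i, driver.do_something(i), None))
            except Exception as e:
                # Report the failure instead of dropping the item.
                out_q.put((i, None, repr(e)))


def run(args, pool_size=2):
    args = list(args)
    in_q = multiprocessing.Queue()
    out_q = multiprocessing.Queue()
    processes = [multiprocessing.Process(target=worker, args=(in_q, out_q))
                 for _ in range(pool_size)]
    for p in processes:
        p.start()
    for arg in args:
        in_q.put(arg)
    # One "quit" message per worker:
    for _ in range(pool_size):
        in_q.put(None)
    # Read exactly one result per input before joining.
    results = [out_q.get() for _ in args]
    for p in processes:
        p.join()
    return sorted(results, key=lambda r: r[0])


if __name__ == '__main__':
    print(run(range(5)))
```

The results are read before the join() calls because a process that has put items on a queue may not exit until those items have been consumed.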
User contributions licensed under: CC BY-SA