I am running a multiprocessing pool, mapped over a number of inputs. My worker processes have an initialization step that spins up a connection to Selenium and a database. When the pool finishes its job, what is the graceful way to close these connections, rather than just relying on Python's memory management and `__del__` definitions?
EDIT:
```python
import multiprocessing

class WebDriver():
    def close(self):
        # close logic
        ...

    def __del__(self):
        self.close()

def init():
    global DRIVER
    DRIVER = WebDriver()

def shutdown():
    DRIVER.close()

if __name__ == '__main__':
    # some_function and some_args are defined elsewhere
    with multiprocessing.Pool(initializer=init) as pool:
        pool.map(some_function, some_args)
```
Because `some_args` is large, I only want to call `shutdown` once the worker processes have no other jobs to do. I don't want to close and reopen connections to my database between jobs; they should stay open until everything is done.
As of right now, I would expect the memory manager to call `__del__` when the worker process shuts down, but I don't know whether that actually happens. I've run into strange scenarios where it hasn't been called. I'm hoping to better understand how to manage shutdown.
Answer
I think you have a good chance of closing your drivers if you first wait for your pool processes to terminate and then force a garbage collection:
```python
if __name__ == '__main__':
    with multiprocessing.Pool(initializer=init) as pool:
        try:
            pool.map(some_function, some_args)
        finally:
            # Wait for all tasks to complete and all processes to terminate:
            pool.close()
            pool.join()
            # Processes should be done now:
            import gc
            gc.collect()  # ensure garbage collection
```
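One caveat about the approach above: `gc.collect()` runs only in the parent process, so it cannot collect the `DRIVER` objects that live inside the workers. Also, with the `fork` start method a worker ends via `os._exit()`, which skips normal interpreter cleanup, so a module-level `DRIVER`'s `__del__` may never run. A minimal sketch that keeps the `Pool` but makes cleanup explicit, assuming the pool is shut down with `close()` and `join()`: register `DRIVER.close` with `multiprocessing.util.Finalize`, an undocumented CPython helper that `multiprocessing` uses internally (so treat it as subject to change). The `WebDriver` body, the marker files in `log_dir`, and `run()` are hypothetical, there only to make the sketch observable.

```python
import multiprocessing
import os
import tempfile
from multiprocessing.util import Finalize  # undocumented CPython helper

class WebDriver:
    """Hypothetical placeholder for the real Selenium/database wrapper."""

    def __init__(self, log_dir):
        self.log_dir = log_dir
        self.closed = False

    def close(self):
        if self.closed:  # make close() safe to call more than once
            return
        self.closed = True
        # Real close logic would go here; for the demo, leave a marker file
        # so the parent process can see that this worker closed its driver.
        marker = os.path.join(self.log_dir, f"closed-{os.getpid()}")
        with open(marker, "w") as f:
            f.write("closed\n")

def init(log_dir):
    global DRIVER
    DRIVER = WebDriver(log_dir)
    # Ask multiprocessing to call DRIVER.close() when this worker process
    # exits normally (with obj=None, an exitpriority is required).
    Finalize(None, DRIVER.close, exitpriority=10)

def some_function(i):
    # Placeholder task that would normally use DRIVER
    return i * i

def run(args, processes=2):
    with tempfile.TemporaryDirectory() as log_dir:
        with multiprocessing.Pool(processes, initializer=init,
                                  initargs=(log_dir,)) as pool:
            results = pool.map(some_function, args)
            # close() + join() lets workers exit normally so finalizers run;
            # leaving the with-block alone would call terminate() instead.
            pool.close()
            pool.join()
        closed = len(os.listdir(log_dir))  # one marker per closed driver
    return results, closed

if __name__ == "__main__":
    results, closed = run(range(10))
    print(results)
    print(f"{closed} drivers closed")
```

Because these finalizers only run when a worker exits normally, workers stopped by `pool.terminate()` (which is what leaving the `with` block does) skip them; the `close()`/`join()` pair is what the cleanup relies on.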
Solution With User-created Pool
```python
import multiprocessing
import time

class WebDriver():
    def close(self):
        ...
        print('driver is now closed')

    def do_something(self, i):
        time.sleep(.1)
        print(i, flush=True)

    def __enter__(self):
        self.driver = []  # this would be an actual driver
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

def some_function(i):
    # Do something with DRIVER:
    ...
    DRIVER.do_something(i)

def worker(in_q):
    global DRIVER
    with WebDriver() as DRIVER:
        # Iterate until we get special None record and then cleanup:
        for i in iter(in_q.get, None):
            try:
                some_function(i)
            except Exception:
                # Skip a failed task but keep the worker (and its driver) alive
                pass

if __name__ == '__main__':
    POOL_SIZE = multiprocessing.cpu_count()
    # Create pool:
    # Assumption is that we don't need an output queue for output
    in_q = multiprocessing.Queue()
    processes = [multiprocessing.Process(target=worker, args=(in_q,))
                 for _ in range(POOL_SIZE)
                ]
    for p in processes:
        p.start()
    # Write arguments to input_queue:
    some_args = range(16)
    for arg in some_args:
        in_q.put(arg)
    # Now write POOL_SIZE "quit" messages:
    for _ in range(POOL_SIZE):
        in_q.put(None)
    # Wait for processes to terminate:
    for p in processes:
        p.join()
```
Prints:
```
0
1
2
3
4
5
6
7
8
driver is now closed
9
driver is now closed
10
driver is now closed
11
driver is now closed
12
driver is now closed
14
13
driver is now closed
driver is now closed
15
driver is now closed
```
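The user-created pool assumes no output queue is needed. If results do have to come back, the same sentinel pattern can be extended with a second queue. Below is a minimal sketch in which `WebDriver` is a hypothetical placeholder (its `do_something` just squares its input) and `run()` and `out_q` are illustrative names. Each worker puts a final `None` on `out_q` after its `with` block exits, so the parent knows that worker has closed its driver. The parent drains `out_q` before calling `join()`, following the `multiprocessing` guideline that a process which has put items on a queue may not terminate until those items have been consumed.

```python
import multiprocessing

class WebDriver:
    """Hypothetical placeholder for the real Selenium/database wrapper."""

    def __enter__(self):
        self.driver = []  # this would be an actual driver
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def close(self):
        pass  # real close logic would go here

    def do_something(self, i):
        return i * i  # stand-in for real work

def worker(in_q, out_q):
    with WebDriver() as driver:
        # Consume tasks until the None sentinel arrives
        for i in iter(in_q.get, None):
            try:
                out_q.put((i, driver.do_something(i)))
            except Exception as e:
                out_q.put((i, repr(e)))  # report the failure instead of dropping it
    # The with-block has exited, so the driver is closed; tell the parent
    out_q.put(None)

def run(args, pool_size=2):
    in_q = multiprocessing.Queue()
    out_q = multiprocessing.Queue()
    processes = [multiprocessing.Process(target=worker, args=(in_q, out_q))
                 for _ in range(pool_size)]
    for p in processes:
        p.start()
    for arg in args:
        in_q.put(arg)
    for _ in range(pool_size):
        in_q.put(None)  # one "quit" message per worker

    # Drain out_q before join(): a worker with unflushed queue data may not exit
    results = {}
    finished = 0
    while finished < pool_size:
        item = out_q.get()
        if item is None:
            finished += 1
        else:
            i, value = item
            results[i] = value
    for p in processes:
        p.join()
    return results, finished

if __name__ == "__main__":
    results, finished = run(range(16), pool_size=4)
    print(results)
    print(f"{finished} workers closed their drivers")
```

Since each worker's results and its final `None` travel through the same queue in order, the parent has every result by the time it has counted `pool_size` sentinels.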