I am running a multiprocessing pool, mapped over a number of inputs. My worker processes have an initialization step that spins up a connection to Selenium and a database. When the pool finishes its job, what is the graceful way to close these connections, rather than just relying on Python's memory management and __del__ definitions?
EDIT:
import multiprocessing

class WebDriver():
    def close(self):
        ...  # close logic

    def __del__(self):
        self.driver.close()

def init():
    global DRIVER
    DRIVER = WebDriver()

def shutdown():
    DRIVER.close()

if __name__ == '__main__':
    with multiprocessing.Pool(initializer=init) as pool:
        pool.map(some_function, some_args)
Because some_args is large, I only want to call shutdown when the worker processes have no other jobs to do. I don’t want to close / reopen connections to my database until everything is done.
As of right now, I would expect the memory manager to call __del__ if the worker process shuts down, but I don't know whether that actually happens. I've seen strange cases where it wasn't called. I'm hoping to better understand how to manage shutdown.
Answer
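Leaving the with block calls pool.terminate(), which stops the worker processes immediately. I think you have a better chance of closing your drivers if you first call pool.close() and pool.join(), so that the workers finish their tasks and exit on their own, and then force a garbage collection: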
if __name__ == '__main__':
    with multiprocessing.Pool(initializer=init) as pool:
        try:
            pool.map(some_function, some_args)
        finally:
            # Wait for all tasks to complete and all processes to terminate:
            pool.close()
            pool.join()
            # Processes should be done now:
            import gc
            gc.collect()  # ensure garbage collection
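If you would rather stay with multiprocessing.Pool and not depend on __del__ at all, one option is to register the cleanup from inside the initializer. What follows is only a minimal sketch under some assumptions: FakeDriver is a hypothetical stand-in for the real driver, and multiprocessing.util.Finalize is an undocumented helper in the standard library, so its behavior may differ between Python versions. The callback should run when a worker exits on its own (after close() and join()), but not when the worker is killed by terminate().

```python
import multiprocessing
from multiprocessing import util


class FakeDriver:
    """Hypothetical stand-in for a Selenium driver or database connection."""

    def __init__(self):
        self.closed = False

    def close(self):
        if not self.closed:
            self.closed = True
            print('driver is now closed', flush=True)


def init():
    """Pool initializer: open the connection and register its cleanup."""
    global DRIVER
    DRIVER = FakeDriver()
    # Assumption: Finalize is undocumented. With obj=None and an integer
    # exitpriority, the callback is run when this process exits normally.
    util.Finalize(None, DRIVER.close, exitpriority=10)
    # The pool ignores this return value; it only helps when calling
    # init() directly, for example in a test.
    return DRIVER


def some_function(i):
    # A real task would use the per-process global DRIVER here.
    return i * 2


def main():
    with multiprocessing.Pool(processes=2, initializer=init) as pool:
        results = pool.map(some_function, range(8))
        # Let the workers exit on their own so their finalizers can run
        # before the with block calls terminate().
        pool.close()
        pool.join()
    return results


if __name__ == '__main__':
    print(main())
```

The design choice here is to tie the cleanup to the worker's normal exit path instead of to garbage collection, so each connection stays open for the whole run and is closed once per worker.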
Solution With User-created Pool
import multiprocessing
import time

class WebDriver():
    def close(self):
        print('driver is now closed')

    def do_something(self, i):
        time.sleep(.1)
        print(i, flush=True)

    def __enter__(self):
        self.driver = []  # this would be an actual driver
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

def some_function(i):
    # Do something with DRIVER:
    DRIVER.do_something(i)

def worker(in_q):
    global DRIVER
    with WebDriver() as DRIVER:
        # Iterate until we get special None record and then cleanup:
        for i in iter(in_q.get, None):
            try:
                some_function(i)
            except BaseException as e:
                pass

if __name__ == '__main__':
    POOL_SIZE = multiprocessing.cpu_count()
    # Create pool:
    # Assumption is that we don't need an output queue for output
    in_q = multiprocessing.Queue()
    processes = [multiprocessing.Process(target=worker, args=(in_q,))
                 for _ in range(POOL_SIZE)
                 ]
    for p in processes:
        p.start()
    # Write arguments to input_queue:
    some_args = range(16)
    for arg in some_args:
        in_q.put(arg)
    # Now write POOL_SIZE "quit" messages:
    for _ in range(POOL_SIZE):
        in_q.put(None)
    # Wait for processes to terminate:
    for p in processes:
        p.join()
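Prints (the exact interleaving varies from run to run, and there is one "driver is now closed" line per worker process):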
0
1
2
3
4
5
6
7
8
driver is now closed
9
driver is now closed
10
driver is now closed
11
driver is now closed
12
driver is now closed
14
13
driver is now closed
driver is now closed
15
driver is now closed
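The code above assumes no output queue is needed. If you do need the results back, a second queue can carry them. The following is a rough sketch of that variation, not a definitive implementation: out_q, run() and the (argument, result, error) tuple layout are names and conventions introduced here for illustration, and do_something() just squares its input as a placeholder for real work.

```python
import multiprocessing


class WebDriver:
    """Stand-in for the real driver, as in the example above."""

    def __enter__(self):
        self.driver = []  # this would be an actual driver
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def close(self):
        print('driver is now closed', flush=True)

    def do_something(self, i):
        return i * i  # placeholder for real work


def worker(in_q, out_q):
    with WebDriver() as driver:
        # Stop at the None sentinel; __exit__ then closes the driver.
        for i in iter(in_q.get, None):
            try:
                out_q.put((i, driver.do_something(i), None))
            except Exception as e:
                # Report the failure instead of silently dropping it.
                out_q.put((i, None, repr(e)))


def run(args, pool_size=2):
    in_q = multiprocessing.Queue()
    out_q = multiprocessing.Queue()
    processes = [multiprocessing.Process(target=worker, args=(in_q, out_q))
                 for _ in range(pool_size)]
    for p in processes:
        p.start()
    for arg in args:
        in_q.put(arg)
    for _ in range(pool_size):
        in_q.put(None)  # one "quit" message per worker
    # Read every result before joining, so that no worker is left
    # blocked while flushing data into the output queue.
    results = [out_q.get() for _ in range(len(args))]
    for p in processes:
        p.join()
    return sorted(results, key=lambda r: r[0])


if __name__ == '__main__':
    print(run(list(range(6))))
```

Reading the results before calling join() matters: a process that has put items on a multiprocessing.Queue may not exit until that data has been consumed, so joining first can deadlock.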