How to force close ProcessPoolExecutor even when there is a deadlock

I’m trying to use a separate process to stream data via concurrent.futures. Sometimes the other party stops the data feed, but as long as I restart my threadable function it works again. So I designed something like the following to keep streaming data without manual intervention:

import concurrent.futures
import time

executor = concurrent.futures.ProcessPoolExecutor()
job2 = executor.submit(threadable, list_tmp_replace)
time.sleep(3600)
executor_tmp = executor
executor = concurrent.futures.ProcessPoolExecutor(1)
job2 = executor.submit(threadable, list_tmp_replace_2)
time.sleep(20)  # warm up the new process
executor_tmp.shutdown()  # avoid an ever-growing number of pools; threadable also writes to the database, so duplicate tasks are best avoided

However, I got this error:

File "/home/ubuntu/anaconda3/lib/python3.8/asyncio/tasks.py", line 280, in __step
  result = coro.send(None)
File "/home/ubuntu/anaconda3/lib/python3.8/site-packages/cryptofeed/backends/postgres.py", line 61, in writer
  await self.write_batch(updates)
File "/home/ubuntu/anaconda3/lib/python3.8/site-packages/cryptofeed/backends/postgres.py", line 75, in write_batch
  await self.conn.execute(f"INSERT INTO {self.table} VALUES {args_str}")
File "/home/ubuntu/anaconda3/lib/python3.8/site-packages/asyncpg/connection.py", line 315, in execute
  return await self._protocol.query(query, timeout)
File "asyncpg/protocol/protocol.pyx", line 338, in query
File "/home/ubuntu/anaconda3/lib/python3.8/asyncio/futures.py", line 260, in __await__
  yield self  # This tells Task to wait for completion.
File "/home/ubuntu/anaconda3/lib/python3.8/asyncio/tasks.py", line 349, in __wakeup
  future.result()
File "/home/ubuntu/anaconda3/lib/python3.8/asyncio/futures.py", line 178, in result
  raise self._exception
asyncpg.exceptions.DeadlockDetectedError: deadlock detected
DETAIL:  Process 2576028 waits for ShareLock on transaction 159343645; blocked by process 2545736.
Process 2545736 waits for ShareLock on transaction 159343644; blocked by process 2576028.
HINT:  See server log for query details.

Previously, I manually closed the Python program (Ctrl-C) and restarted it from a terminal (using screen). But I want this to be automatic, controlled by the code itself, so that it reconnects to the data feed on its own. Is there any way to force the deadlocked process to shut down from within the same Python program?

Answer

Your code suggests that it is okay to have two instances of threadable running concurrently, at least for some overlap period, and that you unconditionally want to start a new instance of threadable after 3600 seconds have elapsed. That’s all I can go on, and based on that my only suggestion is to switch to the multiprocessing.pool.Pool class as the multiprocessing pool. It has two advantages: (1) it is a different class from the one you have been using, and for that reason alone may produce a different result, and (2) unlike the ProcessPoolExecutor.shutdown method, the Pool.terminate method actually terminates running jobs immediately (ProcessPoolExecutor.shutdown waits for jobs that have already started to complete, even if you specify shutdown(wait=False), which you had not).

The equivalent code that utilizes multiprocessing.pool.Pool would be:

from multiprocessing import Pool
...

# Only need a pool size of 1:
pool = Pool(1)
job2 = pool.apply_async(threadable, args=(list_tmp_replace,))
time.sleep(3600)
pool_tmp = pool
pool = Pool(1)
job2 = pool.apply_async(threadable, args=(list_tmp_replace_2,))
time.sleep(20)  # warm up the new process
pool_tmp.terminate()  # kills the old job immediately, even if it is hung
pool_tmp.join()

But why are you even using a pool to run a single process? Consider using multiprocessing.Process instances:

from multiprocessing import Process
...

# No pool needed; run threadable in its own process:
job2 = Process(target=threadable, args=(list_tmp_replace,))
job2.start()
time.sleep(3600)
job2_tmp = job2
job2 = Process(target=threadable, args=(list_tmp_replace_2,))
job2.start()
time.sleep(20)  # warm up the new process
job2_tmp.terminate()  # forcibly ends the old process, deadlocked or not
job2_tmp.join()  # reap the terminated process
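
If you want this to run unattended rather than as a one-shot sequence, the same swap can be wrapped in a loop. Below is a minimal sketch of that idea; keep_streaming, feed_args, restart_every, and warm_up are hypothetical names, and it assumes, for simplicity, that each restart can reuse the same argument list (your code passes a different list_tmp_replace each time) and that the brief overlap between old and new instances is acceptable, as in your original code:

from multiprocessing import Process
import time

def keep_streaming(threadable, feed_args, restart_every=3600, warm_up=20):
    # Start the first instance of threadable.
    job = Process(target=threadable, args=(feed_args,))
    job.start()
    while True:
        time.sleep(restart_every)
        old_job = job
        # Start a replacement before killing the old one so the feed never stops.
        job = Process(target=threadable, args=(feed_args,))
        job.start()
        time.sleep(warm_up)         # let the new process warm up
        old_job.terminate()         # works even if the old process is deadlocked
        old_job.join()              # reap the terminated process

As with your original code, the two instances overlap for warm_up seconds, so threadable still needs to tolerate (or deduplicate) concurrent database writes during that window.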