I’m using a separate process, started via concurrent.futures, to stream data. However, the other party sometimes stops the data feed on their side; as long as I restart this threadable function, it works again. So I designed something like this, to keep streaming data without manual intervention:
executor = concurrent.futures.ProcessPoolExecutor()
job2 = executor.submit(threadable, list_tmp_replace)
time.sleep(3600)
executor_tmp = executor
executor = concurrent.futures.ProcessPoolExecutor(1)
job2 = executor.submit(threadable, list_tmp_replace_2)
time.sleep(20)  # warm up the new process
executor_tmp.shutdown()  # avoid an unbounded number of pools in the long run; threadable
                         # also writes data to a database, so duplicate tasks are best avoided
However, I got this error:
File "/home/ubuntu/anaconda3/lib/python3.8/asyncio/tasks.py", line 280, in __step result = coro.send(None) File "/home/ubuntu/anaconda3/lib/python3.8/site-packages/cryptofeed/backends/postgres.py", line 61, in writer await self.write_batch(updates) File "/home/ubuntu/anaconda3/lib/python3.8/site-packages/cryptofeed/backends/postgres.py", line 75, in write_batch await self.conn.execute(f"INSERT INTO {self.table} VALUES {args_str}") File "/home/ubuntu/anaconda3/lib/python3.8/site-packages/asyncpg/connection.py", line 315, in execute return await self._protocol.query(query, timeout) File "asyncpg/protocol/protocol.pyx", line 338, in query File "/home/ubuntu/anaconda3/lib/python3.8/asyncio/futures.py", line 260, in __await__ yield self # This tells Task to wait for completion. File "/home/ubuntu/anaconda3/lib/python3.8/asyncio/tasks.py", line 349, in __wakeup future.result() File "/home/ubuntu/anaconda3/lib/python3.8/asyncio/futures.py", line 178, in result raise self._exception asyncpg.exceptions.DeadlockDetectedError: deadlock detected DETAIL: Process 2576028 waits for ShareLock on transaction 159343645; blocked by process 2545736. Process 2545736 waits for ShareLock on transaction 159343644; blocked by process 2576028. HINT: See server log for query details.
Previously, I manually closed the Python program (Ctrl+C) and restarted it from a terminal (using screen). But I want this to be automatic, controlled by the code itself, so that it automatically reconnects to the data feed. Is there any way for me to force the deadlocked process to shut down from within the same Python program?
Answer
Your code seems to suggest that it is okay to have two instances of threadable running concurrently, at least for some overlap period, and that you unconditionally want to start a new instance of threadable after 3600 seconds have elapsed. That’s all I can go on, and based on that my only suggestion is that you might consider switching to the multiprocessing.pool.Pool class as the multiprocessing pool. It has two advantages: (1) it is a different class from what you have been using and, if for no other reason than that, is likely to produce a different result, and (2) unlike the ProcessPoolExecutor.shutdown method, the Pool.terminate method will actually terminate running jobs immediately (ProcessPoolExecutor.shutdown will wait for jobs that have already started, i.e. pending futures, to complete, even if you had specified shutdown(wait=False), which you had not).
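To see that difference concretely, here is a minimal, runnable sketch. It is not taken from your code; long_task is just a stand-in for threadable. It starts a long-running worker in each kind of pool and then tries to stop it:

import time
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Pool

def long_task():
    # Stand-in for threadable: pretend to stream for 30 seconds.
    time.sleep(30)

if __name__ == '__main__':
    # multiprocessing.Pool: terminate() kills the running worker outright.
    pool = Pool(1)
    pool.apply_async(long_task)
    time.sleep(1)                  # give the worker time to start
    start = time.time()
    pool.terminate()
    pool.join()
    print(f'Pool.terminate returned after {time.time() - start:.1f}s')  # ~0s

    # ProcessPoolExecutor: shutdown(wait=False) returns immediately, but the
    # already-running task is NOT killed; the interpreter will still wait for
    # long_task to finish before the program can exit.
    executor = ProcessPoolExecutor(1)
    executor.submit(long_task)
    time.sleep(1)
    executor.shutdown(wait=False)
    print('shutdown returned; the worker keeps running for ~30 more seconds')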
The equivalent code that utilizes multiprocessing.pool.Pool would be:
from multiprocessing import Pool
...
# Only need a pool size of 1:
pool = Pool(1)
job2 = pool.apply_async(threadable, args=(list_tmp_replace,))
time.sleep(3600)
pool_tmp = pool
pool = Pool(1)
job2 = pool.apply_async(threadable, args=(list_tmp_replace_2,))
time.sleep(20)  # warm up the new process
pool_tmp.terminate()
pool_tmp.join()
But why are you even using a pool to run a single process? Consider using multiprocessing.Process instances instead:
from multiprocessing import Process
...
job2 = Process(target=threadable, args=(list_tmp_replace,))
job2.start()
time.sleep(3600)
job2_tmp = job2
job2 = Process(target=threadable, args=(list_tmp_replace_2,))
job2.start()
time.sleep(20)  # warm up the new process
job2_tmp.terminate()
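Since you want the restart to keep happening indefinitely without intervention, you could wrap that same pattern in a loop. This is only a sketch of the idea, not part of the snippet above; run_forever, RESTART_INTERVAL, and WARM_UP are illustrative names, and threadable and its argument list come from your code:

from multiprocessing import Process
import time

RESTART_INTERVAL = 3600   # seconds between restarts (illustrative)
WARM_UP = 20              # seconds of overlap while the new process warms up

def run_forever(threadable, feed_args):
    job = Process(target=threadable, args=(feed_args,))
    job.start()
    while True:
        time.sleep(RESTART_INTERVAL)
        old_job = job
        # Your original code alternates between list_tmp_replace and
        # list_tmp_replace_2 here; this sketch reuses the same argument list.
        job = Process(target=threadable, args=(feed_args,))
        job.start()
        time.sleep(WARM_UP)    # warm up the new process
        old_job.terminate()    # kill the old writer so database inserts
        old_job.join()         # aren't duplicated for long

run_forever(threadable, list_tmp_replace)

One caveat: terminate() kills the child abruptly, so if the old instance holds an open database transaction at that moment, PostgreSQL should abort it and release its locks once the connection drops, which is also what you want for clearing the deadlock.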