I want to execute tasks asynchronously and concurrently. If task1 is running when task2 arrives, task2 is started right away, without waiting for task1 to complete. Also, I would like to avoid callbacks with the help of coroutines.
Here’s a concurrent solution with callbacks:
    from concurrent.futures import ThreadPoolExecutor


    def fibonacci(n):
        # Naive recursion, deliberately slow to simulate a long-running task
        if n <= 1:
            return 1
        return fibonacci(n - 1) + fibonacci(n - 2)


    class FibonacciCalculatorFuture:
        def __init__(self):
            self.pool = ThreadPoolExecutor(max_workers=2)

        @staticmethod
        def calculate(n):
            print(f"started n={n}")
            return fibonacci(n)

        def run(self, n):
            # Submit to the pool and print the result from a done-callback
            future = self.pool.submit(self.calculate, n)
            future.add_done_callback(lambda f: print(f.result()))


    if __name__ == '__main__':
        calculator = FibonacciCalculatorFuture()
        calculator.run(35)
        calculator.run(32)
        print("initial thread can continue its work")
Its output:
    started n=35
    started n=32
    initial thread can continue its work
    3524578
    14930352
And here’s my effort to get rid of callbacks:
    import asyncio
    from concurrent.futures import ThreadPoolExecutor

    # fibonacci(n) is the same function as in the first snippet


    class FibonacciCalculatorAsync:
        def __init__(self):
            self.pool = ThreadPoolExecutor(max_workers=2)
            self.loop = asyncio.get_event_loop()

        @staticmethod
        def calculate_sync(n):
            print(f"started n={n}")
            return fibonacci(n)

        async def calculate(self, n):
            # Await the pool's result instead of attaching a callback
            result = await self.loop.run_in_executor(self.pool, self.calculate_sync, n)
            print(result)

        def run(self, n):
            asyncio.ensure_future(self.calculate(n))


    if __name__ == '__main__':
        calculator = FibonacciCalculatorAsync()
        calculator.run(35)
        calculator.run(32)
        calculator.loop.run_forever()  # blocks the initial thread here
        print("initial thread can continue its work")
Output:
    started n=35
    started n=32
    3524578
    14930352
In this case the initial thread can’t get past loop.run_forever() and hence can’t accept new tasks.
So, here’s my question: is there a way to simultaneously:
- execute tasks concurrently;
- be able to accept new tasks and schedule them for execution right away (along with already running tasks);
- use coroutines and code without callbacks.
Answer
The second bullet from your question can be met by running the asyncio event loop in a dedicated thread and scheduling coroutines on it with asyncio.run_coroutine_threadsafe. For example:
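    import asyncio
    import threading
    import time
    from concurrent.futures import ThreadPoolExecutor

    # fibonacci(n) is the same function as in the question


    class FibonacciCalculatorAsync:
        def __init__(self):
            self.pool = ThreadPoolExecutor(max_workers=2)
            # new_event_loop() instead of get_event_loop(): newer Python versions
            # deprecate implicit loop creation, and the loop is always passed explicitly here
            self.loop = asyncio.new_event_loop()

        @staticmethod
        def calculate_sync(n):
            print(f"started n={n}")
            return fibonacci(n)

        async def calculate(self, n):
            result = await self.loop.run_in_executor(self.pool, self.calculate_sync, n)
            print(result)

        def run(self, n):
            # Thread-safe way to schedule a coroutine on a loop running in another thread
            asyncio.run_coroutine_threadsafe(self.calculate(n), self.loop)

        def start_loop(self):
            # Run the event loop in a daemon thread so the initial thread stays free
            thr = threading.Thread(target=self.loop.run_forever)
            thr.daemon = True
            thr.start()


    if __name__ == '__main__':
        calculator = FibonacciCalculatorAsync()
        calculator.start_loop()
        calculator.run(35)
        calculator.run(32)
        print("initial thread can continue its work")
        calculator.run(10)
        time.sleep(1)  # crude pause before the main thread exits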
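The code above prints results inside the coroutine. If the initial thread needs the values themselves, one possible approach is to keep the concurrent.futures.Future that asyncio.run_coroutine_threadsafe returns and call result() on it when the value is needed. The sketch below is only an illustration under that assumption: the LoopThread class and its submit and stop methods are hypothetical names, not part of asyncio or of the original answer.

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor


def fibonacci(n):
    # Same definition as in the question: fibonacci(0) == fibonacci(1) == 1
    if n <= 1:
        return 1
    return fibonacci(n - 1) + fibonacci(n - 2)


class LoopThread:
    """Hypothetical helper: owns an event loop that runs in a background thread."""

    def __init__(self, max_workers=2):
        self.pool = ThreadPoolExecutor(max_workers=max_workers)
        self.loop = asyncio.new_event_loop()
        self.thread = threading.Thread(target=self.loop.run_forever, daemon=True)
        self.thread.start()

    async def calculate(self, n):
        # Offload the blocking call to the pool and return its value
        return await self.loop.run_in_executor(self.pool, fibonacci, n)

    def submit(self, n):
        # Returns a concurrent.futures.Future that any thread can wait on
        return asyncio.run_coroutine_threadsafe(self.calculate(n), self.loop)

    def stop(self):
        # Ask the loop to stop from outside its thread, then release resources
        self.loop.call_soon_threadsafe(self.loop.stop)
        self.thread.join()
        self.loop.close()
        self.pool.shutdown()


if __name__ == "__main__":
    runner = LoopThread()
    futures = [runner.submit(n) for n in (20, 15, 10)]
    print("initial thread can continue its work")
    # result() blocks only at the point where the value is actually needed
    print([f.result() for f in futures])
    runner.stop()
```

Calling stop() explicitly replaces the time.sleep(1) pause: the initial thread waits on the futures it cares about and then shuts the loop down, rather than guessing how long the tasks will take.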