I want to execute tasks asynchronously and concurrently. If task1 is running when task2 arrives, task2 should start right away, without waiting for task1 to complete. I would also like to avoid callbacks by using coroutines.
Here’s a concurrent solution with callbacks:

    from concurrent.futures import ThreadPoolExecutor


    def fibonacci(n):
        if n <= 1:
            return 1
        return fibonacci(n - 1) + fibonacci(n - 2)


    class FibonacciCalculatorFuture:

        def __init__(self):
            self.pool = ThreadPoolExecutor(max_workers=2)

        @staticmethod
        def calculate(n):
            print(f"started n={n}")
            return fibonacci(n)

        def run(self, n):
            # submit() returns immediately; the callback prints the result later
            future = self.pool.submit(self.calculate, n)
            future.add_done_callback(lambda f: print(f.result()))


    if __name__ == '__main__':
        calculator = FibonacciCalculatorFuture()
        calculator.run(35)
        calculator.run(32)
        print("initial thread can continue its work")

Its output:

    started n=35
    started n=32
    initial thread can continue its work
    3524578
    14930352

And here’s my effort to get rid of callbacks:

    import asyncio
    from concurrent.futures import ThreadPoolExecutor

    # fibonacci() is the same function as in the previous snippet


    class FibonacciCalculatorAsync:

        def __init__(self):
            self.pool = ThreadPoolExecutor(max_workers=2)
            self.loop = asyncio.get_event_loop()

        @staticmethod
        def calculate_sync(n):
            print(f"started n={n}")
            return fibonacci(n)

        async def calculate(self, n):
            result = await self.loop.run_in_executor(self.pool, self.calculate_sync, n)
            print(result)

        def run(self, n):
            asyncio.ensure_future(self.calculate(n))


    if __name__ == '__main__':
        calculator = FibonacciCalculatorAsync()
        calculator.run(35)
        calculator.run(32)
        calculator.loop.run_forever()
        # never reached: run_forever() blocks the initial thread
        print("initial thread can continue its work")

Output:

    started n=35
    started n=32
    3524578
    14930352

In this case the initial thread cannot get past loop.run_forever() and hence cannot accept new tasks.
So, here’s my question: is there a way to simultaneously:
- execute tasks concurrently;
- be able to accept new tasks and schedule them for execution right away (along with already running tasks);
- use coroutines and code without callbacks.
Answer
The second bullet from your question can be met by running asyncio in a dedicated thread and using asyncio.run_coroutine_threadsafe to schedule coroutines. For example:

    import asyncio
    import threading
    import time
    from concurrent.futures import ThreadPoolExecutor

    # fibonacci() is the same function as in the question


    class FibonacciCalculatorAsync:
        def __init__(self):
            self.pool = ThreadPoolExecutor(max_workers=2)
            # a dedicated loop, run in a background thread by start_loop()
            self.loop = asyncio.new_event_loop()

        @staticmethod
        def calculate_sync(n):
            print(f"started n={n}")
            return fibonacci(n)

        async def calculate(self, n):
            result = await self.loop.run_in_executor(self.pool, self.calculate_sync, n)
            print(result)

        def run(self, n):
            # safe to call from any thread; schedules the coroutine on self.loop
            asyncio.run_coroutine_threadsafe(self.calculate(n), self.loop)

        def start_loop(self):
            thr = threading.Thread(target=self.loop.run_forever)
            thr.daemon = True
            thr.start()


    if __name__ == '__main__':
        calculator = FibonacciCalculatorAsync()
        calculator.start_loop()
        calculator.run(35)
        calculator.run(32)
        print("initial thread can continue its work")
        calculator.run(10)
        time.sleep(1)  # keep the main thread alive briefly while the tasks run

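If the initial thread also needs the computed values rather than just printed output, one option is to keep the object that asyncio.run_coroutine_threadsafe returns. It is a concurrent.futures.Future, so the caller can block on result() only at the point where the value is actually needed. The following is a minimal sketch under that assumption, not part of the original answer; the class name FibonacciCalculatorWithResults and the stop() helper are hypothetical names introduced for illustration.

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor


def fibonacci(n):
    if n <= 1:
        return 1
    return fibonacci(n - 1) + fibonacci(n - 2)


class FibonacciCalculatorWithResults:  # hypothetical name, for illustration only
    def __init__(self):
        self.pool = ThreadPoolExecutor(max_workers=2)
        self.loop = asyncio.new_event_loop()
        # run the event loop in a background daemon thread
        threading.Thread(target=self.loop.run_forever, daemon=True).start()

    async def calculate(self, n):
        # offload the CPU-bound call to the pool and return its value
        return await self.loop.run_in_executor(self.pool, fibonacci, n)

    def run(self, n):
        # returns a concurrent.futures.Future that the caller can wait on
        return asyncio.run_coroutine_threadsafe(self.calculate(n), self.loop)

    def stop(self):  # hypothetical helper: stop the loop, then the pool
        self.loop.call_soon_threadsafe(self.loop.stop)
        self.pool.shutdown()


if __name__ == '__main__':
    calculator = FibonacciCalculatorWithResults()
    futures = [calculator.run(n) for n in (20, 15, 10)]
    print("initial thread can continue its work")
    for future in futures:
        # blocks only until this particular task has finished
        print(future.result(timeout=10))
    calculator.stop()
```

Waiting on the futures this way also removes the need for a fixed time.sleep at the end of the program, since the main thread leaves only after every result has arrived.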