Non-blocking launching of concurrent coroutines in Python

I want to execute tasks asynchronously and concurrently. If task1 is running when task2 arrives, task2 should start right away, without waiting for task1 to complete. I would also like to use coroutines to avoid callbacks.

Here’s a concurrent solution with callbacks:

from concurrent.futures import ThreadPoolExecutor


def fibonacci(n):
    if n <= 1:
        return 1
    return fibonacci(n - 1) + fibonacci(n - 2)


class FibonacciCalculatorFuture:

    def __init__(self):
        self.pool = ThreadPoolExecutor(max_workers=2)

    @staticmethod
    def calculate(n):
        print(f"started n={n}")
        return fibonacci(n)

    def run(self, n):
        future = self.pool.submit(self.calculate, n)
        future.add_done_callback(lambda f: print(f.result()))


if __name__ == '__main__':
    calculator = FibonacciCalculatorFuture()
    calculator.run(35)
    calculator.run(32)
    print("initial thread can continue its work")

Its output:

started n=35
started n=32
initial thread can continue its work
3524578
14930352

And here’s my effort to get rid of callbacks:

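import asyncio
from concurrent.futures import ThreadPoolExecutor

# fibonacci() is the same function as in the first snippet


class FibonacciCalculatorAsync: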

    def __init__(self):
        self.pool = ThreadPoolExecutor(max_workers=2)
        self.loop = asyncio.get_event_loop()

    @staticmethod
    def calculate_sync(n):
        print(f"started n={n}")
        return fibonacci(n)

    async def calculate(self, n):
        result = await self.loop.run_in_executor(self.pool, self.calculate_sync, n)
        print(result)

    def run(self, n):
        asyncio.ensure_future(self.calculate(n))


if __name__ == '__main__':
    calculator = FibonacciCalculatorAsync()
    calculator.run(35)
    calculator.run(32)
    calculator.loop.run_forever()
    print("initial thread can continue its work")

Output:

started n=35
started n=32
3524578
14930352

In this case the initial thread can't get past loop.run_forever() and hence can't accept new tasks.
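
To make the blocking concrete, here is a minimal sketch. The names demo_run_forever_blocks, stop_loop and events are hypothetical and used only for illustration. It assumes nothing beyond the standard asyncio API: run_forever() returns only after something calls loop.stop(), so code placed after it cannot run in the meantime.

```python
import asyncio


def demo_run_forever_blocks():
    # Hypothetical helper: records the order in which things happen.
    loop = asyncio.new_event_loop()
    events = []

    def stop_loop():
        events.append("loop stopped")
        loop.stop()

    # Ask the loop to stop itself shortly after it starts running.
    loop.call_later(0.1, stop_loop)

    events.append("before run_forever")
    loop.run_forever()  # the calling thread is stuck here until stop_loop() runs
    events.append("after run_forever")

    loop.close()
    return events


if __name__ == '__main__':
    print(demo_run_forever_blocks())
```

Without the scheduled stop_loop() call, the line after run_forever() would never be reached, which is the situation described above.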

So, here’s my question: is there a way to simultaneously:

  • execute tasks concurrently;
  • be able to accept new tasks and schedule them for execution right away (along with already running tasks);
  • use coroutines and code without callbacks.

Answer

The second requirement in your question can be met by running the asyncio event loop in a dedicated thread and scheduling coroutines onto it with asyncio.run_coroutine_threadsafe. For example:

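import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor

# fibonacci() is the same function as in the question


class FibonacciCalculatorAsync: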
    def __init__(self):
        self.pool = ThreadPoolExecutor(max_workers=2)
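        # new_event_loop() avoids relying on an implicit current loop, which
        # recent Python versions no longer create for get_event_loop()
        self.loop = asyncio.new_event_loop()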

    @staticmethod
    def calculate_sync(n):
        print(f"started n={n}")
        return fibonacci(n)

    async def calculate(self, n):
        result = await self.loop.run_in_executor(self.pool, self.calculate_sync, n)
        print(result)

    def run(self, n):
        asyncio.run_coroutine_threadsafe(self.calculate(n), self.loop)

    def start_loop(self):
        thr = threading.Thread(target=self.loop.run_forever)
        thr.daemon = True
        thr.start()


if __name__ == '__main__':
    calculator = FibonacciCalculatorAsync()
    calculator.start_loop()
    calculator.run(35)
    calculator.run(32)
    print("initial thread can continue its work")
    calculator.run(10)
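    # Give the background loop some time before the main thread exits;
    # a real program would carry on with its own work here.
    time.sleep(1)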
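
If the initial thread also needs the results back, one option is to keep the object returned by asyncio.run_coroutine_threadsafe: it is a concurrent.futures.Future, so the calling thread can wait on it with result() when it chooses to. The following is only a sketch under that assumption, not part of the original answer. The class name BackgroundCalculator and the methods submit and shutdown are hypothetical, and the coroutine returns its value instead of printing it.

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor


def fibonacci(n):
    # Same definition as in the question.
    if n <= 1:
        return 1
    return fibonacci(n - 1) + fibonacci(n - 2)


class BackgroundCalculator:  # hypothetical name, for illustration only
    def __init__(self):
        self.pool = ThreadPoolExecutor(max_workers=2)
        self.loop = asyncio.new_event_loop()
        # The event loop runs in its own daemon thread.
        self.thread = threading.Thread(target=self.loop.run_forever, daemon=True)
        self.thread.start()

    async def calculate(self, n):
        # Runs on the loop thread; the CPU-bound work is handed to the pool.
        return await self.loop.run_in_executor(self.pool, fibonacci, n)

    def submit(self, n):
        # Safe to call from any thread; returns a concurrent.futures.Future.
        return asyncio.run_coroutine_threadsafe(self.calculate(n), self.loop)

    def shutdown(self):
        # stop() has to be scheduled onto the loop's own thread.
        self.loop.call_soon_threadsafe(self.loop.stop)
        self.thread.join()
        self.loop.close()
        self.pool.shutdown()


if __name__ == '__main__':
    calculator = BackgroundCalculator()
    futures = [calculator.submit(n) for n in (20, 15, 10)]
    print("initial thread can continue its work")
    for future in futures:
        # Blocks only at the point where the result is actually needed.
        print(future.result(timeout=10))
    calculator.shutdown()
```

Collecting results through the returned futures also removes the need for time.sleep, and the explicit shutdown stops the loop cleanly instead of relying on the daemon thread being killed at exit.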
User contributions licensed under: CC BY-SA