Skip to content
Advertisement

How to call synchronous function(s) from async functions in safe manner

What can occur if one or more workers call ‘Synchronous function’ simultaneously ? Maybe one or more workers become blocked for a while ?

async def worker(queue):
    while True:
        queue_out  = await queue.get()
        file_name = queue_out.file.name
        # Create path + file_name 
        destination_path = create_path(file_name) #<-- SYNC function

        await download_medical(queue_out,destination_path)

async def main():
    queue_in = asyncio.Queue(1)
    workers = [asyncio.create_task(worker(queue_in)) for _ in range(5)]
        
    async for result in get_result(building):
        await queue_in.put(result) 

def create_path(file_name):
    #....#
    #operations related to file and folder on the hdd
    #creates a folder based on file name

Advertisement

Answer

Short answer:

  • If you call a synchronous (blocking) function from within an async coroutine, all the tasks that are concurrently running in the loop will stall until this function returns.
  • Use loop.run_in_executor(...) to asynchronous run blocking functions in another thread or subprocess.
async def worker(queue):
    loop = Asyncio.get_event_loop()  # get a handle to the current run loop
    while True:
        queue_out  = await queue.get()
        file_name = queue_out.file.name
        
        # run blocking function in an executor
        create_path_task = loop.run_in_executor(None, create_path, file_name)
        destination_path = await create_path_task  # wait for this task to finish

        await download_medical(queue_out, destination_path)

Background: Note that async functions (coroutines) do not run tasks in parallel, they run concurrently which may appear to run simultaneously. The easiest way to think about this is by realising that every time await is called, i.e, while a result is being waited for, the event loop will pause the currently running coroutine and run another coroutine until that awaits on something and so on; hence making it cooperatively concurrent.

Awaits are usually made on IO operations as they are time consuming and are not cpu-intensive. CPU intensive operation will block the loop until it completes. Also note that regular IO operations are blocking in nature, if you want to benefit from concurrency then you must use Asyncio compatible libraries like aiofile, aiohttp etc.

More about executors: The easiest way to run regular sync functions without blocking the event loop is to use loop.run_in_executor. The first argument takes an executor like ThreadPoolExecutor or ProcessPoolExecutor from the concurrent.futures module. By passing None, Asyncio will automatically run your function in a default ThreadPoolExecutor. If your task is cpu intensive, use ProcessPoolExecutor so that it can use multiple cpu-cores and run truly in parallel.

User contributions licensed under: CC BY-SA
7 People found this is helpful
Advertisement