
Asyncio stream process data with pipe between two subprocesses

Hello, I can’t find a solution for the example below. I found https://stackoverflow.com/a/36666420/6089311, but that answer does not read the output as a stream.

I’d like to run two subprocesses independently of the Python program (running in the background). The first subprocess feeds the second through a pipe, and I want to process the second subprocess’s stdout line by line as a stream.

The example below blocks, but I don’t know why:

import asyncio
import os

async def foo():
    read, write = os.pipe()
    process_1 = await asyncio.create_subprocess_exec('ls', stdout=write)
    process_2 = await asyncio.create_subprocess_exec('wc', stdin=read, stdout=asyncio.subprocess.PIPE)
    
    async for l in process_2.stdout:
        # streaming process data
        print(l)
    
    os.close(write)
    os.close(read)

await foo() # jupyter call
# asyncio.run(foo()) # python call


Answer

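The code works for me if I move the close() calls to the same places as in the answer you linked.

But the result is probably not what you expect: wc prints a single summary line only after it has read all of its input, so there is just one line to stream.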

import asyncio
import os

async def foo():
    read, write = os.pipe()
    
    process_1 = await asyncio.create_subprocess_exec('ls', stdout=write)
    os.close(write)  # close the parent's copy of the write end; ls keeps its own

    process_2 = await asyncio.create_subprocess_exec('wc', stdin=read, stdout=asyncio.subprocess.PIPE)
    os.close(read)
    
    async for line in process_2.stdout:
        # streaming process data
        print(line.decode())

#await foo() # jupyter call
asyncio.run(foo()) # python call
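
The reason the position of close() matters can be shown without any subprocesses. The sketch below is only an illustration, not part of the original answer: the function name is hypothetical, and os.dup() is used as a stand-in for the copy of the write end that the parent still holds after starting ls. A reader only sees end-of-file once every copy of the write end has been closed.

```python
import os

def read_until_eof_after_close() -> bytes:
    """Hypothetical helper: shows that EOF needs every write end closed."""
    read_fd, write_fd = os.pipe()

    # Stand-in for the parent's leftover copy of the write end.
    parent_copy = os.dup(write_fd)

    # Stand-in for the first subprocess writing its output and exiting.
    os.write(write_fd, b"hello\n")
    os.close(write_fd)

    # If parent_copy stayed open, reading to EOF here would block forever
    # after the first line, which is what happens in the question's code.
    os.close(parent_copy)

    with os.fdopen(read_fd, "rb") as reader:
        return reader.read()  # returns because no write end remains open

result = read_until_eof_after_close()
print(result)
```

In the question's code the parent keeps write open until after the loop, so wc never gets end-of-file on its stdin and never prints anything.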

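Alternatively, I can close read later, but I have to close write before the for loop. Otherwise wc never sees end-of-file on its stdin, because the parent still holds an open write end.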

import asyncio
import os

async def foo():
    read, write = os.pipe()
    
    process_1 = await asyncio.create_subprocess_exec('ls', stdout=write)
    process_2 = await asyncio.create_subprocess_exec('wc', stdin=read, stdout=asyncio.subprocess.PIPE)
    
    os.close(write)  # must happen before the loop, or wc never sees EOF

    async for line in process_2.stdout:
        # streaming process data
        print(line.decode())

    os.close(read)

#await foo() # jupyter call
asyncio.run(foo()) # python call
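
If the goal is to see lines as they arrive, the second command has to emit output line by line. The following is a rough sketch under that assumption, not the original poster's setup: PRODUCER and CONSUMER are hypothetical stand-ins (small Python one-liners instead of ls and wc) so that it runs on any machine with Python, and run_pipeline is an illustrative name. It also waits for both processes once the stream is exhausted.

```python
import asyncio
import os
import sys

# Hypothetical stand-ins for the two commands in the pipeline.
PRODUCER = [sys.executable, "-c", "print('a'); print('b'); print('c')"]
CONSUMER = [
    sys.executable,
    "-c",
    "import sys\nfor line in sys.stdin: print(line.strip().upper())",
]

async def run_pipeline():
    read_fd, write_fd = os.pipe()

    producer = await asyncio.create_subprocess_exec(*PRODUCER, stdout=write_fd)
    os.close(write_fd)  # parent's copy; the producer keeps its own

    consumer = await asyncio.create_subprocess_exec(
        *CONSUMER, stdin=read_fd, stdout=asyncio.subprocess.PIPE
    )
    os.close(read_fd)  # parent's copy; the consumer keeps its own

    lines = []
    async for raw in consumer.stdout:
        # Each line is handled as soon as the consumer emits it.
        lines.append(raw.decode().rstrip())

    # Reap both children so no zombie processes are left behind.
    await producer.wait()
    await consumer.wait()
    return lines

lines = asyncio.run(run_pipeline())
print(lines)
```

In Jupyter, `await run_pipeline()` would replace the asyncio.run() call, as in the examples above.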
User contributions licensed under: CC BY-SA