
Keep indefinite connections open on TCP server with asyncio streams?

I’m trying to understand how to use asyncio streams for multiple connections that will keep sending messages until a predefined condition or a socket timeout occurs. The Python docs provide the following example of a TCP server based on asyncio streams:

import asyncio

async def handle_echo(reader, writer):
    data = await reader.read(100)
    message = data.decode()
    addr = writer.get_extra_info('peername')

    print(f"Received {message!r} from {addr!r}")

    print(f"Send: {message!r}")
    writer.write(data)
    await writer.drain()

    print("Close the connection")
    writer.close()

async def main():
    server = await asyncio.start_server(
        handle_echo, '127.0.0.1', 8888)

    addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
    print(f'Serving on {addrs}')

    async with server:
        await server.serve_forever()

asyncio.run(main())

What I’m trying to do is more complex and looks more like this (a lot of it is pseudocode, written in capital letters or with the implementation omitted):

import asyncio

async def io_control(queue):
    while True:
        ...  # do I/O control in this function

async def data_processing(queue):
    while True:
        ...  # perform data handling

async def handle_data(reader, writer):
    data = await reader.read()
    message = data.decode()
    addr = writer.get_extra_info('peername')

    print(f"Received {message!r} from {addr!r}")
    
    #do stuff with a queue - pass messages to other two async functions as needed        

    #keep open until something happens
    if ERROR or SOCKET_TIMEOUT:
        writer.close()

async def server(queue):
    server = await asyncio.start_server(
        handle_data, '127.0.0.1', 8888)

    addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
    print(f'Serving on {addrs}')

    async with server:
        await server.serve_forever()

async def main():
    queue_io = asyncio.Queue()
    queue_data = asyncio.Queue()

    asyncio.run(server(queue_data))
    asyncio.run(data_processing(queue_data))
    asyncio.run(io_control(queue_io))

asyncio.run(main())

Does this look feasible? I’m not used to working with coroutines (I’m coming from more of a multi-threading paradigm), so I’m not sure if what I’m doing is right or whether I have to explicitly include yields or do any extra work.


Answer

If I understand correctly, you just need the TCP server to be able to handle multiple concurrent connections. The start_server function should already give you everything you need.

The first parameter, client_connected_cb, is a coroutine function that is called whenever a client establishes a connection. If you introduce a loop into that function (handle_data in your example), you can keep the connection open until some criterion is met. Exactly which conditions should close the connection is up to you, and the implementation details will obviously depend on that. The simplest approach I can imagine is something like this:

import asyncio
import logging

log = logging.getLogger(__name__)

async def handle_data(reader, writer):
    while True:
        # readline() returns b'' on EOF; after strip() an empty line ends the session too
        data = (await reader.readline()).decode().strip()
        if not data:
            log.debug("client disconnected")
            break
        response = await your_data_processing_function(data)
        writer.write(response.encode())
        await writer.drain()
    # close our end once the loop exits
    writer.close()
    await writer.wait_closed()

...

async def main():
    server = await asyncio.start_server(handle_data, '127.0.0.1', 8888)
    async with server:
        await server.serve_forever()

if __name__ == '__main__':
    asyncio.run(main())

There is, in theory, no limit to the number of concurrent connections.

If your client_connected_cb is a coroutine function, each new connection will schedule a new task for the event loop. That is where the concurrency comes from. The magic then happens at the point of awaiting new data from the client; that is where the event loop can switch execution to another coroutine. All this happens behind the scenes, so to speak.
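To see that concurrency in action, here is a self-contained sketch: it starts an echo-style server on an ephemeral port and runs three clients against it at once. The names, the messages, and the line-based framing are just for illustration and are not part of the original example:

```python
import asyncio

async def handle(reader, writer):
    # Echo each line back until the client closes its end of the connection.
    while True:
        line = await reader.readline()
        if not line:  # b'' means EOF
            break
        writer.write(line)
        await writer.drain()
    writer.close()
    await writer.wait_closed()

async def client(name, port, results):
    reader, writer = await asyncio.open_connection('127.0.0.1', port)
    writer.write(f"hello from {name}\n".encode())
    await writer.drain()
    reply = await reader.readline()
    results.append(reply.decode().strip())
    writer.close()
    await writer.wait_closed()

async def main():
    # Port 0 lets the OS pick a free port (illustrative only).
    server = await asyncio.start_server(handle, '127.0.0.1', 0)
    port = server.sockets[0].getsockname()[1]
    results = []
    async with server:
        # Three client coroutines talk to the one server concurrently;
        # each new connection schedules a new handle() task on the event loop.
        await asyncio.gather(*(client(n, port, results) for n in ('a', 'b', 'c')))
    return sorted(results)

all_replies = asyncio.run(main())
print(all_replies)  # ['hello from a', 'hello from b', 'hello from c']
```

All three clients are served by the same single-threaded event loop; execution switches between handler tasks at each await.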

If you want to introduce a timeout, you could, for example, wrap the readline coroutine in asyncio.wait_for and catch the resulting TimeoutError to exit the loop.
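For illustration, here is one way that could look. The one-second idle timeout and the hang-up-on-timeout policy are assumptions for this sketch, not part of the original example; the demo at the bottom connects a silent client and shows the server closing the connection:

```python
import asyncio

IDLE_TIMEOUT = 1.0  # seconds; an assumed value, tune it to your protocol

async def handle_data(reader, writer):
    while True:
        try:
            # Give the client at most IDLE_TIMEOUT seconds to send a line.
            data = await asyncio.wait_for(reader.readline(), timeout=IDLE_TIMEOUT)
        except asyncio.TimeoutError:
            break  # client was idle too long: drop the connection
        if not data:
            break  # EOF: the client closed its end
        writer.write(data)  # echo back, as in the example above
        await writer.drain()
    writer.close()
    await writer.wait_closed()

async def demo():
    server = await asyncio.start_server(handle_data, '127.0.0.1', 0)
    port = server.sockets[0].getsockname()[1]
    async with server:
        reader, writer = await asyncio.open_connection('127.0.0.1', port)
        # Send nothing; the server should hang up after IDLE_TIMEOUT.
        data = await reader.read()  # returns b'' once the server closes
        writer.close()
        await writer.wait_closed()
        return data

closed = asyncio.run(demo())
print(closed)  # b''
```

The same pattern works with read() instead of readline() if your protocol is not line-based.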

Hope this helps.
