I’m trying to understand how to use asyncio streams for multiple connections that will keep sending messages until a predefined condition or a socket timeout. Looking at Python docs, they provide the following example for a TCP server based on asyncio streams:

    import asyncio

    async def handle_echo(reader, writer):
        data = await reader.read(100)
        message = data.decode()
        addr = writer.get_extra_info('peername')

        print(f"Received {message!r} from {addr!r}")

        print(f"Send: {message!r}")
        writer.write(data)
        await writer.drain()

        print("Close the connection")
        writer.close()

    async def main():
        server = await asyncio.start_server(
            handle_echo, '127.0.0.1', 8888)

        addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
        print(f'Serving on {addrs}')

        async with server:
            await server.serve_forever()

    asyncio.run(main())

What I’m trying to do is more complex and it looks more like so (a lot of it is pseudocode, written in capital letters or with implementation omitted):

    import asyncio

    async def io_control(queue):
        while true:
            ...
            # do I/O control in this function
            ...

    async def data_processing(queue):
        while true:
            ...
            # perform data handling

    async def handle_data(reader, writer):
        data = await reader.read()
        message = data.decode()
        addr = writer.get_extra_info('peername')
        print(f"Received {message!r} from {addr!r}")

        #do stuff with a queue - pass messages to other two async functions as needed
        #keep open until something happens

        if(ERROR or SOCKET_TIMEOUT):
            writer.close()

    async def server(queue):
        server = await asyncio.start_server(
            handle_data, '127.0.0.1', 8888)

        addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
        print(f'Serving on {addrs}')

        async with server:
            await server.serve_forever()

    async def main():
        queue_io = asyncio.Queue()
        queue_data = asyncio.Queue()
        asyncio.run(server(queue_data))
        asyncio.run(data_handling(queue_data))
        asyncio.run(io_control(queue_io))

    asyncio.run(main())

Does this look feasible? I’m not used to working with co-routines (I’m coming from more of a multi-threading paradigm), so I’m not sure if what I’m doing is right or if I have to explicitly include yields or do any extra stuff.
Answer
If I understand correctly, you just need the TCP server to be able to handle multiple concurrent connections. The start_server function should already give you everything you need.
The first parameter, client_connected_cb, is a callback (here a coroutine function) that is called whenever a client establishes a connection. If you introduce a loop into that function (handle_data in your example code), you can keep the connection open until some criterion is met. Which conditions should close the connection is up to you, and the implementation details will depend on that. The simplest approach I can imagine is something like this:

    import asyncio
    import logging

    log = logging.getLogger(__name__)

    async def handle_data(reader, writer):
        # Runs once per connection; the loop keeps the connection open.
        while True:
            data = (await reader.readline()).decode().strip()
            if not data:
                # readline() returns b"" at EOF; note that a blank line
                # also ends the loop because of the strip() above.
                log.debug("client disconnected")
                break
            # your_data_processing_function is a placeholder for your own coroutine
            response = await your_data_processing_function(data)
            writer.write(response.encode())
            await writer.drain()
        ...

    async def main():
        server = await asyncio.start_server(handle_data, '127.0.0.1', 8888)
        async with server:
            await server.serve_forever()

    if __name__ == '__main__':
        asyncio.run(main())

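asyncio itself imposes no limit on the number of concurrent connections; in practice the ceiling comes from operating system resources such as the number of open file descriptors.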
If your client_connected_cb is a coroutine function, each new connection will schedule a new task for the event loop. That is where the concurrency comes from. The magic then happens at the point of awaiting new data from the client; that is where the event loop can switch execution to another coroutine. All this happens behind the scenes, so to speak.
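To make the task-per-connection behaviour concrete, here is a minimal sketch rather than a definitive implementation. It assumes a line-based protocol, an upper-casing handler in place of real processing, and port 0 so the OS picks a free port; the `client` and `demo` helpers are hypothetical names introduced only for this illustration.

```python
import asyncio


async def handle_data(reader, writer):
    # One task per connection: loop until the client closes its side.
    while True:
        line = await reader.readline()
        if not line:  # b"" signals EOF
            break
        writer.write(line.upper())  # stand-in for real processing
        await writer.drain()
    writer.close()
    await writer.wait_closed()


async def client(name, port, messages):
    # Hypothetical test client: sends each message and collects the reply.
    reader, writer = await asyncio.open_connection("127.0.0.1", port)
    replies = []
    for msg in messages:
        writer.write(f"{name}:{msg}\n".encode())
        await writer.drain()
        replies.append((await reader.readline()).decode().strip())
        await asyncio.sleep(0.01)  # yield so the other client can run
    writer.close()
    await writer.wait_closed()
    return replies


async def demo():
    # Port 0 asks the OS for any free port (an assumption for this demo).
    server = await asyncio.start_server(handle_data, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    async with server:
        # Both clients are connected at the same time.
        return await asyncio.gather(
            client("a", port, ["one", "two"]),
            client("b", port, ["three"]),
        )


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Run as a script, this prints `[['A:ONE', 'A:TWO'], ['B:THREE']]`: both connections are served by the same handler function, each in its own task, with no explicit yields beyond the awaits.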
If you want to introduce a timeout, you could, for example, wrap the awaitable readline coroutine in wait_for and then catch the TimeoutError to exit the loop.
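A sketch of that timeout idea might look like the following. The `IDLE_TIMEOUT` value, the plain echo behaviour, and the `demo` helper are assumptions made for illustration; the only library calls relied on are `asyncio.wait_for`, `asyncio.start_server`, and `asyncio.open_connection`.

```python
import asyncio

IDLE_TIMEOUT = 0.2  # seconds; hypothetical value chosen for the demo


async def handle_data(reader, writer):
    while True:
        try:
            # Give up if the client sends nothing within IDLE_TIMEOUT.
            line = await asyncio.wait_for(reader.readline(), timeout=IDLE_TIMEOUT)
        except asyncio.TimeoutError:
            break  # client was idle for too long
        if not line:  # b"" signals EOF
            break
        writer.write(line)  # echo back; stand-in for real processing
        await writer.drain()
    writer.close()
    await writer.wait_closed()


async def demo():
    # Port 0 asks the OS for any free port (an assumption for this demo).
    server = await asyncio.start_server(handle_data, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    async with server:
        reader, writer = await asyncio.open_connection("127.0.0.1", port)
        writer.write(b"ping\n")
        await writer.drain()
        echoed = await reader.readline()
        # Stay silent: the server times out and closes, so read() hits EOF.
        after_idle = await reader.read()
        writer.close()
        await writer.wait_closed()
        return echoed, after_idle


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Here the client receives its echo, then goes quiet; after the idle period the server closes the connection and the client's final read returns an empty bytes object. Catching `asyncio.TimeoutError` works across Python versions, since on 3.11 and later it is an alias of the built-in `TimeoutError`.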
Hope this helps.