I followed the tutorial in https://docs.python.org/3/library/asyncio-stream.html to read data from a sensor (TCP protocol, PC as the server):
import datetime
import asyncio
import socket
async def connect_to_sq():
_ip = "0.0.0.0"
_port = 45454
# Create a TCP server (socket type: SOCK_STREAM)
server = await asyncio.start_server(handle_server, _ip, _port,family=socket.AF_INET, reuse_address=True)
async with server:
await server.serve_forever()
async def handle_server(reader, writer):
# read data
print("start reading")
init_time = datetime.datetime.now()
end_time = init_time + datetime.timedelta(seconds=1)
while datetime.datetime.now() < end_time:
raw_data_stream = await reader.readexactly(200)
print("time:", datetime.datetime.now()-init_time)
# close connection
writer.close()
await writer.wait_closed()
print("Connection closed")
if __name__ == "__main__":
asyncio.run(connect_to_sq())
The program is supposed to finish after 1 second data transmission. However, the output is:
start reading time: 0:00:00.495863 time: 0:00:00.594812 time: 0:00:00.695760 time: 0:00:00.794883 time: 0:00:00.895336 time: 0:00:00.995024 time: 0:00:01.095308 Connection closed start reading time: 0:00:00.647908 time: 0:00:00.750355 time: 0:00:00.848436 ......
It repeated automatically and infinitely. What is the reason for this and How could I solve it?
Advertisement
Answer
Problem
serve_forever() listens to the port indefinitely. So, even though the individual connections close after 1 second, the server keeps accepting new connections. In this case, your sensor (client) seems to be creating a new connection after an old connection is closed, and since the server still accepts them, handle_server runs again from the top.
Solution
Maybe not the best way™, but one possible solution is to use a Future so that handle_server can signal the main code upon connection completion. The main code can then stop the server, avoiding new connections. This is how it can be done:
import datetime
import asyncio
import socket
from functools import partial
async def connect_to_sq():
_ip = "0.0.0.0"
_port = 45454
# Create a TCP server (socket type: SOCK_STREAM)
first_connection_completion = asyncio.Future()
server = await asyncio.start_server(
partial(handle_server, first_connection_completion=first_connection_completion),
_ip,
_port,family=socket.AF_INET,
reuse_address=True)
async with server:
server_task = asyncio.create_task(server.serve_forever())
await first_connection_completion
server_task.cancel()
async def handle_server(reader, writer, first_connection_completion=None):
# read data
print("start reading")
init_time = datetime.datetime.now()
end_time = init_time + datetime.timedelta(seconds=1)
while datetime.datetime.now() < end_time:
raw_data_stream = await reader.readexactly(200)
print("time:", datetime.datetime.now()-init_time)
# close connection
writer.close()
await writer.wait_closed()
print("Connection closed")
first_connection_completion.set_result(None)
if __name__ == "__main__":
asyncio.run(connect_to_sq())
Couple of notes:
handle_servernow takes one extra argument,first_connection_completionwhich is the future used to send signal from the function to the main code. The argument has been binded to the function usingfunctools.partial. Within the function,set_resulthas been used to mark the future as completed.serve_forever()is now wrapped by acreate_taskcall. This is because we can’tawaiton it.
The new code looks messy, but you can always refactor. So a cleaner version would be:
import datetime
import asyncio
import socket
async def listen_once(handler, *server_args, **server_kwargs):
first_connection_completion = asyncio.Future()
async def wrapped_handler(*args):
await handler(*args)
first_connection_completion.set_result(None)
server = await asyncio.start_server(wrapped_handler, *server_args, **server_kwargs)
async with server:
server_task = asyncio.create_task(server.serve_forever())
await first_connection_completion
server_task.cancel()
async def connect_to_sq():
_ip = "0.0.0.0"
_port = 45454
# Create a TCP server (socket type: SOCK_STREAM)
await listen_once(handle_server, _ip, _port, family=socket.AF_INET, reuse_address=True)
async def handle_server(reader, writer):
# read data
print("start reading")
init_time = datetime.datetime.now()
end_time = init_time + datetime.timedelta(seconds=1)
while datetime.datetime.now() < end_time:
raw_data_stream = await reader.readexactly(200)
print("time:", datetime.datetime.now()-init_time)
# close connection
writer.close()
await writer.wait_closed()
print("Connection closed")
if __name__ == "__main__":
asyncio.run(connect_to_sq())