I have a list of asyncio tasks which contains with connecting,handshaking and receiving data from a websocket. This process is running correctly but sometimes the connection of one of the websocket (or maybe all of them) is closed. How can I detect and make a new conncetion to the closed one?
Here is the code which I use:
JavaScript
x
57
57
1
async def main(_id):
2
try:
3
async with websockets.connect("wss://ws.bitpin.ir/", extra_headers = request_header, timeout=10, ping_interval=None) as websocket:
4
await websocket.send('{"method":"sub_to_price_info"}')
5
recv_msg = await websocket.recv()
6
if recv_msg == '{"message": "sub to price info"}':
7
await websocket.send(json.dumps({"method":"sub_to_market","id":_id}))
8
recv_msg = await websocket.recv()
9
print(recv_msg)
10
counter = 1
11
12
task = asyncio.create_task(ping_func(websocket))
13
while True:
14
msg = await websocket.recv()
15
return_func(msg, counter, asyncio.current_task().get_name()) ## Return recieved message
16
counter+=1
17
except Exception as e:
18
err_log(name='Error in main function', text=str(e))
19
20
21
async def ping_func(websocket):
22
try:
23
while True:
24
await websocket.send('{"message":"PING"}')
25
# print('------ ping')
26
await asyncio.sleep(5)
27
except Exception as e:
28
err_log(name='Error in ping function', text=str(e))
29
30
31
def return_func(msg, counter, task_name):
32
if msg != '{"message": "PONG"}' and len(json.loads(msg)) <20:
33
print(task_name, counter, msg[:100])
34
else:
35
print(task_name, counter)
36
37
38
async def handler():
39
try:
40
tasks = []
41
for _id in symbols_id_dict.values():
42
tasks.append(asyncio.create_task(main(_id), name='task{}'.format(_id)))
43
responses = await asyncio.gather(*tasks)
44
except Exception as e:
45
err_log(name='Error in handler function', text=str(e))
46
47
48
try:
49
if __name__ == '__main__':
50
asyncio.run(handler())
51
else:
52
os._exit(0)
53
except Exception as e:
54
err_log(name='Error in running asyncio handler', text=str(e))
55
finally:
56
os._exit(0)
57
According to the below line, each task is specified with a name:
JavaScript
1
2
1
tasks.append(asyncio.create_task(main(_id), name='task{}'.format(_id)))
2
So each task can be detected. How can I use this feature to detect closed websocket.
Advertisement
Answer
JavaScript
1
8
1
try:
2
data = await ws.recv()
3
except (ConnectionClosed):
4
print("Connection is Closed")
5
data = None
6
print('Reconnecting')
7
websocket = await websockets.connect(params)
8