I have a list of asyncio tasks which contains with connecting,handshaking and receiving data from a websocket. This process is running correctly but sometimes the connection of one of the websocket (or maybe all of them) is closed. How can I detect and make a new conncetion to the closed one?
Here is the code which I use:
async def main(_id): try: async with websockets.connect("wss://ws.bitpin.ir/", extra_headers = request_header, timeout=10, ping_interval=None) as websocket: await websocket.send('{"method":"sub_to_price_info"}') recv_msg = await websocket.recv() if recv_msg == '{"message": "sub to price info"}': await websocket.send(json.dumps({"method":"sub_to_market","id":_id})) recv_msg = await websocket.recv() print(recv_msg) counter = 1 task = asyncio.create_task(ping_func(websocket)) while True: msg = await websocket.recv() return_func(msg, counter, asyncio.current_task().get_name()) ## Return recieved message counter+=1 except Exception as e: err_log(name='Error in main function', text=str(e)) async def ping_func(websocket): try: while True: await websocket.send('{"message":"PING"}') # print('------ ping') await asyncio.sleep(5) except Exception as e: err_log(name='Error in ping function', text=str(e)) def return_func(msg, counter, task_name): if msg != '{"message": "PONG"}' and len(json.loads(msg)) <20: print(task_name, counter, msg[:100]) else: print(task_name, counter) async def handler(): try: tasks = [] for _id in symbols_id_dict.values(): tasks.append(asyncio.create_task(main(_id), name='task{}'.format(_id))) responses = await asyncio.gather(*tasks) except Exception as e: err_log(name='Error in handler function', text=str(e)) try: if __name__ == '__main__': asyncio.run(handler()) else: os._exit(0) except Exception as e: err_log(name='Error in running asyncio handler', text=str(e)) finally: os._exit(0)
According to the below line, each task is specified with a name:
tasks.append(asyncio.create_task(main(_id), name='task{}'.format(_id)))
So each task can be detected. How can I use this feature to detect closed websocket.
Advertisement
Answer
try: data = await ws.recv() except (ConnectionClosed): print("Connection is Closed") data = None print('Reconnecting') websocket = await websockets.connect(params)