
How to detect closed websocket in asyncio.gather(*tasks)

I have a list of asyncio tasks, each of which connects to a websocket, performs a handshake, and then receives data. This works correctly, but sometimes the connection of one of the websockets (or maybe all of them) is closed. How can I detect this and make a new connection to replace the closed one?

Here is the code which I use:

async def main(_id):
    try:
        async with websockets.connect("wss://ws.bitpin.ir/", extra_headers = request_header, timeout=10, ping_interval=None) as websocket:
            await websocket.send('{"method":"sub_to_price_info"}')
            recv_msg = await websocket.recv()
            if recv_msg == '{"message": "sub to price info"}':
                await websocket.send(json.dumps({"method":"sub_to_market","id":_id}))
                recv_msg = await websocket.recv()
                print(recv_msg)
                counter = 1 

                task = asyncio.create_task(ping_func(websocket))
                while True:
                    msg = await websocket.recv()
                    return_func(msg, counter, asyncio.current_task().get_name())  ## Return received message
                    counter+=1
    except Exception as e:
        err_log(name='Error in main function', text=str(e))


async def ping_func(websocket):
    try:
        while True:
            await websocket.send('{"message":"PING"}')
            # print('------ ping')
            await asyncio.sleep(5)
    except Exception as e:
        err_log(name='Error in ping function', text=str(e))


def return_func(msg, counter, task_name):
    if msg != '{"message": "PONG"}' and len(json.loads(msg)) <20:
        print(task_name, counter, msg[:100])
    else:
        print(task_name, counter)
 

async def handler():
    try:
        tasks = []
        for _id in symbols_id_dict.values():
            tasks.append(asyncio.create_task(main(_id), name='task{}'.format(_id)))
        responses = await asyncio.gather(*tasks)
    except Exception as e:
        err_log(name='Error in handler function', text=str(e))


try:
    if __name__ == '__main__':
        asyncio.run(handler())
    else:
        os._exit(0)
except Exception as e:
    err_log(name='Error in running asyncio handler', text=str(e))
finally:
    os._exit(0)

As the line below shows, each task is given a name:

tasks.append(asyncio.create_task(main(_id), name='task{}'.format(_id)))

So each task can be identified. How can I use this to detect a closed websocket?


Answer

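from websockets.exceptions import ConnectionClosed

try:
    data = await websocket.recv()
except ConnectionClosed:
    # recv() raises ConnectionClosed once the connection is closed
    print("Connection is closed")
    data = None
    print("Reconnecting")
    # params stands for the arguments of the original connect() call
    websocket = await websockets.connect(params)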
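
To apply this to every task passed to asyncio.gather, one option is to wrap each task's coroutine in a retry loop, so that a closed connection restarts only that task and the log line carries the task's name. The following is a minimal sketch under stated assumptions, not a drop-in implementation: `ConnectionLost` is a stand-in for `websockets.exceptions.ConnectionClosed`, and `run_with_reconnect` and `make_flaky_worker` are hypothetical names that simulate a dropping connection, so the example runs with the standard library only. In the question's code the worker would be something like `main(_id)`, which would have to let the exception propagate instead of swallowing it in `except Exception`.

```python
import asyncio


class ConnectionLost(Exception):
    """Stand-in for websockets.exceptions.ConnectionClosed (assumption)."""


async def run_with_reconnect(connect_and_listen, max_retries=5, delay=0.01):
    """Call connect_and_listen() again whenever it fails with ConnectionLost.

    Returns the number of reconnects performed before a clean exit.
    """
    name = asyncio.current_task().get_name()
    reconnects = 0
    while True:
        try:
            await connect_and_listen()
            return reconnects  # the worker finished without an error
        except ConnectionLost as exc:
            if reconnects >= max_retries:
                raise  # give up and let gather() see the failure
            reconnects += 1
            print(f"{name}: connection closed ({exc}), reconnect #{reconnects}")
            await asyncio.sleep(delay)


def make_flaky_worker(failures):
    """Hypothetical worker that drops the connection `failures` times."""
    state = {"left": failures}

    async def worker():
        await asyncio.sleep(0)  # pretend to receive some messages
        if state["left"] > 0:
            state["left"] -= 1
            raise ConnectionLost("simulated drop")

    return worker


async def demo():
    # One named task per simulated connection, as in the question's handler()
    tasks = [
        asyncio.create_task(run_with_reconnect(make_flaky_worker(n)), name=f"task{n}")
        for n in (0, 2)
    ]
    return await asyncio.gather(*tasks)


results = asyncio.run(demo())
print(results)
```

Because the retry loop lives inside each task, gather itself does not need to know which connection dropped; the task name from asyncio.current_task().get_name() is enough to tell the connections apart in the logs.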
User contributions licensed under: CC BY-SA