Send / receive in parallel using websockets in Python FastAPI

I'll explain what I'm trying to do with an example. Say I am building a weather client. The browser sends a message over a websocket, e.g.:

{
  "city": "Chicago",
  "country": "US"
}

The server queries the weather every 5 minutes and sends the latest data back to the browser.

Now the browser could send another message, e.g.:

{
  "city": "Bangalore",
  "country": "IN"
}

Now the server should STOP updating the weather details for Chicago and start updating the details for Bangalore, i.e. it needs to send and receive messages over the websocket simultaneously. How should I go about implementing this?

Currently I have this, but it only updates the browser when it receives a message:

@app.websocket("/ws")
async def read_websocket(websocket: WebSocket):
    await websocket.accept()
    weather_client = WeatherClient(client)
    while True:
        data = await websocket.receive_json()
        weather = await weather_client.weather(data)
        await websocket.send_json(weather.dict())

If I move websocket.receive_json() outside the loop, I won’t be able to keep listening for messages from the browser. I guess I need to spin up two asyncio tasks, but I can’t quite nail down the implementation since I am new to the asynchronous way of programming.

Answer

The simplest way to do this is, as you mentioned, to move the reading out of the loop and into a separate task. With this approach the reader task updates a shared local variable with the latest data, so your code looks something like this:

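import asyncio

from fastapi import FastAPI, WebSocket

app = FastAPI()


@app.websocket("/ws")
async def read_websocket(websocket: WebSocket):
    await websocket.accept()
    json_data = await websocket.receive_json()

    async def read_from_socket(websocket: WebSocket):
        nonlocal json_data
        # iter_json() stops iterating when the client disconnects
        async for data in websocket.iter_json():
            json_data = data

    # keep a reference so the task is not garbage-collected while it runs
    reader = asyncio.create_task(read_from_socket(websocket))
    # stop polling once the reader has finished, i.e. the client is gone
    while not reader.done():
        print(f"getting weather data for {json_data}")
        await asyncio.sleep(1)  # simulate a slow call to the weather service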

Note that I’ve used the iter_json asynchronous generator, which amounts to a loop over receive_json that ends when the client disconnects.

This works, but it may not match your requirements. Imagine that the weather service takes 10 seconds to respond and in that time the user sends three requests for different cities over the socket. With the code above you’ll only see the last city the user sent. That might be fine for your application, but if you need to keep track of everything the user sent, you’ll need a queue. With this approach one task reads data and puts it on the queue, and another task gets data from the queue and queries the weather service. You then run both concurrently with gather:

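@app.websocket("/wsqueue")
async def read_websocket(websocket: WebSocket):
    await websocket.accept()
    queue = asyncio.Queue()

    async def read_from_socket(websocket: WebSocket):
        # producer: every message from the browser goes on the queue
        async for data in websocket.iter_json():
            print(f"putting {data} in the queue")
            queue.put_nowait(data)

    async def get_data_and_send():
        # consumer: wait for the first request, then keep serving the newest one
        data = await queue.get()
        while True:
            if queue.empty():
                print(f"getting weather data for {data}")
                await asyncio.sleep(1)  # simulate a slow call to the weather service
            else:
                data = queue.get_nowait()
                print(f"Setting data to {data}")

    # note: get_data_and_send() never returns, so this gather() keeps running
    # after the client disconnects; cancel the consumer in real code
    await asyncio.gather(read_from_socket(websocket), get_data_and_send())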

This way you won’t lose any data the user sends. The example above only fetches weather data for the latest request, but every message passes through the queue, so you still have access to everything that was sent.
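
To make "access to everything that was sent" concrete, here is a minimal sketch in plain asyncio, without FastAPI. The helper drain_latest and the history list are hypothetical names that do not appear in the code above. The idea is to empty the queue without blocking, record each request, and hand back only the newest one.

```python
import asyncio


def drain_latest(queue, history):
    """Empty the queue without waiting.

    Every item taken off the queue is appended to history; the newest
    item is returned, or None if the queue was already empty.
    """
    latest = None
    while True:
        try:
            latest = queue.get_nowait()
        except asyncio.QueueEmpty:
            return latest
        history.append(latest)


async def demo():
    queue = asyncio.Queue()
    history = []
    # pretend the browser sent three requests during one slow weather call
    for city in ("Chicago", "Bangalore", "Lisbon"):
        queue.put_nowait({"city": city})
    current = drain_latest(queue, history)
    return current, history


if __name__ == "__main__":
    current, history = asyncio.run(demo())
    print(f"fetch weather for {current}, requests seen so far: {history}")
```

Inside get_data_and_send, a call like this could stand in for the single get_nowait() so that a burst of requests is consumed in one step while the full list stays available.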

EDIT: To answer your question in the comments, the queue approach is probably the best way to cancel tasks when new requests come in. Move the long-running work you want to be able to cancel into its own coroutine function (read_and_send_to_client in this example) and run it as a task. When new data comes in and that task has not finished, cancel it, then create a new one.

async def read_and_send_to_client(data):
    print(f"reading {data} from client")
    await asyncio.sleep(10)  # simulate a slow call
    print(f"finished reading {data}, sending to websocket client")


@app.websocket("/wsqueue")
async def read_websocket(websocket: WebSocket):
    await websocket.accept()
    queue = asyncio.Queue()

    async def read_from_socket(websocket: WebSocket):
        async for data in websocket.iter_json():
            print(f"putting {data} in the queue")
            queue.put_nowait(data)

    async def get_data_and_send():
        data = await queue.get()
        fetch_task = asyncio.create_task(read_and_send_to_client(data))
        while True:
            # block until the next request arrives
            data = await queue.get()
            if not fetch_task.done():
                print("Got new data while task not complete, canceling.")
                fetch_task.cancel()
            fetch_task = asyncio.create_task(read_and_send_to_client(data))

    await asyncio.gather(read_from_socket(websocket), get_data_and_send())
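
The same cancel-and-replace idea can be applied to the periodic updates from the question. The following is only a sketch in plain asyncio under a few assumptions: push_updates, serve, the send callback (a stand-in for websocket.send_json) and the None "client disconnected" sentinel are all hypothetical, and the 5-minute interval is shortened so the demo finishes quickly.

```python
import asyncio
import contextlib


async def push_updates(city, send, interval=0.01):
    """Keep sending (fake) weather for one city until the task is cancelled."""
    while True:
        await send(f"weather for {city}")  # stand-in for websocket.send_json(...)
        await asyncio.sleep(interval)      # would be 5 minutes in the question


async def serve(requests, send):
    """Run one updater at a time; each new request replaces the current one."""
    updater = None
    try:
        while True:
            city = await requests.get()
            if city is None:  # hypothetical sentinel: the client disconnected
                break
            if updater is not None:
                updater.cancel()
                # wait until the old updater has really stopped before starting
                # the next one, so updates for two cities never interleave
                with contextlib.suppress(asyncio.CancelledError):
                    await updater
            updater = asyncio.create_task(push_updates(city, send))
    finally:
        # do not leave an updater running once the connection is gone
        if updater is not None:
            updater.cancel()


async def demo():
    sent = []

    async def send(message):
        sent.append(message)

    requests = asyncio.Queue()
    server = asyncio.create_task(serve(requests, send))
    requests.put_nowait("Chicago")
    await asyncio.sleep(0.05)
    requests.put_nowait("Bangalore")  # replaces the Chicago updater
    await asyncio.sleep(0.05)
    requests.put_nowait(None)         # simulate a disconnect
    await server
    return sent


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In the endpoint, read_from_socket would feed the requests queue and put the sentinel on it once iter_json finishes, which also lets gather return when the client disconnects.
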
User contributions licensed under: CC BY-SA