Use the same websocket connection in multiple asynchronous loops (Python)

I am running two loops asynchronously and want both to have access to the same websocket connection. One function, periodic_fetch(), fetches some data periodically (every 60 seconds) and sends a message to the websocket if a condition is met. The other, retrieve_websocket(), receives messages from the websocket and performs some action if a condition is met. Currently I connect to the websocket in both functions, which means each function has its own connection, so retrieve_websocket() never receives the response to the message sent by periodic_fetch(). How do I create one websocket connection and use it in both loops while they run asynchronously? My code:

# Imports
import asyncio
import websockets
from datetime import datetime

websocket_url = "wss://localhost:5000/"

# Simulate fetching some data
async def fetch_data():
    print("Fetching started")
    await asyncio.sleep(2)
    return {"data": 2}

# Receive and analyze websocket data
async def retrieve_websocket():
    async with websockets.connect(websocket_url) as ws:
        while True:
            msg = await ws.recv()
            print(msg)
            # Perform some task if condition is met

# Periodically fetch data and send messages to websocket
async def periodic_fetch():
    async with websockets.connect(websocket_url) as ws:
        while True:
            print(datetime.now())
            fetch_task = asyncio.create_task(fetch_data())
            wait_task = asyncio.create_task(asyncio.sleep(60))

            res = await fetch_task
            # Send message to websocket
            await ws.send("Websocket message")
            # Wait the remaining wait duration
            await wait_task

# Run both loops concurrently (passing bare coroutines to
# asyncio.wait() is rejected by Python 3.11 and later)
async def main():
    await asyncio.gather(periodic_fetch(), retrieve_websocket())

asyncio.run(main())

Answer

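The solution is to open the connection once in a separate coroutine and pass the resulting websocket to both functions as a parameter, running them together with asyncio.gather(). periodic_fetch() and retrieve_websocket() then take ws as an argument and drop their own websockets.connect() calls, so responses to messages sent by periodic_fetch() arrive on the same connection that retrieve_websocket() is reading.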

async def run_script():
    async with websockets.connect(websocket_url) as ws:
        await asyncio.gather(periodic_fetch(ws), retrieve_websocket(ws))


asyncio.run(run_script())
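
For completeness, here is a sketch of how the two functions from the question might look once they accept the shared connection instead of opening their own. The interval parameter is an addition for illustration (the question hard-codes 60 seconds), and the websockets import is placed inside run_script() only so the two loop functions can be tried with any object that has async send() and recv() methods; neither choice comes from the original answer.

```python
import asyncio
from datetime import datetime

websocket_url = "wss://localhost:5000/"

# Simulate fetching some data
async def fetch_data():
    print("Fetching started")
    await asyncio.sleep(2)
    return {"data": 2}

# Receive and analyze websocket data on the shared connection
async def retrieve_websocket(ws):
    while True:
        msg = await ws.recv()
        print(msg)
        # Perform some task if condition is met

# Periodically fetch data and send messages on the shared connection.
# `interval` is a hypothetical parameter; the question uses 60 seconds.
async def periodic_fetch(ws, interval=60):
    while True:
        print(datetime.now())
        fetch_task = asyncio.create_task(fetch_data())
        wait_task = asyncio.create_task(asyncio.sleep(interval))

        res = await fetch_task
        # Send message to websocket
        await ws.send("Websocket message")
        # Wait the remaining wait duration
        await wait_task

# Open one connection and hand it to both loops
async def run_script():
    import websockets  # third-party package, imported here by choice
    async with websockets.connect(websocket_url) as ws:
        await asyncio.gather(periodic_fetch(ws), retrieve_websocket(ws))

# Start everything with: asyncio.run(run_script())
```

Because asyncio.gather() propagates the first exception raised by either coroutine, a failure in one loop (for example, the connection closing) will surface from run_script() rather than leaving the other loop running silently on its own.
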
User contributions licensed under: CC BY-SA