Skip to content
Advertisement

Executing an awaitable / async function in Python RQ

My jobs are all a series of requests that need to be made per object. Ie, its a User with several data points (potentially hundreds) that need to be added to that user with requests. I had originally written those requests to run synchronously but it was blocking and slow. I was sending each User job to Python RQ and have 10 workers going through the Users sent down the queue. 1 worker, 1 user, blocking requests.

I’ve re-written my User job to use aiohttp instead of python requests, and its significantly faster. On the Python RQ documentation it says that ‘Any Python function call can be put on an RQ queue.’ but I can’t figure out how to send my async function down the queue?

async def get_prices(calls: List[dict]) -> List[dict]:
     async with aiohttp.ClientSession() as session:
         for price in prices.items():
                price_type, date = price
                price = await pg.get_price(
                    session=session, lookup_date=date
                )
        do_some_other_stuff()
        await session.close()

from core.extensions import test_queue
from prices import get_prices
job = test_queue.enqueue(get_prices, kwargs={"username":'username'})

The problem is that get_prices is never awaited, it just remains a coroutine futures…. How can I await my function on the queue?

Advertisement

Answer

Since python-rq won’t support asyncio directly, you can use a synchronous function that calls asyncio.run instead.

async def _get_prices(calls: List[dict]) -> List[dict]:
    # ...

def get_prices(*args, **kwargs):
    asyncio.run(_get_prices(*args, **kwargs))

Note, however, that asyncio.run only works if there’s no other running event loop. If you expect an asyncio loop to already be running, use loop.create_task instead.

def get_prices(*args, **kwargs):
    loop = asyncio.get_event_loop()
    coro = _get_prices(*args, **kwargs)
    loop.create_task(coro)

Then when python-rq calls get_prices it will cause the async function to be executed.

Another option would be to not use asyncio for making requests, like using grequests, threads, or something like that which will work with synchronous functions.

User contributions licensed under: CC BY-SA
7 People found this is helpful
Advertisement