My jobs are all a series of requests that need to be made per object. I.e., each is a User with several data points (potentially hundreds) that need to be added to that user via requests. I had originally written those requests to run synchronously, but it was blocking and slow. I was sending each User job to Python RQ and had 10 workers going through the Users sent down the queue: 1 worker, 1 user, blocking requests.

I've rewritten my User job to use aiohttp instead of python requests, and it's significantly faster. The Python RQ documentation says that "Any Python function call can be put on an RQ queue," but I can't figure out how to send my async function down the queue.
import aiohttp
from typing import List

async def get_prices(calls: List[dict]) -> List[dict]:
    async with aiohttp.ClientSession() as session:
        for price in prices.items():
            price_type, date = price
            price = await pg.get_price(
                session=session,
                lookup_date=date
            )
        do_some_other_stuff()
        await session.close()
from core.extensions import test_queue
from prices import get_prices

job = test_queue.enqueue(get_prices, kwargs={"username": "username"})
The problem is that get_prices is never awaited; it just remains a coroutine object. How can I await my function on the queue?
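To illustrate: calling the coroutine function without awaiting it just gives back a coroutine object, and the requests inside it never run (a minimal sketch, not my real code):

async def get_prices(calls):
    ...  # the aiohttp code from above

result = get_prices([])   # no await, so nothing actually executes
print(type(result))       # <class 'coroutine'>; Python later warns it was never awaited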
Answer
Since python-rq doesn't support asyncio directly, you can use a synchronous function that calls asyncio.run instead.
import asyncio

async def _get_prices(calls: List[dict]) -> List[dict]:
    ...  # the async implementation from the question

def get_prices(*args, **kwargs):
    return asyncio.run(_get_prices(*args, **kwargs))
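The enqueue call from the question then stays the same, except that it now points at the synchronous wrapper (the module and queue names below are just the ones from the question):

from core.extensions import test_queue
from prices import get_prices  # the synchronous wrapper, not the coroutine function

# The worker calls get_prices(...), which runs the coroutine to completion
# via asyncio.run and returns its result as the job result.
job = test_queue.enqueue(get_prices, kwargs={"username": "username"})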
Note, however, that asyncio.run only works if there is no other running event loop. If you expect an asyncio loop to already be running, use loop.create_task instead.
def get_prices(*args, **kwargs):
    loop = asyncio.get_event_loop()
    coro = _get_prices(*args, **kwargs)
    loop.create_task(coro)
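If you'd rather have a single wrapper that covers both cases, one possible sketch (not part of the original pattern above, just one way to combine them) is to check for a running loop first:

import asyncio

def get_prices(*args, **kwargs):
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running (the usual case inside an RQ worker):
        # run the coroutine to completion and return its result.
        return asyncio.run(_get_prices(*args, **kwargs))
    # A loop is already running in this thread: schedule the coroutine on it.
    return loop.create_task(_get_prices(*args, **kwargs))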
Then, when python-rq calls get_prices, the async function will be executed.
Another option would be to not use asyncio for making requests at all, e.g. by using grequests, threads, or something similar that works with synchronous functions.
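As a rough sketch of the thread-based variant (fetch_price, the URL, and the dates argument here are made-up stand-ins for whatever pg.get_price actually does), the job stays a plain synchronous function that RQ can enqueue directly, while the HTTP calls still overlap:

from concurrent.futures import ThreadPoolExecutor

import requests

def fetch_price(lookup_date):
    # Stand-in for the real price lookup; replace with the actual request.
    resp = requests.get("https://example.com/prices", params={"date": lookup_date})
    resp.raise_for_status()
    return resp.json()

def get_prices(dates):
    # get_prices itself is synchronous, so it can be enqueued with RQ as-is;
    # the thread pool overlaps the individual HTTP requests.
    with ThreadPoolExecutor(max_workers=10) as pool:
        return list(pool.map(fetch_price, dates))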