Azure ServiceBus using async/await in Python seems not to work

I’m trying to read messages from Azure ServiceBus Topics using async/await and then forward the content to another application via HTTP. My code is simple:

import asyncio
from aiohttp import ClientSession
from azure.servicebus.aio.async_client import ServiceBusService

bus_service = ServiceBusService(service_namespace=..., shared_access_key_name=..., shared_access_key_value=...)

async def watch(topic_name, subscription_name):
    print('{} started'.format(topic_name))

    message = bus_service.receive_subscription_message(topic_name, subscription_name, peek_lock=False, timeout=1)

    if message.body is not None:
        async with ClientSession() as session:
            await session.post('ip:port/endpoint',
                               headers={'Content-type': 'application/x-www-form-urlencoded'},
                               data={'data': message.body.decode()})


async def do():
    while True:
        for topic in ['topic1', 'topic2', 'topic3']:
            await watch(topic, 'watcher')


if __name__ == "__main__":
    asyncio.run(do())

I want to listen for messages from several topics indefinitely and send the POST whenever a message arrives. I import the aio package from azure, which should work asynchronously. After many attempts, the only solution I could get working is this one, with while True and timeout=1. This is not what I wanted: the topics are polled sequentially.

azure-servicebus version 0.50.3.

This is my first time using async/await, so I’m probably missing something…
Any solution/suggestions?

Answer

Here’s how you can do it with the latest major version (v7) of azure-servicebus. Please take a look at the async samples for sending and receiving subscription messages: https://github.com/Azure/azure-sdk-for-python/blob/04290863fa8963ec525a0b2f4079595287e15d93/sdk/servicebus/azure-servicebus/samples/async_samples/sample_code_servicebus_async.py

import os
import asyncio
from aiohttp import ClientSession
from azure.servicebus.aio import ServiceBusClient

# Connection string for the Service Bus namespace, read from the environment.
connstr = os.environ['SERVICE_BUS_CONNECTION_STR']

async def watch(servicebus_client, topic_name, subscription_name):
    subscription_receiver = servicebus_client.get_subscription_receiver(
        topic_name=topic_name,
        subscription_name=subscription_name,
    )
    # The receiver has to be used while the client is still open.
    async with subscription_receiver:
        # receive_messages() returns a list; it is empty if nothing arrives in time.
        messages = await subscription_receiver.receive_messages(max_wait_time=1)
        if messages:
            async with ClientSession() as session:
                for message in messages:
                    # str(message) gives the message body as text.
                    await session.post('ip:port/endpoint',
                                       headers={'Content-type': 'application/x-www-form-urlencoded'},
                                       data={'data': str(message)})
                    # The default mode is peek-lock, so settle the message once forwarded.
                    await subscription_receiver.complete_message(message)

async def do():
    # Open the client once and reuse it for every topic.
    async with ServiceBusClient.from_connection_string(conn_str=connstr) as servicebus_client:
        while True:
            for topic in ['topic1', 'topic2', 'topic3']:
                await watch(servicebus_client, topic, 'watcher')


if __name__ == "__main__":
    asyncio.run(do())
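
Note that a loop which awaits one topic after another still handles the topics sequentially, which is what the question wanted to avoid. One possible way around that is to start one long-running watcher task per topic and let the event loop interleave them. The sketch below only illustrates that pattern with the standard library: each asyncio.Queue is a stand-in for a subscription receiver, asyncio.sleep stands in for the awaited HTTP POST, and the None sentinel exists only so the demo terminates. All names here are hypothetical and not part of the azure-servicebus API.

```python
import asyncio


async def watch(topic_name, queue, forwarded):
    """Forward messages for one topic until the demo's stop sentinel arrives."""
    while True:
        # Awaiting here yields to the event loop, so other watchers keep running.
        body = await queue.get()
        if body is None:  # sentinel used only to end this demo
            break
        await asyncio.sleep(0.01)  # stand-in for the awaited HTTP POST
        forwarded.append((topic_name, body))


async def main():
    topics = ['topic1', 'topic2', 'topic3']
    # One queue per topic, standing in for one subscription receiver per topic.
    queues = {topic: asyncio.Queue() for topic in topics}
    forwarded = []

    # Start one watcher task per topic; they all share the same event loop.
    watchers = [
        asyncio.create_task(watch(topic, queues[topic], forwarded))
        for topic in topics
    ]

    # Simulate messages arriving on two of the topics, then stop every watcher.
    await queues['topic3'].put('c')
    await queues['topic1'].put('a')
    for queue in queues.values():
        await queue.put(None)

    # Wait for all watchers to finish.
    await asyncio.gather(*watchers)
    return forwarded


if __name__ == "__main__":
    print(asyncio.run(main()))
```

With the v7 client, each watcher would presumably hold its own subscription receiver and loop over it for as long as the program runs, instead of reading from a queue. This only helps if every slow step inside the watcher is actually awaited; a blocking call anywhere in it would stall all the watchers.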
User contributions licensed under: CC BY-SA