Skip to content
Advertisement

Listen for Microsoft.Storage.BlobCreated event using python

I need to monitor Azure Blob Storage and check if a blob is created in Python. In Logic Apps, this is how it looks:

Can you help me out and tell me how I should write this in Python?

Advertisement

Answer

  • One of the workarounds is to use Azure functions Blob trigger for python where you can able to retrieve the details for the blob once it has been added to the storage.

  • For checking if the blob is created/exists in blob storage you can check one of my answers from this SO Thread

import logging
import azure.functions as func from azure.storage.blob import
BlockBlobService


def main(req: func.HttpRequest) -> func.HttpResponse:
    logging.info('Python HTTP trigger function processed a request.')
    
    ContainerName="<YOUR_CONTAINER_NAME>";
    AccountName="<YOUR_ACCOUNT_NAME>";
    AccountKey="<ACCESS_KEY>";

    block_blob_service = BlockBlobService(account_name=AccountName, account_key=AccountKey)
    
    name = req.params.get('name')
    if not name:
        try:
            req_body = req.get_json()
        except ValueError:
           return func.HttpResponse(f"Enter the blob you are searching for")
        else:
            name = req_body.get('name')

    if block_blob_service.exists(container_name=ContainerName, blob_name=name):
        return func.HttpResponse(f"{name}, Is already Present in container")
    else:
        return func.HttpResponse(
             "{name}, Is not Present in container",
             status_code=200
        ) ```

REFERENCES:

  1. Azure Blob storage trigger for Azure Functions – MSFT Docs
  2. Manage blobs with Python – MSFT Docs
Advertisement