I would like to call Durable Azure Functions from Azure Data Factory. I want to POST JSON to the Function and get its status once processing has completed. My ultimate goal is to run a Function that takes 10 minutes without hitting a timeout.

I have already successfully executed an Azure Function Activity from ADF with the GET method.

Now I need advice on modifying the orchestrator's Python code to accept JSON and use its values to filter which result set is processed, e.g. {"Country": "Japan"}.
The current code base is from this tutorial: https://learn.microsoft.com/en-us/azure/azure-functions/durable/quickstart-python-vscode

I'm following the Durable Functions instructions for ADF from here: http://datanrg.blogspot.com/2020/10/using-durable-functions-in-azure-data.html
# This function is an HTTP starter function for Durable Functions.
# Before running this sample, please:
# - create a Durable orchestration function
# - create a Durable activity function (default name is "Hello")
# - add azure-functions-durable to requirements.txt
# - run pip install -r requirements.txt

import logging

import azure.functions as func
import azure.durable_functions as df


async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
    client = df.DurableOrchestrationClient(starter)
    instance_id = await client.start_new(req.route_params["functionName"], None, None)
    logging.info(f"Started orchestration with ID = '{instance_id}'.")
    return client.create_check_status_response(req, instance_id)


# This function is not intended to be invoked directly. Instead it will be
# triggered by an HTTP starter function.
# Before running this sample, please:
# - create a Durable activity function (default name is "Hello")
# - create a Durable HTTP starter function
# - add azure-functions-durable to requirements.txt
# - run pip install -r requirements.txt

import logging
import json

import azure.functions as func
import azure.durable_functions as df


def orchestrator_function(context: df.DurableOrchestrationContext):
    result1 = yield context.call_activity('Hello', "Tokyo")
    result2 = yield context.call_activity('Hello', "Seattle")
    result3 = yield context.call_activity('Hello', "London")
    return [result1, result2, result3]


main = df.Orchestrator.create(orchestrator_function)


# This function is not intended to be invoked directly. Instead it will be
# triggered by an orchestrator function.
# Before running this sample, please:
# - create a Durable orchestration function
# - create a Durable HTTP starter function
# - add azure-functions-durable to requirements.txt
# - run pip install -r requirements.txt

import logging


def main(name: str) -> str:
    return f"Hello {name}!"
Answer
JSON is fine to pass to the orchestrator as an input value. Here is an example that accomplishes a similar goal.
import logging

import azure.durable_functions as df
import azure.functions as func


async def main(documents: func.DocumentList, starter: str):
    client = df.DurableOrchestrationClient(starter)
    # The payload is passed as client_input, the third argument of
    # start_new (the second is an optional instance ID).
    # doc1, doc2 and doc3 are placeholder dicts.
    instance_id = await client.start_new(
        'MyDFOrchestrator', None, {"doc_list": [doc1, doc2, doc3]}
    )
    logging.info(f"Started orchestration ID {instance_id}")
Although this example uses a Cosmos DB (DocumentList) trigger rather than HTTP, the part that matters here, passing the payload to start_new, has nothing to do with which trigger you choose for the starter function.
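Applied to the quickstart code from your question, a minimal sketch of the same idea with the HTTP starter: forward the POSTed body (e.g. {"Country": "Japan"}) as the client_input argument of start_new, then read it in the orchestrator with context.get_input(). The country-to-city mapping below is purely illustrative; adjust it to your real filtering logic:

# Starter function (separate file): forward the POSTed JSON body
# as the orchestration input instead of None.
import logging

import azure.functions as func
import azure.durable_functions as df


async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
    client = df.DurableOrchestrationClient(starter)
    payload = req.get_json()  # e.g. {"Country": "Japan"} posted by ADF
    instance_id = await client.start_new(
        req.route_params["functionName"], None, payload
    )
    logging.info(f"Started orchestration with ID = '{instance_id}'.")
    return client.create_check_status_response(req, instance_id)


# Orchestrator function (separate file): read the input and filter
# which activity calls run.
def orchestrator_function(context: df.DurableOrchestrationContext):
    payload = context.get_input() or {}
    country = payload.get("Country")

    # Illustrative mapping from the posted Country value to the cities
    # whose result sets should be processed.
    cities_by_country = {"Japan": ["Tokyo"], "USA": ["Seattle"], "UK": ["London"]}
    results = []
    for city in cities_by_country.get(country, ["Tokyo", "Seattle", "London"]):
        result = yield context.call_activity('Hello', city)
        results.append(result)
    return results


main = df.Orchestrator.create(orchestrator_function)

The check-status response returned by the starter is what ADF can poll (per the blog post you linked) until the 10-minute orchestration finishes, so the HTTP call itself never has to wait out the timeout.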
As an alternative, you can define a concrete, serializable class with your model/entity structure (much cleaner than raw JSON). To make a class serializable, it only needs to export two static methods, to_json() and from_json(); the Durable Functions framework calls these repeatedly to serialize and deserialize your custom class.
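For example, a minimal sketch of such a class (the name FilterRequest and its single field are made up for illustration):

import json


class FilterRequest:
    """Typed orchestration input, replacing a raw dict like {"Country": "Japan"}."""

    def __init__(self, country: str):
        self.country = country

    # The Durable Functions framework calls these two static methods to
    # serialize and deserialize the class when it crosses a framework boundary.
    @staticmethod
    def to_json(obj: "FilterRequest") -> str:
        return json.dumps({"Country": obj.country})

    @staticmethod
    def from_json(json_str: str) -> "FilterRequest":
        data = json.loads(json_str)
        return FilterRequest(data["Country"])

The starter can then pass FilterRequest("Japan") as client_input, and context.get_input() in the orchestrator hands you a FilterRequest instance back instead of a plain dict.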