Skip to content
Advertisement

Python Azure Data Factory Update Pipeline

I want to use Python to add an activity to a pipeline in Azure Data Factory. With the following code I am replacing the actual activity but not adding a new one:

p_name = 'test'
act_name = 'Wait4'

Wait_activity = WaitActivity(name=act_name,wait_time_in_seconds=5)


p_obj = PipelineResource(activities=[Wait_activity])
p = adf_client.pipelines.create_or_update(rg_name, df_name, p_name, p_obj)

This is the pipeline before running the code : enter image description here

After running the code:

enter image description here

Expected :

enter image description here

Advertisement

Answer

Researched the statements in source code:

enter image description here

So when you update the pipeline, the activities property should be the list of activities in pipeline, not single one.

For example:

wait_activity = WaitActivity(name="waittest", type="Wait", wait_time_in_seconds=100, )
ActivityDependency = [{"activity":"waittest","dependencyConditions":["Succeeded"]}]
wait_activity1 = WaitActivity(name="waittest1", type="Wait", wait_time_in_seconds=100,depends_on=ActivityDependency)


p_name = 'testforadf'
p_obj = PipelineResource(
        activities=[wait_activity, wait_activity1])
p = adf_client.pipelines.create_or_update(rg_name, df_name, p_name, p_obj)

Please note two lines:

activities=[wait_activity, wait_activity1])

This property should contains all of your activities.

ActivityDependency = [{"activity":"waittest","dependencyConditions":["Succeeded"]}]

This is the dependency conditions between your activities.

My output:

enter image description here

Any concern, please let me know.


Well,please see my sample code:

The premise is that I already have the above two wait activities

adftest = adf_client.pipelines.get(rg_name,df_name,p_name)
print(adftest)
for activity in adftest.activities :
    print(activity.name)
    print(activity.type)

Then output is :

{'additional_properties': None, 'id': '/subscriptions/b83c1ed3-c5b6-44fb-b5ba-2b83a074c23f/resourceGroups/v-jugong-ChinaCXPTeam/providers/Microsoft.DataFactory/factories/jaygongadf/pipelines/testforadf', 'name': 'testforadf', 'type': 'Microsoft.DataFactory/factories/pipelines', 'etag': 'ed006cf3-0000-0800-0000-5da970600000', 'description': None, 'activities': [<azure.mgmt.datafactory.models.wait_activity_py3.WaitActivity object at 0x000001C05FEDE0F0>, <azure.mgmt.datafactory.models.wait_activity_py3.WaitActivity object at 0x000001C05FED6DA0>], 'parameters': None, 'variables': None, 'concurrency': None, 'annotations': None, 'folder': None}
waittest
Wait
waittest1
Wait

Then you could see the objects in above activities property. Besides,you could see their types: 'activities': [<azure.mgmt.datafactory.models.wait_activity_py3.WaitActivity object at 0x000001C05FEDE0F0>, <azure.mgmt.datafactory.models.wait_activity_py3.WaitActivity object at 0x000001C05FED6DA0>]

They are WaitActivity type, so you could view their loop the activity to get every item in it using :

for activity in adftest.activities :
        print(activity.name)
        print(activity.type)

You could view what properties the WaitActivity type contains, likename,type in source code statements.(For me, i used Pycharm to test code,the IDE could detect source code directly)

enter image description here

Then if you want to add one more activity,for example, one more WaitActivity:

wait_activity2 = WaitActivity(name="waittest2", type="Wait", wait_time_in_seconds=100, )
adftest.activities.append(wait_activity2)
p = adf_client.pipelines.create_or_update(rg_name, df_name, p_name, adftest)

Please see above code, i created a new WaitActivity named wait_activity2 ,then append it into activities array. Then update the pipeline as normal, you will find the new activity :

enter image description here

User contributions licensed under: CC BY-SA
6 People found this is helpful
Advertisement