Wednesday, 1 December 2021

ADF: Looping through pipelines and execute them

Case
I want to loop through my ADF pipelines and then execute them in a foreach loop, but the pipeline property of the Execute Pipeline activity doesn't support dynamic content. Is this possible?
Execute Pipeline





















Solution
The standard Execute Pipeline activity is pretty much useless for this specific case, but with another activity it is possible. However don't use this workaround to execute a whole bunch of similar pipelines like we used to do in the SSIS era. In that case it is just better to invest your time in creating a more flexible/configurable pipeline that can handle multiple tables or files.

Now the workaround:
Looping and executing pipelines













It uses a Web activity to get all pipelines via a Rest API. Then there is a Filter to get only a selection of all those pipelines. After that the Foreach loop with another Web activity in it will execute the pipelines via a Rest API call.

1) Access control (IAM)
This solution uses Rest APIs from Azure Data Factory. This means we need to give this ADF access to its own resources so that is can call those Rest APIs.
  • Go to your ADF in the Azure Portal
  • Click on the ellipsis button (three dots) to copy the ADF name
  • Click on Access control (IAM) in the left menu
  • Click on +Add and choose Add role assignment
  • Select the role with just enough access (less is more). Data Factory Contributor is perfect for this example
  • Then select members. In the search window you can paste your ADF name. Click on your ADF and then push the Select button
  • Now review and assign the role to your ADF
Give your ADF access to its own resources












2) Web activity - Get all pipelines
We need a collection of pipelines for the Foreach loop. The Rest API list-by-factory retrieves all pipelines from a single Data Factory. You need to prepare the Rest API url by replacing all parts between the curly braces with the info of your own ADF (also remove the curly braces):

https://management.azure.com/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DataFactory/factories/{factoryName}/pipelines?api-version=2018-06-01

Tip: If you copy the URL of the ADF overview page in the Azure portal then you have all the information you need.
  • Add the Web activity to your pipeline and give it a suitable name (you need it in the next activity)
  • On the Settings tab enter the adjusted URL from above in the URL field
  • Select GET as method
  • Set Authenication to Managed Identity
  • Use https://management.azure.com/ in the Resource field
Web activity - Get all pipelines

























3) Filter pipelines
Now we have all pipelines available in our collection, but we need to add a filter to only get the required pipelines. The easiest way to do this is by putting them all in a specific folder or giving them all the same prefix. If you're using a folder then you can use the first expression. It first checks whether the pipeline contains a property named 'folder' because pipelines in the root will not have this property. Then it checks whether that property is filled with the value 'demo' (the name of our folder).
@and(
   contains(item().properties, 'folder'),
   equals(item().properties.folder.name,'demo')
)
If you want to use the prefix then the expression is less complex with only a startswith expression: @startswith(item().name, 'Sub_')

  • Add the Filter activity to the pipeline and give it a suitable name. We need it in the Foreach. Connect it to the Web activity.
  • For Items add the following expression @activity('Get All Pipelines').output.value (enter your own activity name)
  • For Condition add one of the above expressions
Filter activity






















4) Foreach loop
The foreach loop is very straightforward. Use the filter activity in the items field with an expression like this @activity('Filter on folder demo').output.value (replace the activity name).
Foreach activity

















5) Web activity - Execute pipeline
Within the foreach loop we need to add an other Web activity. This one will call the create-run Rest API which will execute an ADF pipeline. Just like in step 2 we need to adjust the example URL from the documentation:

https://management.azure.com/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DataFactory/factories/{factoryName}/pipelines/{pipelineName}/createRun?api-version=2018-06-01

However we need to make it change every iteration of the Foreach because the pipeline name is part of the URL. You could create a expression where you only change that pipeline name and leave the rest hardcoded, but besides the pipeline name that collection of pipelines also contains a ID property that contains 90% of the URL we need. If you debug the pipeline and check the output of the first Web activity or the Filter activity you can check their output. The ID property is filled with something like:

/subscriptions/7q618f21-ad62-4e1d-9019-4a23beda7777/resourceGroups/RG_ADF_DEV/providers/Microsoft.DataFactory/factories/DataFactory2-DEV/pipelines/Sub_pipeline1

We only need to add something in front of it and behind of it and then you have the correct URL:
@concat('https://management.azure.com',
replace(item().id, ' ', '%20'),
'/createRun?api-version=2018-06-01'
)
The replace is to replace spaces, which are not allowed in a URL, by %20. So if a pipeline name contains a space then it will be replaced.
  • Add the Web activity to the Foreach and give it a suitable name 
  • On the Settings tab enter the expression above as URL by first clicking on the 'Add dynamic content' link below the field
  • Select POST as method
  • Now we don't need a Body for the Rest API but the Web activity requires it when the method is POST. Enter a dummy JSON message like: {dummy:"dummy"}
  • Set Authenication to Managed Identity
  • Use https://management.azure.com/ in the Resource field
The Web activity calls the pipelines asynchronous (execute and don't wait for an answer). If you want a synchronous call and perhaps get some feedback if the pipeline fails then you need to replace the Web activity by a Webhook activity.

6) The result
Now run the pipeline and check the result. Make sure to check the monitor. Then you will see one big disadvantage. Each execution will become a separate run with each its own Run ID. This means it will be a little bit more work to connect these in your logging, but it is possible because the output of the Web activity will return the Run ID.
ADF Monitor






Conclusion
In this post you learned how to execute pipelines in a loop via the Web activity and Rest APIs. Because each pipeline will get its own Run ID the logging needs some extra attention. You can also use the Web activity contruction to execute pipelines from an other Data Factory.

Unfortunately you cannot use this same trick for Data Flows because at te moment there is no Rest API to execute a pipeline (only in debug mode).




No comments:

Post a Comment

All comments will be verified first to avoid URL spammers