Tuesday, 13 October 2020

Execute pipelines from another Data Factory (Sync)

Case
Can I use the Execute Pipeline Activity to execute pipelines from another Data Factory? Or do we need another activity for this?
Can you use the Execute Pipeline Activity for this?
























Solution
First of all, why would you want multiple Data Factories other than for a DTAP street (Development, Testing, Acceptance and Production)? There are several reasons, for example:
  • Different departments/divisions each having their own Data Factory
    For example to prevent changes to your pipelines by users of a different department or to make it easier to split the Azure consumption between two divisions.
  • Different regions for international companies
    Either for legal reasons, where for example the data may not leave the European Union
    Or to prevent paying unnecessary outbound data costs when your data is spread over different regions
  • Security reasons
    To prevent others from using the access provided via the Managed Identity of ADF. If you give your ADF access via MSI to an Azure Key Vault or an Azure Storage Account, then everybody using that ADF can access that service via ADF.
If you have any other good reasons to use multiple Data Factories please let us know in the comments below.

And although you may have multiple Data Factories, you can still use one Data Factory to execute pipelines from a different Data Factory. However, you cannot use the Execute Pipeline Activity, because it can only execute pipelines within the same Data Factory.

You can use either the Web Activity or the Webhook Activity for this. The Web Activity always executes the pipeline asynchronously, which means it does not wait for the result. The Webhook Activity executes the child pipeline synchronously, which means it waits until the child pipeline is ready, and you can also retrieve the execution result via a call back. In this blog post we will show the synchronous Webhook Activity; in a previous blog post we already showed the asynchronous Web Activity.

1) Give parent access to child via MSI
We will not use a user to execute the pipeline in the child(/worker) Data Factory, but instead we will give the managed identity (MSI) of the parent(/master) Data Factory access to the child(/worker) Data Factory. The minimum role needed is Data Factory Contributor, but you could also use a regular Contributor or Owner (but less is more).
  • Go to the child(/worker) Data Factory (DivisionX in this example)
  • In the left menu click on Access control (IAM)
  • Click on the +Add button and choose Add role assignment
  • Select Data Factory Contributor as Role
  • Use Data Factory as Assign access to
  • Optionally change the subscription
  • Optionally enter a (partial) name of your parent ADF (if you have a lot of data factories)
  • Select your parent ADF and click on the Save button
Give one ADF access to other ADF















2) Add Call Back to Child Pipeline
The parent(/master) pipeline will call the child(/worker) and wait until it receives a call back or until the timeout is exceeded. Therefore we need to add a call back activity (in the form of a Web/Webhook activity) to the child pipeline. This activity should be the last activity in your pipeline. There can be more than one (for example one for success and one for failure), but only the first call back will be handled by the parent.
  • Go to the child(/worker) Data Factory (DivisionX in this example)
  • Open ADF via the Author & Monitor link
  • Open the pipeline that you want to execute from the parent(/master) pipeline
  • Add a String parameter called callBackUri. The value will be automatically provided by the parent
  • Add a String parameter called myInputParam1. The value will be manually provided by the parent
  • Add a Web Activity with the following settings
    • URL: @pipeline().parameters.callBackUri   (to retrieve the pipeline parameter)
    • Method: POST
    • Body: {"Output":{"myOutputParam1":"failed"},"StatusCode":"401"}
      Every status code above 399 will tell the parent that the child failed. This can be an arbitrary number, but you could also use the official list of HTTP status codes to give it a little more meaning. Another option is to use the Output tag, which can contain one or more 'output' parameters that can be read by the parent pipeline. You could for example pass through the error message with the Add dynamic content option.

      If you also want to pass an error description to the parent, then you must extend the JSON message with an error tag: {"Output":{"myOutputParam1":"failed"},"StatusCode":"401","error":{"ErrorCode":"ParameterError","Message":"Required parameters were not provided"}}
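The call back body from this step can also be assembled programmatically, which makes it harder to end up with unbalanced braces. The following is only a minimal Python sketch for illustration and not something ADF provides: the function name build_callback_body and its arguments are hypothetical, while the field names (Output, StatusCode, error, ErrorCode, Message) are taken from the JSON examples in this step.

```python
import json


def build_callback_body(status_code, output, error_code=None, message=None):
    """Build the JSON body that the child pipeline posts to callBackUri.

    Hypothetical helper for illustration only; the field names follow the
    JSON examples in this blog post.
    """
    body = {"Output": output, "StatusCode": str(status_code)}
    # The error tag is optional and only added when an error code is given.
    if error_code is not None:
        body["error"] = {"ErrorCode": error_code, "Message": message or ""}
    return json.dumps(body)


if __name__ == "__main__":
    print(build_callback_body(
        401,
        {"myOutputParam1": "failed"},
        error_code="ParameterError",
        message="Required parameters were not provided",
    ))
```

Generating the body this way guarantees valid JSON; in the Web Activity itself you would still paste the resulting text (or build it with Add dynamic content).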
Add two pipeline parameters










Web Activity for callback 













3) Determine URL
This solution calls the Create Run REST API of ADF to execute the pipeline. For this you need to replace the relevant parts of the URL below with the Subscription ID, Resource Group name, Data Factory name and Pipeline name of the child(/worker) pipeline. We will use this URL in the next step.

Example URL:
https://management.azure.com/subscriptions/aaaaaa-bbbb-1234-cccc-123456789/resourceGroups/DivisionX/providers/Microsoft.DataFactory/factories/DevisionX-ADF/pipelines/MyChildPipeline/createRun?api-version=2018-06-01
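If you need this URL for several child pipelines, you can compose it from its four parts. The sketch below is a small Python illustration under that assumption; the function name build_create_run_url is hypothetical, and the values in the usage example are the placeholder values from the example URL above.

```python
from urllib.parse import quote


def build_create_run_url(subscription_id, resource_group, factory_name,
                         pipeline_name, api_version="2018-06-01"):
    """Compose the Create Run URL for a pipeline in the child Data Factory.

    Hypothetical helper for illustration; the URL layout is copied from the
    example URL in this blog post.
    """
    # quote() escapes characters that are not safe inside a URL path segment.
    return (
        "https://management.azure.com"
        f"/subscriptions/{quote(subscription_id, safe='')}"
        f"/resourceGroups/{quote(resource_group, safe='')}"
        "/providers/Microsoft.DataFactory"
        f"/factories/{quote(factory_name, safe='')}"
        f"/pipelines/{quote(pipeline_name, safe='')}"
        f"/createRun?api-version={api_version}"
    )


if __name__ == "__main__":
    print(build_create_run_url(
        "aaaaaa-bbbb-1234-cccc-123456789",
        "DivisionX",
        "DevisionX-ADF",
        "MyChildPipeline",
    ))
```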


4) Webhook Activity
In this second example we will use the Webhook Activity. It will execute the pipeline in the child(/worker) Data Factory, but now it will wait until it receives a call back or until the timeout is exceeded. Besides a status, it can also retrieve messages from the child(/worker) ADF. See the JSON message in step 2.
  • Go to your master ADF and click on Author & Monitor
  • Create a new pipeline and add a Webhook Activity to the canvas of the new pipeline
  • Give it a suitable descriptive name on the General Tab
  • Go to the Settings tab and enter the URL of the previous step
  • Choose POST as Method
  • Add a new header called Content-Type with the value application/json
  • As Body enter a JSON message. This could either be a dummy message or you could supply parameters in this message. The child parameter is called myInputParam1: {"myInputParam1":"bla bla"}
  • Use MSI as Authentication method
  • Enter this URL as Resource: https://management.core.windows.net/
Webhook Activity calling a child pipeline in another ADF























5) Testing
Now trigger the new parent pipeline and check the monitor of the child ADF. You will see that it receives two parameters: one provided in the JSON message in the Body property and one provided by the Webhook activity itself.
Child pipeline - two parameters
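To make clear where the two parameters come from, the sketch below mimics in Python what the child pipeline ends up receiving: the Body you entered in the Webhook activity plus the callBackUri property that the Webhook activity adds itself. This is only an illustration of the resulting parameter set, not ADF code; the function name and the call back address in the usage example are hypothetical placeholders.

```python
def parameters_seen_by_child(webhook_body, callback_uri):
    """Return the parameters the child pipeline receives.

    Illustration only: the Webhook activity adds callBackUri to the Body
    before it calls the Create Run URL.
    """
    # Copy first, so the Body entered in the parent stays unchanged.
    params = dict(webhook_body)
    params["callBackUri"] = callback_uri
    return params


if __name__ == "__main__":
    body = {"myInputParam1": "bla bla"}
    # Placeholder address; the real one is generated by ADF for each run.
    print(parameters_seen_by_child(body, "https://example.invalid/callback"))
```

This is why the child pipeline needs a parameter named exactly callBackUri next to your own myInputParam1.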










Note that the child pipeline did not fail, because we handled the error. However, the parent pipeline did fail, because we sent a status code higher than 399. Next, check the monitor of the parent ADF and look at the output parameters of the Webhook activity. It received a value from the child pipeline that could be used in a next activity.
Parent pipeline - the result of the callback
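The rule for success or failure can be summarized in a few lines. The Python sketch below only restates the rule from step 2 (every status code above 399 means the child failed) for a call back body like the one used in this post; the function name child_succeeded is hypothetical and this is not how ADF implements it internally.

```python
import json


def child_succeeded(callback_json):
    """Apply the rule from step 2: a status code above 399 means failure."""
    body = json.loads(callback_json)
    # StatusCode is sent as a string in the examples, so convert it first.
    return int(body["StatusCode"]) <= 399


if __name__ == "__main__":
    failed = '{"Output":{"myOutputParam1":"failed"},"StatusCode":"401"}'
    print(child_succeeded(failed))
```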















Summary
In this blog post you learned how to give one ADF access to another ADF and how to execute pipelines in that other ADF. The Webhook activity solution gives you a little more control compared to the Web activity. And with the correct JSON messages you can pass messages and parameters from the parent to the child and back to the parent.
