Thursday 1 March 2018

Add email notification in Azure Data Factory V2

Case
I am running SSIS packages in Azure Data Factory (ADF V2), but I want to get an email notification when my package execution fails. It seems that ADF V2 doesn't have a built-in email notification option. How can I be notified without checking the built-in pipeline monitor in ADF?
[Image: Email notification on failure]

Solution
For this solution we will use a Logic App to send the email, and we will trigger it from ADF when an error occurs. The starting point of this blog post is a working pipeline that executes an SSIS package using a stored procedure.
[Image: Data Factory loves Logic App]

So, the solution consists of two parts: a Logic App (email) and ADF (error handling). These two Azure services communicate via a JSON message sent in an HTTP POST request. The JSON message contains the name of the Data Factory, the name of the pipeline that failed, an error message and an email address. You could of course hardcode the email address in the Logic App, but by passing it in the message you can reuse the Logic App for various pipelines or data factories and notify different people. The schema of the message:
{
    "properties": {
        "DataFactoryName": {
            "type": "string"
        },
        "PipelineName": {
            "type": "string"
        },
        "ErrorMessage": {
            "type": "string"
        },
        "EmailTo": {
            "type": "string"
        }
    },
    "type": "object"
}
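
To make this contract a bit more concrete, here is a minimal Python sketch (Python is not part of the original solution) that checks whether a message has the four string fields from the schema above. Note that it is stricter than the schema itself, which does not mark any field as required. The function name and the sample values are made up for illustration.

```python
import json

# Field names taken from the JSON schema above; all four are strings.
EXPECTED_FIELDS = ("DataFactoryName", "PipelineName", "ErrorMessage", "EmailTo")


def validate_payload(payload: dict) -> list:
    """Return a list of problems; an empty list means all four fields are fine."""
    problems = []
    for field in EXPECTED_FIELDS:
        if field not in payload:
            problems.append(f"missing field: {field}")
        elif not isinstance(payload[field], str):
            problems.append(f"field is not a string: {field}")
    return problems


# Hypothetical sample values, for illustration only.
sample = {
    "DataFactoryName": "MyDataFactory",
    "PipelineName": "MyPipeline",
    "ErrorMessage": "Package execution failed",
    "EmailTo": "someone@example.com",
}

print(validate_payload(sample))  # []
print(json.dumps(sample, indent=4))
```

A message like the printed sample is what ADF will post to the Logic App later on.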


a) Logic App
We start by creating the Logic App, because we need it before we can create the error handler in ADF.

a1) Create new Logic App
Click on Create a resource and locate Logic App under Enterprise Integration. Pick a descriptive name like "ADF-Notifications" and then choose the Subscription, Resource Group and Location. For the Resource Group and this Logic App we use West Europe since we are from the Netherlands.
[Image: Create new Logic App]

a2) HTTP Trigger
When editing the Logic App we first need to pick a trigger, which is the event that starts this Logic App. Pick the HTTP trigger "When a HTTP request is received" and then click on Edit to specify the parameters. Paste the JSON schema from above in the textbox. In the next step we can use these parameters to set up the email.
[Image: Add HTTP trigger]

a3) Send an email
Add a new step and choose Add an action. Search for "Send an email" and then scroll down to Office 365 Outlook - Send an email. The first time you use this action you need to log in with your Office 365 account. Now you can set up the email with fixed texts mixed with parameters from the JSON message from the previous step. When you are satisfied with the email setup, click on Save in the upper left corner.
[Image: Add action to Send an email]

a4) Copy URL from HTTP trigger
The Logic App is ready. Click on the HTTP trigger and copy the URL. We need this in ADF to trigger the Logic App.
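
Before moving on to ADF, you could check the Logic App on its own by posting a test message to the copied URL. The Python sketch below is only one way to do that, using the standard library. The URL is a placeholder that you have to replace with your own, and the function names and sample values are made up for illustration.

```python
import json
import urllib.request

# Placeholder: paste the URL copied from the HTTP trigger here.
LOGIC_APP_URL = "https://example.invalid/paste-your-logic-app-url-here"


def build_request(url: str, payload: dict) -> urllib.request.Request:
    """Build a JSON POST request like the one the ADF Web activity sends later."""
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send_test_notification(url: str, payload: dict) -> int:
    """Send the request and return the HTTP status code of the response."""
    with urllib.request.urlopen(build_request(url, payload)) as response:
        return response.status


# Hypothetical sample values, for illustration only.
test_payload = {
    "DataFactoryName": "MyDataFactory",
    "PipelineName": "MyPipeline",
    "ErrorMessage": "This is a test message",
    "EmailTo": "someone@example.com",
}

# Only call the Logic App once the placeholder URL has been replaced.
if __name__ == "__main__" and "example.invalid" not in LOGIC_APP_URL:
    print(send_test_notification(LOGIC_APP_URL, test_payload))
```

If the email arrives, the Logic App part works and any remaining problem is on the ADF side.
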
[Image: Copy the URL for ADF]

b) Data Factory
Next we will add a new activity in the existing ADF pipeline to trigger the new Logic App.

b1) Add Parameter
To specify the email address of the recipient we will use a pipeline parameter. Go to your existing pipeline (do not select any of the activities in it) and go to the Parameters page. Click on New and add a new String parameter called EmailTo. Add the email address of the recipient in the default value field. You can override the default value when editing the trigger (after save).
[Image: Add pipeline parameter]

b2) Add Web activity
Next, expand the General activities and drag a Web activity to the canvas. Give it a suitable name like Error Notification. Add an Activity Dependency (similar to the Precedence Constraints in SSIS) between the Stored Procedure activity and the Web activity. Right-click the dependency to change it to Failure.
[Image: Add Web activity]

b3) Web activity settings
Select the newly added Web activity and go to the Settings page. In the URL field, paste the URL from step a4 and select POST as the method.
Next, add a new header: name it Content-Type and set its value to application/json. As body, add the following JSON message, but make sure to replace Execute Package with the name of your own Stored Procedure activity (ours is called Execute Package). The first two items retrieve the Data Factory name and the pipeline name, the third retrieves the error message of the failed activity, and the last one gets the value of the parameter created in step b1.
{
    "DataFactoryName":
        "@{pipeline().DataFactory}",
    "PipelineName":
        "@{pipeline().Pipeline}",
    "ErrorMessage":
        "@{activity('Execute Package').error.message}",
    "EmailTo":
        "@{pipeline().parameters.EmailTo}"
}
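
To get a feeling for what the Logic App receives once the @{...} expressions are resolved, the Python sketch below fills in the body with sample values. This is only a toy stand-in: ADF evaluates the expressions itself at run time, and the render function and all sample values are made up for illustration.

```python
import json
import re

# The body from step b3, as pasted into the Web activity.
BODY_TEMPLATE = """{
    "DataFactoryName": "@{pipeline().DataFactory}",
    "PipelineName": "@{pipeline().Pipeline}",
    "ErrorMessage": "@{activity('Execute Package').error.message}",
    "EmailTo": "@{pipeline().parameters.EmailTo}"
}"""

# Hypothetical run-time values, keyed by expression (ADF resolves these itself).
SAMPLE_VALUES = {
    "pipeline().DataFactory": "MyDataFactory",
    "pipeline().Pipeline": "MyPipeline",
    "activity('Execute Package').error.message": "Package execution failed",
    "pipeline().parameters.EmailTo": "someone@example.com",
}


def render(template: str, values: dict) -> str:
    """Replace every @{expression} with its sample value."""
    return re.sub(r"@\{([^}]*)\}", lambda m: values[m.group(1)], template)


rendered = json.loads(render(BODY_TEMPLATE, SAMPLE_VALUES))
print(rendered["EmailTo"])  # someone@example.com
```

Only the parts wrapped in @{ and } are replaced in this sketch, which is why each of the four values in the body needs the curly braces.
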
[Image: Add URL and json to Web activity]

b4) Testing
Now it is time to test the pipeline. Make sure something fails in the package, for example by changing a server name or password in the SSIS environment. Or you could just pause your Integration Runtime and run the trigger. Now wait for the email to arrive.
[Image: Email notification received]

The solution has one downside! Because you are handling the error with an Activity Dependency, the entire pipeline succeeds despite the failing SSIS stored procedure. Check the image below. The last 4 jobs did fail, but show the Status 'Succeeded', although an error message is shown.
[Image: Failed or Succeeded?]

b5) Add fail
If you want the correct status when the SSIS stored procedure fails, then copy and paste the existing Stored Procedure activity, rename it to, for example, 'Fail' and replace its SQL code with the code below. Then connect the Web activity to this new activity.
--T-SQL Code
-- Raise an error on purpose so that the pipeline run ends with status Failed.
DECLARE @err_msg NVARCHAR(150);
SET @err_msg = N'Error occurred, email was sent';
RAISERROR(@err_msg, 15, 1);
[Image: Add fail]

Now we have a Failed status for a failing pipeline. Please leave a comment if you have a better or easier solution for this.
[Image: Status Failed]

Summary
In this post we showed you how to use a Logic App to send an email notification when a pipeline fails in Azure Data Factory. This saves you a daily login to the Azure portal to check the pipeline monitor. Feel free to adjust the JSON message to your own needs. You could also add an additional notification for successful jobs.

Update Dec 1: instead of Office 365 you can now also use SendGrid.

16 comments:

  1. Hi, Nice Post. Have a clarification on ADF pipeline restartability. In case of a pipeline failure, can I rerun the pipeline from a failure activity and not necessarily from the first activity. More like the option of executing from a particular step in the SQL Agent Jobs. Please clarify, appreciate your help, Thanks!

    1. Yes, a pipeline can be triggered from the failed activity.

  2. In my case, I applied the example to a copy data pipeline. I followed the steps, but the following error is returned: {"code":"BadRequest","message":"The template validation failed: 'The action(s) 'Execute Package' referenced by 'inputs' in action 'ExecuteEmail Notification' are not defined in the template","target":"pipeline/CopyPipeline_... Any solution?

    1. I am having the same issue, were you able to figure out what's the problem?

    2. You need to replace 'Execute Package' with the name of YOUR activity when pasting the JSON code in your ADF.

    3. "ErrorMessage":
      "@{activity('Execute Package').error.message}",

      In this line you have to replace 'Execute Package' with the activity name, that you want to get error message from.

  3. Error in your json payload in step b3:

    the EmailTo element should be:
    @{pipeline().parameters.EmailTo}
    not
    @pipeline().parameters.EmailTo

    You need the curly braces after the @, like the lines above it.

  4. Super helpful, thanks for the writeup!

  5. Hi Joost,
    This is an awesome post, the best one actually.
    I was looking for such solution for days.
    Your post explains it all details which are essential in order to implement this correctly.
    Thank you!

  6. Hello,

    I am trying to get the Copy activity error message and pass it to the execute procedure task, but it's not working for me. I tried the expressions below.
    @{activity('copy').output.error.message
    @{activity('copy').output.errors.message
    @{activity('copy').output.errors.message[0]

    below is the error output from the copy activity

    "dataRead": 4192,
    "dataWritten": 0,
    "filesRead": 1,
    "rowsRead": 34,
    "rowsCopied": 0,
    "copyDuration": 6,
    "throughput": 0.682,
    "errors": [
    {
    "Code": 9123,
    "Message": "ErrorCode=UserErrorSqlBulkCopyInvalidColumnLength,'Type=Microsoft.DataTransfer.Common.Shared.HybridDeliveryException,Message=SQL Bulk Copy failed due to received an invalid column length from the bcp client.,Source=Microsoft.DataTransfer.ClientLibrary,''Type=System.Data.SqlClient.SqlException,Message=The service has encountered an error processing your request. Please try again. Error code 4815.\r\nA severe error occurred on the current command. The results, if any, should be discarded.,Source=.Net SqlClient Data Provider,SqlErrorNumber=40197,Class=20,ErrorCode=-2146232060,State=1,
    Errors=[{Class=20,Number=40197,State=1,Message=The service has encountered an error processing your request.
    Please try again. Error code 4815.,},{Class=20,Number=0,State=0,Message=A severe error occurred on the current command
    . The results, if any, should be discarded.,},
    ],'",
    "EventType": 0,
    "Category": 5,
    "Data": {},
    "MsgId": null,
    "ExceptionType": null,
    "Source": null,
    "StackTrace": null,
    "InnerEventInfos": []
    }
    ],
    "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)",
    "usedDataIntegrationUnits": 4,
    "usedParallelCopies": 1,
    "executionDetails": [
    {
    "source": {
    "type": "AzureDataLakeStore"
    },
    "sink": {
    "type": "AzureSqlDatabase"
    },
    "status": "Failed",
    "start": "2018-12-07T16:27:05.2430655Z",
    "duration": 6,
    "usedDataIntegrationUnits": 4,
    "usedParallelCopies": 1,
    "detailedDurations": {
    "queuingDuration": 3,
    "transferDuration": 2
    }
    }
    ]
    }

  7. Hi, I have used the above steps and am getting the error below.
    {
    "errorCode": "2108",
    "message": "Error calling the endpoint. Response status code: ",
    "failureType": "UserError",
    "target": "ErrorEmailNotific"
    }

    I have set up the code below in the Web activity
    {
    "DataFactoryName":"@{pipeline().DataFactory}",
    "PipelineName":"@{pipeline().Pipeline}",
    "ErrorMessage":"@{activity('GetFileMapping').error.message}",
    "EmailTo":"@{pipeline().parameters.EmailTo}"
    }

    Running the Logic App manually works fine, but not when calling it from ADF.

  8. Has someone found the solution for the problem presented above by Javed Khan? If so, please post the answer here, as I have a very similar issue :(

    Great thanks :)

  9. Hi,

    I have tried with a Copy Data task and the rest is the same as mentioned in the post, but I am still getting an error.

    Error

    {"code":"BadRequest","message":"ErrorCode=InvalidTemplate, ErrorMessage=The template validation failed: 'The action(s) 'Execute Package' referenced by 'inputs' in action 'ExecuteError Notification' are not defined in the template","target":"pipeline/DemoPipeline/runid/c1359a3a-0f91-458d-8018-e7617f2b931a","details":null,"error":null}

  10. Does anybody know how to call the duration in a logic app? I want the logic app to send me a "success" or "fail" email with the duration of how long the trigger took to process


All comments will be verified first to avoid URL spammers.