Sunday, 22 March 2020

Build dynamic pipelines in Azure Data Factory

Case
I want to create multiple Azure Data Factory (ADF) pipelines that use the same source and sink location, but load different files and tables. How can I do this in ADF while keeping the pipelines extensible and maintainable?

Solution
Let's say we have a scenario where all the source files are stored in Azure Storage, for example Blob Storage. We want to load these files into a traditional Data Warehouse (DWH) using an Azure SQL Database that contains a separate schema for the Staging layer. Earlier we showed you how you can do this using a Foreach Loop in one single pipeline. For this scenario we will not do this, because we want to:
  • Avoid loading all Staging tables every day at the same time.
    • Sometimes only 3 of the 5 files will be loaded, because not every file is delivered every day and the set of files can differ per day.
    • The load can be spread over several moments during the day.
  • Be more flexible per Staging load.
    • In most cases you want to have the same Staging load for every table, but sometimes the transformations differ or you need an extra table to prepare the dataset.
  • Use technical columns in the Staging tables.
    • In most cases you want to store some metadata, for example "inserted date" and "inserted by". When using a Copy data activity, you have to configure the mapping section when the source and sink fields are not equal.
One of the solutions is building dynamic pipelines. This is a combination of parameters, variables and a naming convention. The result is a dynamic pipeline that we can clone to create multiple pipelines using the same source and sink dataset. In this blog post we focus on the Staging layer, but you can implement this for every layer of your DWH.

Note:
When working in a team, it is important to develop in a consistent way and to have a proper naming convention in place. This contributes to the extensibility and maintainability of your application. Click here for an example of a naming convention in ADF.

Pipeline

For this solution, we will create a pipeline that contains the following activities:
  • Copy Data activity (load data from source to sink)
  • Stored Procedure activity (update technical columns)
The source is a CSV file stored in a Blob container. The data is based on the customer sales table from WideWorldImporters-Standard. The file is delivered daily.

Storage account: bitoolssa
Blob container: sourcefiles
Blob folder: WWI
File name*: SalesCustomers20200322.csv
* The date in this file name is the blog post date; in practice it should be today's date.

Note:
Blob storage containers only have virtual folders, which means that the folder name is stored as part of the blob name. Microsoft Azure Storage Explorer shows them as if they were real folders.

1) Linked Service

Open the ADF portal, go to Connections - Linked services and click on New. Select Azure Blob Storage and give it a suitable (generic) name. Connect it to your storage account. Create another Linked service for Azure SQL Database, because that will be our destination (sink). Click here to see how to make the connection using Azure Key Vault.
ADF portal - Create Linked Service

2) Dataset (source)

Click New dataset and select Azure Blob Storage. The format is DelimitedText. Give it a suitable (generic) name and select the Linked Service for Blob Storage that we created earlier. 

Once you click 'OK', it will open the dataset automatically. Go to Parameters and add the following:
  • Container -> type "String"
  • Directory -> type "String"
  • File -> type "String"
  • ColumnDelimiter -> type "String"
Go to Connection and use the applicable parameters to fill in File path. You can apply dynamic content to each setting. For now, we also use a parameter for Column delimiter. Finally, we enable First row as header.

Click Publish all to save your changes. For now we did not configure a Git repository, but of course we recommend doing that.
ADF portal - Create Source Dataset

Note:
Fill in the correct parameter values and use Preview data to verify that the source input is as expected.

3) Dataset (sink)

Create another dataset for the sink. Choose Azure SQL Database and give it a suitable (generic) name. Select the Linked Service for Azure SQL Database that we created earlier and click 'OK'.

Add the following Parameters:
  • SchemaName -> type "String"
  • TableName -> type "String"
Go to Connection and click Edit. Fill in both parameters using dynamic content. 
ADF portal - Create Sink Dataset

4) Pipeline - Copy data

Create a new pipeline and include the schema and table name in the name, like "PL_Load_Stg_Customer". Go to Variables and add the following variables with their default values:
  • Container -> type "String" and value "sourcefiles"
  • Directory -> type "String" and value "WWI"
  • File -> type "String" and value "SalesCustomers"
  • ColumnDelimiter -> type "String" and value ","
Add the Copy data activity, go to Source and use dynamic content to assign the variables to the dataset parameters. For the file, we also use the expression language to build the correct file name dynamically: "@{variables('File')}@{formatDateTime(utcnow(), 'yyyyMMdd')}.csv"

Go to Sink and fill in the schema and table name. We use the split function to retrieve both from the pipeline name, and a Pre-copy script to truncate the table before loading.
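The two naming conventions used so far (file name and pipeline name) can be checked outside ADF before cloning a pipeline. The sketch below is not the ADF expression itself: it reproduces the same split in T-SQL with PARSENAME, assuming the pipeline name always has exactly four underscore-separated parts (prefix, action, schema, table), as in "PL_Load_Stg_Customer". The variable names are illustrative only.

```sql
-- Assumption: pipeline names follow <prefix>_<action>_<schema>_<table>.
DECLARE @PipelineName NVARCHAR(255) = N'PL_Load_Stg_Customer';
DECLARE @FilePrefix   NVARCHAR(255) = N'SalesCustomers';

-- PARSENAME splits on dots and numbers the parts from the right,
-- so the underscores are replaced by dots first (maximum of four parts).
DECLARE @Dotted NVARCHAR(255) = REPLACE(@PipelineName, N'_', N'.');

SELECT
    PARSENAME(@Dotted, 2) AS SchemaName   -- Stg
  , PARSENAME(@Dotted, 1) AS TableName    -- Customer
    -- Same date pattern as formatDateTime(utcnow(), 'yyyyMMdd') in the pipeline
  , CONCAT(@FilePrefix, FORMAT(GETUTCDATE(), 'yyyyMMdd'), N'.csv') AS ExpectedFileName;
```

A table name that itself contains an underscore would break this split, and most likely a split on "_" in ADF as well, so such names need a different separator or a fixed position.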

Finally, go to Mapping and click on Import schemas. This will automatically map the columns that have the same name in source and sink. We remove the columns that do not exist in the source, in this case our technical columns "InsertedDate" and "InsertedBy". We will fill those columns in the next activity.
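For the mapping to work this way, the technical columns must accept NULL during the copy, because the Copy data activity does not supply them. A minimal sketch of such a staging table is shown below. The schema and table name follow the example values used in this post; the two business columns are only an illustrative subset of the customer file, and all data types are assumptions.

```sql
-- Illustrative staging table; adjust columns and types to the actual file.
-- Assumes the [Stg] schema already exists (CREATE SCHEMA [Stg]).
CREATE TABLE [Stg].[Customer]
(
    [CustomerID]    NVARCHAR(50)   NULL   -- staging keeps source values as text
  , [CustomerName]  NVARCHAR(255)  NULL
    -- Technical columns: left unmapped in the Copy data activity,
    -- filled afterwards by the Stored Procedure activity.
  , [InsertedDate]  DATETIME       NULL
  , [InsertedBy]    NVARCHAR(255)  NULL
);
```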

ADF portal - Create Copy data activity

5) Pipeline - Stored Procedure

Add the Stored Procedure activity and give it a suitable name. In SQL Account, select the Linked service for Azure SQL Database that we created earlier. We created a stored procedure (SP) that contains dynamic SQL to fill the columns "InsertedDate" and "InsertedBy" for every Staging table. See the code below.

/*
==========================================================================================================================
Stored Procedure [dbo].[uspPostLoadStaging]
==========================================================================================================================
Description:

This query will load the following columns:
- InsertedDate
- InsertedBy

In ADF, without using Mapping Data Flows, you can combine a Copy data activity with a Stored Procedure activity in order to
fill those (technical) columns during execution of the pipeline. In SSIS this was done with the Derived Column transformation.

==========================================================================================================================
Parameter     Parameter description
--------------------------------------------------------------------------------------------------------------------------
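@SchemaName     Schema of the Staging table (matched with LIKE, so part of the schema name is enough).
@TableName     Name of the Staging table.
@InsertedDate    Date when the data was inserted into the Staging table.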
@InsertedBy     Name of a service / account that has inserted the data into the Staging table.
==========================================================================================================================

==========================================================================================================================
Change history

Date  Who      Remark
--------------------------------------------------------------------------------------------------------------------------
2020-03-22 Ricardo Schuurman  Initial creation.
==========================================================================================================================
*/

CREATE PROCEDURE [dbo].[uspPostLoadStaging]
 @SchemaName   AS NVARCHAR(255)
, @TableName   AS NVARCHAR(255)
, @InsertedDate  AS DATETIME
, @InsertedBy   AS NVARCHAR(255)

AS
BEGIN

 SET NOCOUNT ON;
 BEGIN TRY

  DECLARE
   @QueryStep1    AS NVARCHAR(MAX)
  , @QueryStep2    AS NVARCHAR(MAX)

  -- ERROR variables
  , @errorMsg    AS NVARCHAR(MAX) = ''
  , @errorLine    AS NVARCHAR(MAX) = ''

  /* Example values of SP parameters
  , @SchemaName   AS NVARCHAR(255)
  , @TableName   AS NVARCHAR(255)
  , @InsertedDate  AS DATETIME
  , @InsertedBy   AS NVARCHAR(255)

  SET @SchemaName   = N'Stg'
  SET @TableName   = N'Customer'
  SET @InsertedDate  = GETDATE()
  SET @InsertedBy   = N'Test'

  */

  /* ================================================================================================================
   Step 1: Extract schema and table name (based on the ADF pipeline naming convention)
   ================================================================================================================ */

  -- Add LIKE condition for schema
  SET @SchemaName = '%' + @SchemaName + '%'

  SELECT 
   @QueryStep1 = QUOTENAME([TABLE_SCHEMA]) + N'.' + QUOTENAME([TABLE_NAME]) -- QUOTENAME also escapes a ']' inside a name
  FROM [INFORMATION_SCHEMA].[TABLES]
  WHERE TABLE_TYPE = 'BASE TABLE' 
  AND TABLE_SCHEMA LIKE @SchemaName
  AND TABLE_NAME = @TableName

  /* ================================================================================================================
   Step 2: Execute Update statement using the fixed (technical) columns
   ================================================================================================================ */

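  SET @QueryStep2 = N'
   UPDATE ' + @QueryStep1 + N'
   SET 
    [InsertedDate]  = @InsertedDate
   , [InsertedBy]  = @InsertedBy;
  '

  -- Pass the values as parameters instead of concatenating them into the statement,
  -- so quotes in @InsertedBy cannot break (or inject into) the dynamic SQL
  EXEC sp_executesql 
   @stmt = @QueryStep2
  , @params = N'@InsertedDate DATETIME, @InsertedBy NVARCHAR(255)'
  , @InsertedDate = @InsertedDate
  , @InsertedBy = @InsertedBy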

 END TRY

  -------------------- Error Handling  --------------------

  BEGIN CATCH 

    SELECT 
     @errorMsg = ERROR_MESSAGE()
    , @errorLine = ERROR_LINE()

    SET @errorMsg = 'Error occurred with the following message: ' + @errorMsg + ' Error line number: ' + @errorLine;

    -- The third argument of THROW is the state (0-255), not the line number
    THROW 50001, @errorMsg, 1;

  END CATCH

END
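Before wiring the procedure into ADF, it can be tested by hand, for example from SQL Server Management Studio. The call below is a sketch that uses the example values from the comment block in the procedure; the [Stg].[Customer] table is assumed to exist and to contain rows from an earlier copy run.

```sql
-- EXEC only accepts constants or variables as arguments,
-- so the timestamp goes into a variable first.
DECLARE @Now DATETIME = GETDATE();

EXEC [dbo].[uspPostLoadStaging]
     @SchemaName   = N'Stg'
   , @TableName    = N'Customer'
   , @InsertedDate = @Now
   , @InsertedBy   = N'Test';

-- Every row should now carry the same InsertedDate and InsertedBy.
SELECT [InsertedDate], [InsertedBy], COUNT(*) AS [RowCount]
FROM [Stg].[Customer]
GROUP BY [InsertedDate], [InsertedBy];
```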


Create the SP in the database, go to Stored Procedure and select the SP. Click Import parameter and fill in the parameters. We use the system variables 'Pipeline trigger time' and 'Pipeline Name' for "InsertedDate" and "InsertedBy" respectively. Reuse the values of "SchemaName" and "TableName" from the sink (Copy data activity).
ADF portal - Create Stored Procedure activity

Note:
Instead of using a Stored Procedure with an update query, you can also use the metadata columns of ADF itself. More information here.

Result
Execute the pipeline in ADF (by clicking Debug) and check the result in SQL Server Management Studio. It works!

Note:
In case of a "SqlFailedToConnect" error, make sure the firewall of the Azure SQL Database allows access from the Integration Runtime of ADF. Go to the Azure portal or run sp_set_firewall_rule in the master database to create a firewall rule.
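As a sketch of the T-SQL route: sp_set_firewall_rule creates a server-level rule and has to be run in the master database of the logical server. The rule name below is made up for this example. The 0.0.0.0 range is the special value that allows connections from Azure services, which covers the Azure Integration Runtime but also other Azure resources, so check whether that is acceptable in your environment.

```sql
-- Run in the master database of the Azure SQL logical server.
-- 0.0.0.0 - 0.0.0.0 is the special range for "Allow Azure services".
EXECUTE sp_set_firewall_rule
     @name             = N'AllowAzureServices'   -- hypothetical rule name
   , @start_ip_address = '0.0.0.0'
   , @end_ip_address   = '0.0.0.0';

-- List the server-level rules to confirm that the rule exists.
SELECT [name], [start_ip_address], [end_ip_address]
FROM sys.firewall_rules;
```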


Summary
In this blog post you saw how you can create a dynamic pipeline in ADF using parameters and variables in combination with a proper naming convention, for example for the names of your pipelines. This way of working contributes to a consistent solution and code base. For a new Staging table, clone the existing pipeline and put the schema and table name in the pipeline name. You then only have to change the File variable and the mapping in the Copy data activity.

To make your pipelines even more dynamic, you can also use metadata that is stored in a database instead of ADF itself and retrieve it using the Stored Procedure activity. 
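As an illustration of that idea, the values that are now stored in pipeline variables could live in a small configuration table. Everything in this sketch (the [Config] schema, the table name and its columns) is hypothetical and not part of the solution above; it only shows the shape such metadata could take.

```sql
-- Hypothetical metadata table; assumes a [Config] schema exists.
CREATE TABLE [Config].[StagingLoad]
(
    [PipelineName]     NVARCHAR(255) NOT NULL PRIMARY KEY
  , [Container]        NVARCHAR(255) NOT NULL
  , [Directory]        NVARCHAR(255) NOT NULL
  , [FilePrefix]       NVARCHAR(255) NOT NULL
  , [ColumnDelimiter]  NVARCHAR(5)   NOT NULL
);

-- One row per pipeline, holding the values used as variables in this post.
INSERT INTO [Config].[StagingLoad]
    ([PipelineName], [Container], [Directory], [FilePrefix], [ColumnDelimiter])
VALUES
    (N'PL_Load_Stg_Customer', N'sourcefiles', N'WWI', N'SalesCustomers', N',');

-- A pipeline would look up its own row by name.
SELECT [Container], [Directory], [FilePrefix], [ColumnDelimiter]
FROM [Config].[StagingLoad]
WHERE [PipelineName] = N'PL_Load_Stg_Customer';
```

With such a table, adding a new Staging load becomes an extra row instead of a new set of variable values in a cloned pipeline.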

3 comments:

  1. Thanks for posting detailed steps for making dynamic pipeline. I was thinking do you have git repo with source .CSV file and sink db script and pipeline json code
  2. Had couple of query regarding stored procedure.
    1. Use of dynamic SQL within Stored Procedure. Both schema and table name is anyway parameterized in pipeline
    2. Use of like clause for schema name in stored procedure.

    Update statement in SP anyway will suffice the ask. Can you please clarify. Thanks
  3. Excellent! Used this with a Get Metadata --> For Each combo to load 235 unique files. Thank you so much, keep up the good work.