I want to create multiple Azure Data Factory (ADF) pipelines that use the same source and sink location, but with different files and tables. How can we do this in ADF while taking the extensibility and maintainability of the pipelines into account?
Solution
Let's say we have a scenario where all the source files are stored in Azure Storage, for example Blob Storage. We want to load these files into a traditional Data Warehouse (DWH) on an Azure SQL Database that contains a separate schema for the Staging layer. Earlier we showed you how you can do this using a Foreach Loop in one single pipeline. For this scenario we will not do that, because:
- Not all Staging tables are loaded every day at the same time.
- Sometimes only 3 of the total 5 files are loaded, because not all files are delivered every day and the set of files can differ per day.
- The load can be spread over several moments during the day.
- We want to be more flexible per Staging load.
  - In most cases you want the same Staging load for every table, but sometimes the transformations are different or you need an extra table to prepare the dataset.
- We use technical columns in the Staging tables.
  - In most cases you want to store some metadata, for example "inserted date" and "inserted by". When using a Copy data activity, you have to configure the mapping section when the source and sink fields are not equal. An example of such a Staging table is sketched below.
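To illustrate, a Staging table with such technical columns could look like the sketch below. The table and column names are just an example, not the actual WideWorldImporters schema:

-- Illustrative Staging table with technical columns
CREATE TABLE [Stg].[Customer]
(
      [CustomerID]    INT            NULL
    , [CustomerName]  NVARCHAR(100)  NULL
    , [InsertedDate]  DATETIME       NULL  -- technical column, filled after the load
    , [InsertedBy]    NVARCHAR(255)  NULL  -- technical column, filled after the load
);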
Note:
When working in a team, it is important to have a consistent way of development and a proper naming convention in place. This contributes to the extensibility and maintainability of your application. Click here for an example of a naming convention in ADF.
Pipeline
For this solution, we will create a pipeline that contains the following activities:
- Copy Data activity (load data from source to sink)
- Stored Procedure activity (update technical columns)
The source is a CSV file stored in a Blob container. The data is based on the customer sales table from WideWorldImporters-Standard. The file is delivered daily.
Storage account:  bitoolssa
Blob container:   sourcefiles
Blob folder:      WWI
File name*:       SalesCustomers20200322.csv

* this is the blog post date; in practice it will be the date of today
Note:
Blob storage containers only have virtual folders, which means that the folder name is stored in the file name. Microsoft Azure Storage Explorer will show them as if they were real folders.
1) Linked Service
Open the ADF portal, go to Connections - Linked services and click on New. Select Azure Blob Storage and give it a suitable (generic) name. Create the connection to your storage account. Create another Linked service for Azure SQL Database, because that will be our destination (sink). Click here to see how to make the connection using Azure Key Vault.
ADF portal - Create Linked Service
2) Dataset (source)
Click New dataset and select Azure Blob Storage. The format is DelimitedText. Give it a suitable (generic) name and select the Linked Service for Blob Storage that we created earlier.
Once you click 'OK', it will open the dataset automatically. Go to Parameters and add the following:
- Container -> type "String"
- Directory -> type "String"
- File -> type "String"
- ColumnDelimiter -> type "String"
Go to Connection and use the applicable parameters to fill in the File path. You can apply dynamic content for each setting. For now, we also added a parameter for the Column delimiter. Finally, enable First row as header.
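As a sketch, assuming the parameter names defined above, the dynamic content for the Connection settings could look like this:

Container:        @dataset().Container
Directory:        @dataset().Directory
File name:        @dataset().File
Column delimiter: @dataset().ColumnDelimiter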
Click Publish all to save your work. For now we did not configure a Git repository, but of course we recommend that.
ADF portal - Create Source Dataset
Note:
Use Preview data to verify that the source input is as expected by filling in the correct parameter values.
3) Dataset (sink)
Create another dataset for the sink. Choose Azure SQL Database and give it a suitable (generic) name. Select the Linked Service for Azure SQL Database that we created earlier and click 'OK'.
Add the following Parameters:
- SchemaName (String)
- TableName (String)
Go to Connection and click Edit. Fill in both parameters using dynamic content.
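A sketch of the dynamic content for the table setting, using the dataset parameters just defined:

Schema name: @dataset().SchemaName
Table name:  @dataset().TableName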
ADF portal - Create Sink Dataset
4) Pipeline - Copy data
Create a new pipeline and include the schema and table name in the name, like "PL_Load_Stg_Customer". Go to Variables and add the following variables with their values:
- Container -> type "String" and value "sourcefiles"
- Directory -> type "String" and value "WWI"
- File -> type "String" and value "SalesCustomers"
- ColumnDelimiter -> type "String" and value ","
Add the Copy data activity, go to Source and use dynamic content to assign the variables to the input parameters. For the file, we also use expression language to retrieve the correct file name dynamically: "@{variables('File')}@{formatDateTime(utcnow(), 'yyyyMMdd')}.csv"
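Put together, the source dataset parameter values could look like this sketch, based on the variables defined above:

Container:        @variables('Container')
Directory:        @variables('Directory')
File:             @{variables('File')}@{formatDateTime(utcnow(), 'yyyyMMdd')}.csv
ColumnDelimiter:  @variables('ColumnDelimiter')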
Go to Sink and fill in the schema and table name. We use the SPLIT function to retrieve these from the pipeline name. We use a Pre-copy script to truncate the table before loading.
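For a pipeline named "PL_Load_Stg_Customer", the sink expressions could look like the sketch below, assuming the underscore-separated naming convention where the schema is the third part and the table name the fourth:

SchemaName:      @split(pipeline().Pipeline, '_')[2]
TableName:       @split(pipeline().Pipeline, '_')[3]
Pre-copy script: TRUNCATE TABLE [@{split(pipeline().Pipeline, '_')[2]}].[@{split(pipeline().Pipeline, '_')[3]}]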
Finally, go to Mapping and click on Import schemas. This will automatically map the columns with the same names (source and sink). We remove the columns that do not exist in the source, in this case our technical columns "InsertedDate" and "InsertedBy". We will fill those columns in the next activity.
ADF portal - Create Copy data activity
5) Pipeline - Stored Procedure
Add the Stored Procedure activity and give it a suitable name. In SQL Account, select the Linked service for Azure SQL Database that we created earlier. We created a SP that contains dynamic SQL to fill the columns "InsertedDate" and "InsertedBy" for every Staging table. See the code below.

/* ==========================================================================================
   Stored Procedure [dbo].[uspPostLoadStaging]
   ==========================================================================================
   Description:
   This query will load the following columns:
   - InsertedDate
   - InsertedBy

   In ADF, without using Data Flows (Mapping), you can combine a Copy data activity with a
   Stored Procedure in order to fill those (technical) columns during execution of the
   pipeline. In SSIS this was done by the Derived Column task.
   ==========================================================================================
   Parameter        Parameter description
   ------------------------------------------------------------------------------------------
   @InsertedDate    Date when the data was inserted into the Staging table.
   @InsertedBy      Name of a service / account that has inserted the data into the Staging table.
   ==========================================================================================
   Change history
   Date          Who                  Remark
   ------------------------------------------------------------------------------------------
   2020-03-22    Ricardo Schuurman    Initial creation.
   ========================================================================================== */
CREATE PROCEDURE [dbo].[uspPostLoadStaging]
      @SchemaName   AS NVARCHAR(255)
    , @TableName    AS NVARCHAR(255)
    , @InsertedDate AS DATETIME
    , @InsertedBy   AS NVARCHAR(255)
AS
BEGIN
    SET NOCOUNT ON;

    BEGIN TRY

        DECLARE
              @QueryStep1 AS NVARCHAR(MAX)
            , @QueryStep2 AS NVARCHAR(MAX)
            -- ERROR variables
            , @errorMsg   AS NVARCHAR(MAX) = ''
            , @errorLine  AS NVARCHAR(MAX) = ''

        /* Example values of SP parameters
            , @SchemaName   AS NVARCHAR(255)
            , @TableName    AS NVARCHAR(255)
            , @InsertedDate AS DATETIME
            , @InsertedBy   AS NVARCHAR(255)

            SET @SchemaName   = N'Stg'
            SET @TableName    = N'Customer'
            SET @InsertedDate = GETDATE()
            SET @InsertedBy   = N'Test'
        */

        /* ==================================================================================
           Step 1: Extract schema and table name (based on the ADF pipeline naming convention)
           ================================================================================== */

        -- Add LIKE condition for schema
        SET @SchemaName = '%' + @SchemaName + '%'

        SELECT @QueryStep1 = '[' + [TABLE_SCHEMA] + '].[' + [TABLE_NAME] + ']'
        FROM [INFORMATION_SCHEMA].[TABLES]
        WHERE TABLE_TYPE = 'BASE TABLE'
            AND TABLE_SCHEMA LIKE @SchemaName
            AND TABLE_NAME = @TableName

        /* ==================================================================================
           Step 2: Execute Update statement using the fixed (technical) columns
           ================================================================================== */

        SET @QueryStep2 = N'
            UPDATE ' + @QueryStep1 + '
            SET   [InsertedDate] = ''' + CONVERT(NVARCHAR(30), @InsertedDate, 120) + '''
                , [InsertedBy]   = ''' + @InsertedBy + ''';
            '

        EXEC sp_executesql @stmt = @QueryStep2

    END TRY

    -------------------- Error Handling --------------------
    BEGIN CATCH

        SELECT
              @errorMsg  = ERROR_MESSAGE()
            , @errorLine = ERROR_LINE()

        SET @errorMsg = 'Error occurred with the following message: ' + @errorMsg + ' Error line number: ' + @errorLine;

        THROW 50001, @errorMsg, 1;

    END CATCH
END
Create the SP in the database, go to Stored Procedure and select the SP. Click Import parameter and fill in the parameters. We use the system variables 'Pipeline trigger time' and 'Pipeline Name' for "InsertedDate" and "InsertedBy". Reuse the values of "SchemaName" and "TableName" from the sink (Copy data activity).
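A sketch of the parameter values; the schema and table expressions assume the same naming convention as in the sink:

InsertedDate: @pipeline().TriggerTime
InsertedBy:   @pipeline().Pipeline
SchemaName:   @split(pipeline().Pipeline, '_')[2]
TableName:    @split(pipeline().Pipeline, '_')[3]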
ADF portal - Create Stored Procedure activity
Note:
Instead of using a Stored Procedure with an update query, you can also use the metadata columns of ADF itself. More information here.
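Assuming the Additional columns option on the Copy data source is the feature meant here, a sketch would be to add the two technical columns directly in the Copy data activity instead of updating them afterwards:

InsertedDate: @pipeline().TriggerTime
InsertedBy:   @pipeline().Pipeline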
Result
Execute the pipeline in ADF (by clicking Debug) and check the result in SQL Server Management Studio. It works!
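For example, a quick check in SQL Server Management Studio (table name from our example):

-- Verify that the technical columns are filled after the pipeline run
SELECT TOP (10) *
FROM [Stg].[Customer]
ORDER BY [InsertedDate] DESC;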
Note:
In case of a "SqlFailedToConnect" error, make sure the firewall of the Azure SQL Database allows the Integration Runtime of ADF to access. Go to the Azure portal or use sp_set_firewall_rule on the database to create a firewall rule.
Summary
In this blog post you saw how you can create a dynamic pipeline using parameters and variables in combination with a proper naming convention in ADF, for example for the names of your pipelines. This way of working contributes to a consistent solution and consistent code. In case of a new Staging load table, clone the existing pipeline and use the schema and table name in the pipeline name. You only have to change the mapping in the Copy data activity.
To make your pipelines even more dynamic, you can also use metadata that is stored in a database instead of ADF itself and retrieve it using the Stored Procedure activity.
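A minimal sketch of such a metadata table (illustrative names, not part of the solution above):

CREATE TABLE [dbo].[StagingLoadConfig]
(
      [SchemaName]      NVARCHAR(255)
    , [TableName]       NVARCHAR(255)
    , [Container]       NVARCHAR(255)
    , [Directory]       NVARCHAR(255)
    , [FileName]        NVARCHAR(255)
    , [ColumnDelimiter] NVARCHAR(10)
);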
Thanks for posting detailed steps for making a dynamic pipeline. I was thinking, do you have a git repo with the source .CSV file, sink db script and pipeline JSON code?
Had a couple of queries regarding the stored procedure:
1. Use of dynamic SQL within the Stored Procedure, since both schema and table name are already parameterized in the pipeline.
2. Use of the LIKE clause for the schema name in the stored procedure.
An update statement in the SP would anyway suffice. Can you please clarify? Thanks
Excellent! Used this with a Get Metadata --> For Each combo to load 235 unique files. Thank you so much, keep up the good work.