Wednesday 5 June 2019

Staging with the Azure Data Factory Foreach Loop

Case
When we were using SSIS, we could use BIML to generate a whole bunch of staging packages for all tables in a certain database. Is there a similar solution in Data Factory where I don't have to create a separate pipeline for each table?
ForEach loop in Azure Data Factory

Solution
Azure Data Factory (ADF) has a ForEach loop construct that you can use to loop through a set of tables. This is similar to BIML, where you often write a For Each loop in C# to loop through a set of tables or files.

To keep things very simple for this example, we have two databases called Source and Stage. We already generated three tables in the Stage database that have the same structure and datatypes as in the Source database. We will empty the stage tables before filling them.

1) Create view in Stage
First we need a list of tables that we can loop through. For this example we created a view in the Stage database that retrieves all tables from the information schema. You can fine-tune this view with, for example, a WHERE clause, or use a filled configuration table instead.
CREATE VIEW [dbo].[StageTables]
AS
  SELECT TABLE_SCHEMA
  ,      TABLE_NAME
  FROM   INFORMATION_SCHEMA.TABLES
  WHERE  TABLE_TYPE = 'BASE TABLE'
GO

Note: Use QUOTENAME to make the view robust against deviating table names. It adds brackets around the schema and table names.
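A variant of the view that applies QUOTENAME could look like the sketch below (using CREATE OR ALTER, which assumes SQL Server 2016 SP1 or later; the bracketed names then flow straight into the @concat expressions used later in this post):

```sql
CREATE OR ALTER VIEW [dbo].[StageTables]
AS
  -- QUOTENAME wraps each name in brackets, e.g. dbo -> [dbo],
  -- so names with spaces or special characters keep working
  SELECT QUOTENAME(TABLE_SCHEMA) AS TABLE_SCHEMA
  ,      QUOTENAME(TABLE_NAME)   AS TABLE_NAME
  FROM   INFORMATION_SCHEMA.TABLES
  WHERE  TABLE_TYPE = 'BASE TABLE'
GO
```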

2) Create Dataset for view
Now go to ADF (Author & Monitor dashboard) and create a new Dataset for the view above.
ADF - Add Dataset

Select the right data store (Azure SQL Database in our example) and give your dataset a suitable name like 'StageTables'. Next, add a new (or select an existing) Linked Service, and then select the newly created view.
ADF - New Dataset

3) Create Dataset for Source tables
Next we will create a Dataset for the tables in the Source database, but instead of pointing to a particular table we will use a parameter called 'SourceTable'. First go to the Parameters tab and create the parameter. Then return to the Connection tab, where the table name can now be filled with dynamic content: @dataset().SourceTable
ADF - Create Dataset with parameter

4) Create Dataset for Stage tables
Now repeat the exact same steps to create a Dataset for the Stage tables; call the Dataset 'Stage' and the parameter 'StageTable'. The table name can then be filled with dynamic content: @dataset().StageTable
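Under the hood such a parameterized Dataset is just JSON. A minimal sketch of the 'Source' dataset is shown below; the linked service name 'SourceDatabase' is an assumption, yours will differ:

```json
{
  "name": "Source",
  "properties": {
    "type": "AzureSqlTable",
    "linkedServiceName": {
      "referenceName": "SourceDatabase",
      "type": "LinkedServiceReference"
    },
    "parameters": {
      "SourceTable": { "type": "string" }
    },
    "typeProperties": {
      "tableName": {
        "value": "@dataset().SourceTable",
        "type": "Expression"
      }
    }
  }
}
```

The 'Stage' dataset looks the same, with 'StageTable' as the parameter name.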

5) Create Pipeline
At this point we have three Datasets (StageTables, Source and Stage) and two Linked Services (to the Source and Stage database). Now it's time to create the pipeline, which consists of a Lookup activity (under General) and a ForEach activity (under Iteration & Conditionals).
ADF - Pipeline with Lookup and Foreach

6) Add Lookup
In the newly created pipeline, first add a Lookup activity that points to the Dataset called 'StageTables', which in turn points to the view. Make sure to uncheck 'First row only' on the Settings tab, so the Lookup returns all rows. You can hit the Preview data button to check the list of tables.
ADF - Add Lookup

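In the pipeline's JSON this Lookup looks roughly like the fragment below. The activity name 'my tables' matches the expression used in the next step; the source type assumes an Azure SQL dataset:

```json
{
  "name": "my tables",
  "type": "Lookup",
  "typeProperties": {
    "source": { "type": "AzureSqlSource" },
    "dataset": {
      "referenceName": "StageTables",
      "type": "DatasetReference"
    },
    "firstRowOnly": false
  }
}
```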
7) Add Foreach
The next step is adding a ForEach activity that loops through the result of the Lookup. On the Settings tab you provide the items you want to loop through; in our case that is the output of the preceding Lookup activity: @activity('my tables').output.value
On the Activities tab we add the activity that we want to execute for each staging table. For this example that is a single Copy Data activity, which we will configure in the next step.
ADF - Configure ForEach

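The ForEach activity itself can be sketched in JSON as follows; the activity names and the batch count of 20 are assumptions for this example:

```json
{
  "name": "ForEach table",
  "type": "ForEach",
  "dependsOn": [
    {
      "activity": "my tables",
      "dependencyConditions": [ "Succeeded" ]
    }
  ],
  "typeProperties": {
    "items": {
      "value": "@activity('my tables').output.value",
      "type": "Expression"
    },
    "batchCount": 20,
    "activities": [ ]
  }
}
```

The Copy Data activity from the next step goes inside the (here still empty) "activities" array.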
Note: on the Settings tab you will also see the setting 'Batch count'. This is the maximum number of parallel executions of the Copy Data activity. If you have a lot of tables, consider raising it.


8) Copy Data Activity
The last step is to configure the Copy Data activity to copy the data from the Source to the Stage database. First give it a suitable name like 'stage my table'. Then go to the Source tab and select 'Source' (from step 3) as the Source dataset.
Because we added a parameter to this dataset, a new field called 'SourceTable' will appear. With an expression we combine the schema name and table name from the ForEach loop: @concat(item().TABLE_SCHEMA,'.',item().TABLE_NAME)
Next, repeat this on the Sink tab, where we use the Sink dataset called 'Stage'. The expression for the table name is the same as above: @concat(item().TABLE_SCHEMA,'.',item().TABLE_NAME)
ADF - Copy Data in ForEach

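Put together, the Copy Data activity inside the ForEach can be sketched like this (source and sink types again assume Azure SQL on both sides):

```json
{
  "name": "stage my table",
  "type": "Copy",
  "inputs": [
    {
      "referenceName": "Source",
      "type": "DatasetReference",
      "parameters": {
        "SourceTable": "@concat(item().TABLE_SCHEMA,'.',item().TABLE_NAME)"
      }
    }
  ],
  "outputs": [
    {
      "referenceName": "Stage",
      "type": "DatasetReference",
      "parameters": {
        "StageTable": "@concat(item().TABLE_SCHEMA,'.',item().TABLE_NAME)"
      }
    }
  ],
  "typeProperties": {
    "source": { "type": "AzureSqlSource" },
    "sink": { "type": "AzureSqlSink" }
  }
}
```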
9) Truncate Stage
To empty the Stage tables, you could of course add a Stored Procedure activity before the Copy Data activity to execute a TRUNCATE or DELETE statement. However, the Copy Data activity has a Pre-copy script option on the Sink tab that you can use to execute this same statement (thanks to colleague David van der Velden for the idea). Add the following expression:
@concat('TRUNCATE TABLE ', item().TABLE_SCHEMA,'.',item().TABLE_NAME)
ADF - Copy Data Pre-copy script

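In the JSON of the Copy Data activity, this pre-copy script ends up on the sink; a sketch of just that fragment:

```json
{
  "sink": {
    "type": "AzureSqlSink",
    "preCopyScript": {
      "value": "@concat('TRUNCATE TABLE ', item().TABLE_SCHEMA, '.', item().TABLE_NAME)",
      "type": "Expression"
    }
  }
}
```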
Summary
In this post you have seen an Azure Data Factory alternative to BIML and SSIS for staging tables, using the ForEach construction. If you want to stage another table, you can just add the empty table to the Stage database and the ForEach loop will fill it automatically.


5 comments:

  1. Can we do the same thing with csv file having extra column as loaddatetime as GetDate()..? So that every file should have extra column loaddatetime where we will have current load datetime

  2. Hi there, I have been following your article to loop through a list of Salesforce tables to pull to Azure SQL database. I get the following error "Value cannot be null.\r\nParameter name: properties". I'm wondering if you have any idea what this could be? Thanks.

  3. Hey, I got this error, any idea how can I resolve this? Error is as follows:
    The function 'length' expects its parameter to be an array or a string. The provided value is of type 'Object'.

    Please let me know if you know how to resolve this one.


All comments will be verified first to avoid URL spammers.