Tuesday, 29 December 2020

ADF Snack: get files of last hour

Case
I have a Data Factory pipeline that should run every hour and collect all new files added to the data lake since the last run. Which activity is best suited for this, or do we need to write code?
No scripts, no loops

Solution
The Copy Data activity has a wildcard filter that lets you read multiple files (of the same type/format) at once, so there is no need for a ForEach activity to process them one by one. Combine that with the start and end datetime filter options of that same Copy Data activity and you can limit the files to a certain period.

Date filter
The End datetime property will be populated with the start datetime of the current ADF pipeline run. Files added while the pipeline is running will therefore be skipped and processed during the next run. This End datetime is also stored afterwards for the next run.
The Start datetime will be retrieved from a database table: the previous run of the pipeline stored its End datetime there as the Start datetime for the next run.
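
The window logic described above can be sketched in T-SQL. This is only an illustration, not something the pipeline runs: the table variable, file names and timestamps are made up, and it assumes the filter includes files modified exactly on the Start datetime and excludes files modified exactly on the End datetime.

```sql
-- Hypothetical file listing, only to illustrate the window logic
DECLARE @Files TABLE ([FileName] nvarchar(100), [LastModified] datetime2(7));

INSERT INTO @Files ([FileName], [LastModified]) VALUES
    (N'a.csv', '2020-12-29T09:15:00'),  -- before the window: handled by an earlier run
    (N'b.csv', '2020-12-29T10:00:00'),  -- exactly on the start: included
    (N'c.csv', '2020-12-29T10:45:00'),  -- inside the window: included
    (N'd.csv', '2020-12-29T11:00:00'),  -- exactly on the end: left for the next run
    (N'e.csv', '2020-12-29T11:02:00');  -- arrived during the run: left for the next run

DECLARE @StartTime datetime2(7) = '2020-12-29T10:00:00';  -- End datetime stored by the previous run
DECLARE @EndTime   datetime2(7) = '2020-12-29T11:00:00';  -- start datetime of the current run

-- Start is inclusive, End is exclusive, so consecutive windows never overlap
SELECT   [FileName], [LastModified]
FROM     @Files
WHERE    [LastModified] >= @StartTime
AND      [LastModified] <  @EndTime
ORDER BY [LastModified];
-- returns b.csv and c.csv
```

Because the End datetime of one run becomes the Start datetime of the next, every file falls into exactly one window: no file is skipped and no file is loaded twice.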
The basic setup

1) Table and stored procedures
To store (and retrieve) the datetime from the pipeline we use a database table and two stored procedures. To keep it easy we kept it very basic; feel free to extend it to your own needs. In this solution there will be only one record per source. The SetLastRun stored procedure will either insert a new record or update the existing record via a MERGE statement. The GetLastRun stored procedure will retrieve the datetime of the last run and return a default date if there is no record available.
-- Create runs table
CREATE TABLE [dbo].[Runs](
	[SourceName] [nvarchar](50) NOT NULL,
	[LastRun] [datetime2](7) NULL,
	CONSTRAINT [PK_Runs] PRIMARY KEY CLUSTERED 
	(
	[SourceName] ASC
	)
)
GO

-- Save the lastrundate
CREATE PROCEDURE SetLastRun
(
    @SourceName as nvarchar(50)
,	@LastRun as datetime2(7)
)
AS
BEGIN
    -- SET NOCOUNT ON added to prevent extra result sets from
    -- interfering with SELECT statements.
    SET NOCOUNT ON

	-- Check if there already is a record
	-- Then Insert or Update a record
    MERGE dbo.Runs AS [Target]
    USING (SELECT @SourceName, @LastRun) AS [Source] ([SourceName], [LastRun])  
    ON ([Target].[SourceName] = [Source].[SourceName])  
    WHEN MATCHED THEN
        UPDATE SET [LastRun] = [Source].[LastRun]  
    WHEN NOT MATCHED THEN  
        INSERT ([SourceName], [LastRun])  
        VALUES ([Source].[SourceName], [Source].[LastRun]);  
END
GO

-- Retrieve the lastrundate
CREATE PROCEDURE GetLastRun
(
    @SourceName as nvarchar(50)
)
AS
BEGIN
	DECLARE @DefaultDate as datetime2(7) = '1900-01-01'

	-- Retrieve lastrun and provide default date in case null
	SELECT	ISNULL(MAX([LastRun]), @DefaultDate) as [LastRun]
	FROM	[Runs] 
	WHERE	[SourceName] = @SourceName
END
GO
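
A quick way to check the table and both stored procedures before building the pipeline is to call them by hand. The sketch below assumes the objects above were created in your default schema; 'DemoSource' and the timestamps are made-up test values.

```sql
-- 'DemoSource' is a hypothetical source name used only for this test

-- No record yet, so the default date is returned
EXEC GetLastRun @SourceName = N'DemoSource';
-- returns 1900-01-01

-- First call inserts a record, second call updates that same record
EXEC SetLastRun @SourceName = N'DemoSource', @LastRun = '2020-12-29T10:00:00';
EXEC SetLastRun @SourceName = N'DemoSource', @LastRun = '2020-12-29T11:00:00';

-- Still only one record for this source
SELECT COUNT(*) AS [NumberOfRecords]
FROM   [Runs]
WHERE  [SourceName] = N'DemoSource';
-- returns 1

-- The most recently stored datetime is returned
EXEC GetLastRun @SourceName = N'DemoSource';
-- returns 2020-12-29 11:00:00

-- Clean up the test record
DELETE FROM [Runs] WHERE [SourceName] = N'DemoSource';
```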

2) Retrieve datetime last run
So the first step in the pipeline is to execute a stored procedure that retrieves the End datetime of the previous run. As mentioned, a default datetime will be returned if there is no previous run available. Because we need the returned value later in the pipeline, we will use the Lookup activity instead of the Stored Procedure activity.
  • Add a Lookup activity to the pipeline and give it a descriptive name
  • On the Settings tab add or reuse a Source dataset (and Linked service) that points to the database containing the table and stored procedures of the previous step (don't point to a specific table).
  • Choose Stored Procedure under the Use query property
  • Select 'GetLastRun' as Stored procedure name and hit the Import button to get the parameters from the stored procedure
  • Now either use a hardcoded source name or use an expression such as @pipeline().Pipeline, which uses the pipeline name as the source name.
Execute Stored Procedure via Lookup to retrieve last rundate

3) Copy Data Activity
The second step is to retrieve the actual data from the data lake with a Copy Data activity. It uses two expressions: the first retrieves the datetime from the previous step and uses it as the Start time filter, the second retrieves the start datetime of the pipeline itself and uses it as the End time filter.
  • Add the Copy Data Activity and set it up to load a specific file from the data lake to a SQL Server table (or your own destination)
  • Now on the Source tab change the File path type to Wildcard file path
  • Then set the Wildcard file name to a pattern instead of a specific file, for example *.csv to read all CSV files.
  • Next set the Start time (UTC) property under Filter by last modified to the following expression:
    @activity('Get Last Run').output.firstRow.LastRun
    Here 'Get Last Run' is the name of the Lookup activity from the previous step and LastRun is the column returned by the stored procedure (more details here).
  • Also set the End time (UTC) property with the following expression:
    @pipeline().TriggerTime (this is the time at which the trigger started the pipeline run)
  • You also might want to add an extra metadata column with the Filename via the Additional columns option (more details here).
Set up wildcard and datetime filters

4) Store datetime for next run
The last step is to save the start datetime of the pipeline itself as the run datetime so that it can be retrieved in the next run. Since this stored procedure doesn't return any output, we can use the standard Stored Procedure activity.
  • Add the Stored Procedure activity and connect it to the previous activity
  • On the Settings tab reuse the same Linked service as in step 2
  • Select SetLastRun as the Stored procedure name
  • Hit the import button and set the parameters
    • LastRun should be filled with the start datetime of the pipeline: @pipeline().TriggerTime
    • SourceName should be filled with the same expression as in step 2
Add Stored Procedure to save rundate

5) Schedule
Now just schedule your pipeline every x minutes or x hours with a trigger to keep your database table up-to-date with files from the data lake. Then keep adding files to the data lake and watch your runs table (step 1) and the actual staging table to see the result. The optional metadata column of step 3 should make debugging and testing a lot easier.
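
To illustrate how the optional file name column helps with testing, here is a minimal sketch with a made-up staging table. The table #Staging, its columns and the sample rows are all hypothetical; in practice you would run the last query against your own destination table.

```sql
-- Hypothetical staging table; replace with your own destination table
CREATE TABLE #Staging
(
    [Id]       int           NOT NULL,
    [Amount]   decimal(18,2) NULL,
    [FileName] nvarchar(260) NULL   -- filled by the additional column of step 3
);

-- Made-up rows, as if two files had been loaded by two different runs
INSERT INTO #Staging ([Id], [Amount], [FileName]) VALUES
    (1, 10.00, N'sales_1000.csv'),
    (2, 12.50, N'sales_1000.csv'),
    (3,  7.25, N'sales_1100.csv');

-- Rows loaded per file: a file listed twice as often as expected was loaded twice
SELECT   [FileName], COUNT(*) AS [NumberOfRows]
FROM     #Staging
GROUP BY [FileName]
ORDER BY [FileName];
-- returns sales_1000.csv with 2 rows and sales_1100.csv with 1 row

DROP TABLE #Staging;
```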


Summary
In this post you learned how to use the wildcard and filter options of the Copy Data activity to create a mechanism to keep your data up-to-date. A downside of this solution is that it will sometimes run the pipeline unnecessarily because no new files were added to the data lake. Another downside is that the process is not real-time.

If you need a more (near-)real-time solution instead of running every x minutes or hours, you can use the trigger solution, which processes files as soon as they arrive. However, that solution has two downsides. First, you are running the pipeline for each file, which means you are paying for each file. Second, there is a limit on the number of files that can be triggered per hour, especially when you don't want to (or can't) process files in parallel: the execution queue has a limit of 100 executions per pipeline. After that you will receive an error and miss that file.