
Monday, 20 March 2023

Synapse - Creating Silver Delta Tables

Case
I want to create and fill a Silver layer based on parquet files in my Bronze layer. Is there a simple way to create and populate the tables automatically?
Adding files to your Silver layer

Solution
You can create a notebook for this and then call that notebook from your Synapse pipeline with some parameters (location, table name and keys). This allows you, for example, to loop through all your ingested source files from the bronze (raw/ingest) layer and call this notebook for each file to add them to the Silver layer. We can also add the Silver tables directly to the Lake database for easy querying later on.

Note that this example creates a technical, source-based Silver layer, so it is not really cleansed, curated or conformed.

1) Create notebook
Go to the Develop tab in Synapse and create a new Notebook. Give it a suitable name and make sure the language is PySpark. Sooner or later you want to test this Notebook, so attach it to a Spark Pool. Optionally you can add a Markdown cell to explain this notebook.
New Synapse Notebook

2) Code cell 1: parameters
The first code cell is for the parameters that can be overridden by parameters from the Notebook activity in the pipeline. Toggle the parameters option to make it a parameter cell. For more details see our post about notebook parameters. For debugging within the notebook we used real values.

For this example everything (bronze and silver) is in the same container, so you might want to add more parameters to split those up. This example uses parquet files as a source. If you want to use CSV instead, you need to change the format in the main code that fills the Spark Data Frame with data.
# path of the data lake container (bronze and silver for this example)
data_lake_container = 'abfss://mysource@datalakesvb.dfs.core.windows.net'
# The ingestion folder where your parquet files are located
bronze_folder = 'Bronze'
# The silver folder where your Delta Tables will be stored
silver_folder = 'Silver'
# The name of the table
table_name = 'SalesOrderHeader'
# The wildcard filter used within the bronze folder to find files
source_wildcard = 'SalesOrderHeader*.parquet'
# A comma separated string of one or more key columns (for the merge)
key_columns_str = 'SalesOrderID'
Parameters

3) Code cell 2: import modules and functions
The second code cell is for importing all required/useful modules. For this basic example we have only one import:
# Import modules
from delta.tables import DeltaTable
Import Delta Table module

4) Code cell 3: filling delta lake
Now the actual code for filling the delta lake tables with parquet files from the data lake. Note that the code is very basic: it checks whether the Delta Lake table already exists. If not, it creates the Delta Lake table; if it does, it merges the new data into the existing table. If you have transactional data then you could also do an append instead of a merge.

# Convert comma separated string with keys to array
key_columns = key_columns_str.split(',')  

# Convert array with keys to where-clause for merge statement
conditions_list = [f"existing.{key}=updates.{key}" for key in key_columns]

# Determine path of source files from ingest layer
source_path = data_lake_container + '/' + bronze_folder + '/' + source_wildcard 

# Determine path of Delta Lake Table 
delta_table_path = data_lake_container + '/' + silver_folder + '/' + table_name

# Read file(s) in spark data frame
sdf = spark.read.format('parquet').option("recursiveFileLookup", "true").load(source_path)

# Check if the Delta Table exists
if (DeltaTable.isDeltaTable(spark, delta_table_path)):
    print('Existing delta table')
    # Read the existing Delta Table
    delta_table = DeltaTable.forPath(spark, delta_table_path)

    # Merge new data into existing table
    delta_table.alias("existing").merge(
        source = sdf.alias("updates"),
        condition = " AND ".join(conditions_list)
        
    ).whenMatchedUpdateAll(
    ).whenNotMatchedInsertAll(
    ).execute()

    # For transactions you could do an append instead of a merge
    # sdf.write.format('delta').mode('append').save(delta_table_path)

else:
    print('New delta table')
    # Create new delta table with new data
    sdf.write.format('delta').save(delta_table_path)
Adding data to new or existing Delta Table

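As mentioned in step 2, if your bronze files are CSV instead of parquet, only the read statement in this cell (and the wildcard parameter) needs to change. A minimal sketch, assuming semicolon-delimited files with a header row (the option values are just examples):
# CSV variant of the read statement (header and delimiter values are examples)
sdf = spark.read.format('csv') \
    .option("header", "true") \
    .option("delimiter", ";") \
    .option("recursiveFileLookup", "true") \
    .load(source_path)
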
5) Code cell 4: Adding Delta Table to Lake Database
The last step is optional, but very useful: adding the Delta Table to the Lake Database. This allows you to query the Delta Table by its name instead of its path in the Data Lake. Make sure you first add a Silver layer to that Lake database. See this post for more details (step 1).
# Adding the Delta Table to the Lake Database for easy querying in other notebooks or scripts within Synapse.
spark.sql(f'CREATE TABLE IF NOT EXISTS Silver.{table_name} USING DELTA LOCATION \'{delta_table_path}\'')

# Spark SQL version
#  CREATE TABLE Silver.MyTable
#  USING DELTA
#  LOCATION 'abfss://yourcontainer@yourdatalake.dfs.core.windows.net/Silver/MyTable'
Adding Delta Table to Lake Database

6) Creating Pipeline
Now it is time to loop through your ingested files and call this new Notebook for each file to create the Silver layer Delta Tables. You have to provide values for all parameters in the notebook. Since you need the key column(s) of each table for the merge, you probably need to store these somewhere.

For the ingestion we often store the table/file names from each source that we want to download to the data lake in a metadata table. In this table we also store the key column(s) of each table.
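To get a feel for what such a parameterized call looks like, you can also run the notebook from another notebook with mssparkutils while testing. This is just a sketch (the notebook name is hypothetical and the parameter values are the examples from this post); in the pipeline itself you use the Notebook activity as described below.
# Sketch: call the Silver notebook with parameters from another notebook (name and values are examples)
from notebookutils import mssparkutils

mssparkutils.notebook.run("CreateSilverDeltaTable", 300, {
    "data_lake_container": "abfss://mysource@datalakesvb.dfs.core.windows.net",
    "bronze_folder": "Bronze",
    "silver_folder": "Silver",
    "table_name": "SalesOrderHeader",
    "source_wildcard": "SalesOrderHeader*.parquet",
    "key_columns_str": "SalesOrderID"
})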

Call Notebook in ForEach loop

Synapse doesn't retrieve the parameters from the Notebook automatically. You have to add them manually as Base parameters on the Settings tab of the Notebook activity.
Calling Notebook

If you enter a column or set of columns for the key that are not unique, you will get an error the second time you run the pipeline (the first time the merge is not used):
Cannot perform Merge as multiple source rows matched and attempted to modify the same target row in the Delta table in possibly conflicting ways. By SQL semantics of Merge, when multiple source rows match on the same target row, the result may be ambiguous as it is unclear which source row should be used to update or delete the matching target row. You can preprocess the source table to eliminate the possibility of multiple matches.
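If your source files can contain duplicate key values, one option is to deduplicate the Spark Data Frame before the merge. A minimal sketch, reusing the sdf and key_columns variables from the notebook above:
# Keep only one (arbitrary) row per key before merging to avoid the multiple-matches error
sdf = sdf.dropDuplicates(key_columns)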

7) Result
Now you can run your pipeline and check whether the Silver layer of your Lake database is populated with new tables. You can then create a new notebook with Spark SQL or PySpark to check the contents of the tables and see whether Time Travel works.
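A quick check in a new notebook could look like this (a sketch, using the SalesOrderHeader example table from this post):
# Check the contents of the new Silver table via the Lake database
display(spark.sql("SELECT COUNT(*) AS NumberOfRows FROM Silver.SalesOrderHeader"))

# Show the Delta Table history; every run that merges data adds a new version (Time Travel)
display(spark.sql("DESCRIBE HISTORY Silver.SalesOrderHeader"))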
Running the pipeline that calls the new Notebook

Delta Lake folders in the Data Lake

Conclusions
In this post you learned how to create and populate a (source-based) Silver layer of Lake House Delta Tables. An easy quick start for your lake house. If you have multiple sources with similar data then you should also consider creating a real cleansed, curated and conformed Silver layer manually. In a later post we will show you some of those manual steps in Spark SQL or PySpark.

Special thanks to colleague Heleen Eisen for helping out with the PySpark.





Sunday, 19 March 2023

Synapse - Add existing Delta Table to Lake Database

Case
How can I query my lake house files and tables in Synapse without specifying the entire data lake path for each file or table?
Querying your Lake House in Synapse

Solution
In the Data tab (left menu) of the Synapse Workspace you can create a Lake database and then add your files and tables to it. By default there is already a Lake Database present called 'default', but it won't be visible until you add tables to it or add other databases.

1) Create database
Let's create a new database. We will create one for Bronze, Silver and Gold. For the files we could create a separate database called Ingest for easy querying. The documentation mentions either a CREATE DATABASE or a CREATE SCHEMA command; they do the same thing.
  • Create a new notebook with either SPARK SQL or PySpark as language.
  • Attach it to a spark pool
  • In the first Code block add the code below and execute it. The IF NOT EXISTS and the COMMENT are optional.
%%sql
CREATE DATABASE IF NOT EXISTS Ingest COMMENT 'Raw files';
CREATE DATABASE IF NOT EXISTS Bronze COMMENT 'Raw Layer';
CREATE DATABASE IF NOT EXISTS Silver COMMENT 'Validated Layer';
CREATE DATABASE IF NOT EXISTS Gold COMMENT 'Enriched Layer';
%%pyspark
spark.sql('CREATE DATABASE IF NOT EXISTS Ingest COMMENT \'Raw files\'')
spark.sql('CREATE DATABASE IF NOT EXISTS Bronze COMMENT \'Raw Layer\'')
spark.sql('CREATE DATABASE IF NOT EXISTS Silver COMMENT \'Validated Layer\'')
spark.sql('CREATE DATABASE IF NOT EXISTS Gold COMMENT \'Enriched Layer\'')
SPARK SQL Code block

Now go to the Data tab and you will see a Lake database list with the four databases you just created (plus the default one).
Synapse Lake databases

2) Create table on Parquet file
For the raw files we will create a table based on the parquet (or flat) files from your ingestion from the source into the data lake. If the file has a timestamp in its name you could even use a wildcard in the path.
  • Go back to your notebook and create a second Code block
  • Add the code below and execute it. It will create a table in the Ingest Lake database
  • Then go to the Data tab and unfold the Ingest database and its tables (if you are too fast you might have to refresh the tables list).
%%sql
CREATE TABLE IF NOT EXISTS Ingest.Cities
USING PARQUET
LOCATION 'abfss://mysource@mydatalake.dfs.core.windows.net/Ingestfolder/Cities*.parquet'
%%pyspark
spark.sql(f'CREATE TABLE IF NOT EXISTS Ingest.{table_name} USING PARQUET LOCATION \'{parquet_path}\'')

Lake table based on a Parquet file from the data lake

3) Create table on Delta Table
For the Bronze (or Silver or Gold) layer we will create a table based on an existing Delta Table from the data lake.
  • Go back to your notebook and create a third Code block
  • Add the code below and execute it. It will create a table in the Bronze Lake database
  • Then go to the Data tab and unfold the Bronze (or Silver or Gold) database and its tables (if you are too fast you might have to refresh the tables list).
%%sql
CREATE TABLE IF NOT EXISTS Bronze.Cities
USING DELTA
LOCATION 'abfss://mysource@mydatalake.dfs.core.windows.net/Bronze/Cities'
%%pyspark
spark.sql(f'CREATE TABLE IF NOT EXISTS Bronze.{table_name} USING DELTA LOCATION \'{delta_table_path}\'')

Lake table based on a Delta Table from the data lake

4) Query the new tables
Now you can query the files and delta tables like a regular database table in either a notebook with a Spark pool running or a SQL script. There is one difference between those two: in the SQL script you get the default dbo schema between the database name and the table name. This is mandatory for the SQL script, but not allowed in a notebook.
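For example, in a notebook attached to a Spark pool (a sketch using the Cities table from this post):
# Query the Lake database table from a notebook: <database>.<table>, without the dbo schema
display(spark.sql("SELECT * FROM Bronze.Cities LIMIT 10"))

# In a Serverless SQL script the same table would be queried as Bronze.dbo.Cities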
Query in notebook with SPARK SQL

Query in a SQL Script

5) Create views on new tables
You can also create views on those new tables in which you already add some business logic that is useful for other colleagues working with this data. You could even create views for dimensions and facts on your Silver tables and then serve them to Power BI.
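A minimal sketch of such a view (the business logic and the column names are just examples, assuming the Cities table has Id and Residence columns):
# Create a view with some simple business logic on top of a Lake database table
spark.sql("""
    CREATE OR REPLACE VIEW Bronze.vw_Cities AS
    SELECT  Id
    ,       UPPER(Residence) AS Residence
    FROM    Bronze.Cities
""")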
Create view on new tables

Conclusion
In this post you learned how to make your data lake files and delta lake tables easier to query. A few extra steps for each table, but after that querying is much easier. In a future post we will show you how to do this directly during the INGEST and DELTA steps. Then you don't have any manual steps for each new file or table.

In the next post we will show you how to use time travel on those Delta Tables with SPARK SQL. This post is the starting point for that one.

Special thanks to colleague Martijn Broeks for helping out.

Sunday, 5 February 2023

Streaming data Azure & Power BI - Introduction

Case
I want to send streaming data to Power BI for reporting purposes. What should I take into account when choosing the right architecture?
Streaming Data to Power BI

Solution
If you, for example, have a helpdesk for your customers where they can call or chat for support, then you probably also want some real-time reports to see the current state of the calls and chats. Most regular Data Warehouses are only refreshed once a night, and by then it's already too late to react to incidents.

For real-time reports in Power BI you have two main options. The first option is to send the events directly to a Power BI streaming dataset (Push or Streaming) and then build a report and pin report visuals to a dashboard. This is an appropriate solution for a lot of real-time reports, but there are some limitations. For example, there is a maximum number of events per second; once you exceed that limit you start losing data, probably just when you need accurate reports the most: when it is very busy in your helpdesk. Another limitation of streaming datasets in Power BI is the history: it keeps only one hour of data.
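For reference, pushing rows to such a streaming dataset is just an HTTP POST to the Push URL that Power BI generates for the dataset. A minimal sketch in Python (the URL and the column names are placeholders):
# Sketch: push one row to a Power BI streaming/push dataset (URL and columns are placeholders)
import requests
from datetime import datetime, timezone

push_url = "https://api.powerbi.com/beta/<tenant>/datasets/<dataset-id>/rows?key=<key>"
rows = [{"CallStart": datetime.now(timezone.utc).isoformat(), "Calls": 1}]

response = requests.post(push_url, json=rows)
response.raise_for_status()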

The second option is to push the data into Azure Event Hubs and then use Azure Stream Analytics to push it to Power BI. This solves the maximum-events-per-second issue because Stream Analytics can aggregate or filter the data before sending it to Power BI, and Stream Analytics can also send the data to, for example, a data lake to solve the history problem.
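From the application side, sending the events to Azure Event Hubs typically looks something like this (a sketch with the azure-eventhub Python package; the connection string, hub name and payload are placeholders):
# Sketch: send a helpdesk event to Azure Event Hubs (connection string and payload are placeholders)
import json
from azure.eventhub import EventHubProducerClient, EventData

producer = EventHubProducerClient.from_connection_string(
    conn_str="<event-hub-namespace-connection-string>", eventhub_name="helpdesk-events")

event = {"CallId": 123, "Status": "Started"}

with producer:
    batch = producer.create_batch()
    batch.add(EventData(json.dumps(event)))
    producer.send_batch(batch)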
Streaming Data to Power BI

In this streaming data series we will explain this second option, focusing on the hot path and on the capture in the data lake, which is part of the cold path. Just like for a 'regular' data warehouse architecture there are a lot of different solutions, but this one is probably the most common and simple solution that will fit the majority of cases. One particular new streaming data feature in Azure that is worth mentioning is writing to a Delta Lake table. At the moment of writing this is still in public preview and only available in a limited number of Azure regions, but it will fit the Lake House architecture very well.

Posts in this series:






Monday, 7 November 2022

Create Data Lake containers and folders via DevOps

Case
I need a process to create Azure Data Lake containers throughout the DTAP environments of my Azure Data Platform. Doing this manually is not an option because we want to minimize owner and contributor access to the Data Lake of acceptance and production, but Synapse and Data Factory don't have a standard activity to create ADL containers. How do you automatically create Azure Data Lake containers (and folders)?
Storage Account (datalake) containers

Solution
An option is to use a PowerShell script that is executed by the Custom activity in combination with an Azure Batch service, or an Azure Automation runbook with the same PowerShell script that is executed by a Web(hook) activity.

However, since you probably don't need to create new containers during every (ADF/Synapse) pipeline run, we suggest doing this via an Azure DevOps pipeline as part of your CICD process with the same PowerShell script. You could either create a separate CICD pipeline for it or integrate it in your Synapse or ADF CICD pipeline.

The example below creates containers and optionally also folders and subfolders within those containers. The folders are optional because Synapse and Data Factory will create folders on the fly with, for example, the Copy Data activity.

1) Repos folder structure
For this example we use a CICD folder in the repos with subfolders for PowerShell, YAML and Json.
Repos folder structure

2) JSON config
Because we don't want to hardcode the containers and folders we use a JSON file as input for the PowerShell script. This JSON file is stored within the JSON folder of the DevOps repository. We use the same JSON file for the entire environment, but you can of course create a separate file for each environment if you, for example, need different containers on production. Our file is called config_storage_account.json

The folders array in this example is optional and when left empty no folders will be created. You can create subfolders within folders by separating them with a forward slash.
{
"containers":   {
                "dataplatform":["folder1","folder2/subfolder1","folder2/subfolder2"]
                , "SourceX":["Actual","History"]
                , "SourceY":["Actual","History"]
                , "SourceZ":[]
                }
}

3) PowerShell code
The PowerShell script called SetStorageAccounts.ps1 is stored in the PowerShell folder and contains three parameters:
  • ResourceGroupName - The name of the resource group where the storage account is located.
  • StorageAccountName - The name of the storage account
  • JsonLocation - The location of the json config file in the repos (see previous step)
It checks the existence of both the config file and the storage account. Then it loops through the containers from the config and, within that loop, through the folders of that specific container. For container names and folder paths it does some small corrections for common mistakes.

Note that the script will not delete containers and folders (or set authorizations on them). This is of course possible, but make sure to test this very thoroughly; even with testing, a human error in the config file is easy to make and could cause a lot of data loss!
# This PowerShell will create the containers provided in the JSON file
# It does not delete or update containers and folders or set authorizations
param (
    [Parameter (Mandatory = $true, HelpMessage = 'Resource group name of the storage account.')]
    [ValidateNotNullOrEmpty()]
    [string] $ResourceGroupName,

    [Parameter (Mandatory = $true, HelpMessage = 'Storage account name.')]
    [ValidateNotNullOrEmpty()]
    [string] $StorageAccountName,

    [Parameter (Mandatory = $true, HelpMessage = 'Location of config_storage_account.json on agent.')]
    [ValidateNotNullOrEmpty()]
    [string] $JsonLocation
 )

# Combine path and file name for JSON file. The file name is hardcoded and the
# same for each environment. Create an extra parameter for the filename if
# you need different files/configurations per environment.
$path = Join-Path -Path $JsonLocation -ChildPath "config_storage_account.json"
Write-output "Extracting containernames from $($path)"


# Check existence of file path on the agent
if (Test-Path $path -PathType leaf) {
    
    # Get all container objects from JSON file
    $Config = Get-Content -Raw -Path $path | ConvertFrom-Json

    # Create containers array for looping
    $Config | ForEach-Object { 
        $Containers = $($_.containers) | Get-Member -MemberType NoteProperty | Select-Object -ExpandProperty Name
    }
    
    # Check Storage Account existence and get the context of it
    $StorageCheck = Get-AzStorageAccount -ResourceGroupName $ResourceGroupName -Name $StorageAccountName -ErrorAction SilentlyContinue        
    $context = $StorageCheck.Context

    # If Storage Account found
    if ($StorageCheck) {
        # Storage Account found
        Write-output "Storage account $($StorageAccountName) found"

        # Loop through container array and create containers if they don't exist
        foreach ($container in $containers) {
            # First a little cleanup of the container

            # 1) Change to lowercase
            $container = $container.ToLower()

            # 2) Trim accidental spaces
            $container = $container.Trim()


            # Check if container already exists
            Write-output "Checking existence of container $($container)"
            $ContainerCheck = Get-AzStorageContainer -Context $context -Name $container -ErrorAction SilentlyContinue 

            # If container exists
            if ($ContainerCheck) {
                Write-Output "Container $($container) already exists"
            }
            else {
                Write-Output "Creating container $($container)"
                New-AzStorageContainer -Name $container -Context $context | Out-Null
            }

            # Get container folders from JSON
            Write-Output "Retrieving folders from config"
            $folders = $Config.containers.$container
            Write-Output "Found $($folders.Count) folders in config for container $($container)"

            # Loop through container folders
            foreach ($folder in $folders) {
                # First a little cleanup of the folders

                # 1) Replace backslashes by a forward slash
                $path = $folder.Replace("\","/")

                # 2) Remove unwanted spaces
                $path = $path.Trim()
                $path = $path.Replace("/ ","/")
                $path = $path.Replace(" /","/")

                # 3) Check if path ends with a forward slash
                if (!$path.EndsWith("/")) {
                    $path = $path + "/"
                }
                    
                # Check if folder path exists
                $FolderCheck = Get-AzDataLakeGen2Item -FileSystem $container -Context $context -Path $path  -ErrorAction SilentlyContinue
                if ($FolderCheck) {
                    Write-Output "Path $($folder) exists in container $($container)"
                } else {
                    New-AzDataLakeGen2Item -Context $context -FileSystem $container -Path $path -Directory | Out-Null
                    Write-Output "Path $($folder) created in container $($container)"
                }
            }

        }
    } else {
        # Provided storage account not correct
        Write-Output "Storage account: $($StorageAccountName) not available, containers not setup."
    }
} else {
    # Path to JSON file incorrect
    Write-output "File $($path) not found, containers not setup."
}
4) YAML file.
If you integrate this in your existing Data Factory or Synapse YAML pipeline then you only need to add one PowerShell step. Make sure you have a checkout step to copy the config and PowerShell files from the repository to the agent. You may also want to add a (temporary) treeview step to check the paths on your agent; this makes it easier to configure paths within your YAML code.
parameters:
  - name: SerCon
    displayName: Service Connection
    type: string
  - name: Env
    displayName: Environment
    type: string
    values: 
    - DEV
    - ACC
    - PRD
  - name: ResourceGroupName
    displayName:
    type: string
  - name: StorageAccountName
    displayName:
    type: string

jobs:
    - deployment: deploymentjob${{ parameters.Env }}
      displayName: Deployment Job ${{ parameters.Env }} 
      environment: Deploy to ${{ parameters.Env }}

      strategy:
        runOnce:
          deploy:
            steps:
            ###################################
            # 1 Check out repository to agent
            ###################################
            - checkout: self
              displayName: '1 Retrieve Repository'
              clean: true 

            ###################################
            # 2 Show environment and treeview
            ###################################
            - powershell: |
                Write-Output "Deploying Synapse in the ${{ parameters.Env }} environment"
                tree "$(Pipeline.Workspace)" /F
              displayName: '2 Show environment and treeview Pipeline_Workspace'

            ###################################
            # 3 Create containers in datalake
            ###################################
            - task: AzurePowerShell@5
              displayName: '3 Create data lake containers'
              inputs:
                azureSubscription: ${{ parameters.SerCon }}
                scriptType: filePath
                scriptPath: $(Pipeline.Workspace)\s\CICD\PowerShell\SetStorageAccounts.ps1
                scriptArguments:
                  -ResourceGroupName ${{ parameters.ResourceGroupName }} `
                  -StorageAccountName ${{ parameters.StorageAccountName }} `
                  -JsonLocation $(Pipeline.Workspace)\s\CICD\Json\
                azurePowerShellVersion: latestVersion
                pwsh: true

5) The result
Now it's time to run the YAML pipeline and check the Storage Account to see whether the containers and folders are created.
DevOps logs of creating containers and folders

Created data lake folders in container

Conclusion
In this post you learned how to create containers and folders in the Storage Account / Data Lake via a small PowerShell script and a DevOps pipeline. You can also reuse this PowerShell script for the alternative solutions mentioned above.

Again, a note about deleting containers and folders: make sure to double check the code, but also the procedures, to avoid human errors and potentially losing a lot of data. You might want to set up soft deletes in your storage account to have a fallback scenario for screwups.

Sunday, 13 February 2022

ADF Snack - Empty file when creating new folder

Case
I'm adding files to a Data Lake container via ADF, but when creating a new folder I'm also getting an empty file with the same name as the folder. How do I avoid the creation of these unwanted empty files?
Unwanted empty files with same name as folders

Solution
You are probably using an Azure Storage Account V2 (general purpose v2) without the hierarchical namespace setting enabled. You can check that on the Overview page of the Storage Account.
Hierarchical namespace disabled

To solve this you need to enable that setting in your storage account. You can do that by clicking on 'Disabled' behind Hierarchical namespace on the Overview page of the Storage Account. However, there are some options that don't work with the hierarchical namespace, like 'point-in-time restore for containers', 'versioning for blobs', 'blob change feed' and 'version-level immutability support'. When you enable the hierarchical namespace setting you will get a warning about this so that you can disable those settings. After step 2 (Validate account) you can download a JSON file with all the things you need to fix first.
Upgrading your storage account

Make sure to consult other users / Data Contributors of that Data Lake before changing those settings, or as a last resort you could just create a new separate Storage Account if you are not sure about the settings. Another option could be to create a Python or PowerShell script to clean up the empty files.
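Such a cleanup script is not part of this post, but a minimal Python sketch (assuming the azure-storage-blob package; the connection string and container name are placeholders) could look like this:
# Sketch: remove 0-byte blobs that have the same name as a (virtual) folder
from azure.storage.blob import ContainerClient

container = ContainerClient.from_connection_string(
    conn_str="<your-connection-string>", container_name="<your-container>")

blobs = list(container.list_blobs())
names = {b.name for b in blobs}

for blob in blobs:
    # An unwanted artifact is an empty blob whose name is also used as a folder prefix
    if blob.size == 0 and any(n.startswith(blob.name + "/") for n in names):
        print(f"Deleting empty file {blob.name}")
        container.delete_blob(blob.name)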

Conclusion
In this post you learned how to fix an annoying, needless empty file 'bug' for each directory. This is especially handy when you have those flexible pipelines that store your files in neat folder structures per year/month/day. It is just one setting, but it could have consequences for other settings. Thanks to colleague Derek Groen for finding the right setting.




Saturday, 12 February 2022

ADF Snack - Give ADF access to Data Lake via MSI

Case
I don't want to use the Access Keys to access my Data Lake with Azure Data Factory. Is there a better alternative?

Don't use the Access Keys

Solution
There are various options to authorize access to your Storage Account, but using Managed Service Identity (MSI) is probably the easiest and safest. This is because you don't need to use any secrets like passwords or keys that could end up in the wrong hands. Instead you give a specific ADF access to your Storage Account via its Managed Service Identity (MSI). Each deployed ADF can be found in Azure Active Directory and you can assign a role like Storage Blob Data Contributor or Storage Blob Data Reader to that ADF, giving it access to, for example, an entire Storage Account or a specific container.

For this example we have an existing Azure Storage Account (general purpose v2) and an existing Azure Data Factory. We will give ADF write access to the entire storage account. Networking (nsg, VNETs, subnets, etc.) is out of scope for this example.

1) Access Control (IAM)
The first step is to authorize your Data Factory within the Storage Account. This is where we will give ADF the role Storage Blob Data Contributor, which allows ADF to read, write and delete Azure Storage containers and blobs. There is also an optional conditions step where you can add additional rules.
  • Go to the Azure Portal and then to the Storage Account where ADF needs access to
  • In the left menu click on Access Control (IAM)
  • Click on +Add and choose Add role assignment
  • Select the required role (Storage Blob Data Contributor for this example) and click Next
  • Now first check the Managed identity radio button and then click on +Select members 
  • In the Managed Identity field on the right side select Data Factory and then search for your ADF by name. One or more factories will appear in a list.
  • Click on your ADF and then on the Select button.
  • A description is optional, but it could be handy later on.
  • Now click on Next and optionally add one or more conditions. In this example we won't be adding conditions.
  • Next click on Review + assign to finish
Give ADF access to Data Lake

2) Test ADF Linked Service
Now go to Data Factory and create a new Linked Service to your Data Lake. Make sure the Authentication Method is Managed Identity. After that you need to select your Data Lake and hit the Test Connection button.
Create Linked Service to test connection

If you get a 'forbidden' error like below then:
  • Check whether you selected the right ADF under step 1 and the correct Storage Account under step 2.
  • Make sure you checked Managed Identity under step 1 (and not 'User, group or service principal')
  • Test the network settings for example by creating a linked service with the Account Key.
24200 - ADLS Gen2 operation failed for: Storage operation
'' on container 'testconnection' get failed with
'Operation returned an invalid status code 'Forbidden''.
Possible root causes: (1). It's possible because the
service principal or managed identity don't have enough
permission to access the data. (2). It's possible because
some IP address ranges of Azure Data Factory are not
allowed by your Azure Storage firewall settings. Azure
Data Factory IP ranges please refer https://docs.micro..
Account: 'abcdefg'. ErrorCode:
'AuthorizationPermissionMismatch'. Message: 'This request
is not authorized to perform this operation using this
permission.'. RequestId: '19e44b-601f-0029-4e30-000000'.
TimeStamp: 'Sat, 12 Feb 2022 16:52:22 GMT'.. Operation
returned an invalid status code 'Forbidden' Activity
ID: 94880d51-g593-418a-8q16-e7dab5c013f3.

Conclusion
In this blog post you learned how to give ADF access to your Storage Account via its Managed Service Identity. This is probably the easiest and safest way to authorize ADF. You could use the same trick to for example give ADF access to your Azure SQL Database.

Sunday, 5 December 2021

Delta Lake support in Azure Synapse Analytics

Case
Delta Lake is already widely used in Azure Databricks, but now it is also available in Synapse Analytics. How can I use it there to store history and do 'time travel' in my historical data?
Synapse now supports Delta Lake

Solution
Delta Lake is now Generally Available (GA) in Synapse Analytics, but at the time of writing Microsoft is still implementing new Delta Lake features in Synapse.

This example uses a Synapse Pipeline with a Copy Data Activity to ingest data from the source and then calls a Python Delta Lake script (other languages are possible) via the Notebook activity.

Prerequisites
You need to make sure that you (for debugging) and the Managed Service Identity (MSI) of your Synapse Analytics workspace have access to the Azure Data Lake with the Role Storage Blob Data Contributor.
  • In the Azure Portal go to the Storage Account used by the Synapse Analytics workspace
  • In the left menu click on Access Control (IAM)
  • Click on + Add and choose Add role assignment
  • Search for Storage Blob Data Contributor, select the role and click on Next
  • Click on + Select members, find your Synapse workspace and yourself, and click Select
  • Optionally add a description about the why. Then click on Review + assign (twice)
At the time of writing our Apache Spark Pool uses version 3.1 with Delta Lake 1.0. If you are using an older version (2.4) of Spark then you get Delta Lake version 0.6, which is slightly different. If newer versions appear then just try the newest Spark Pool.
Apache Spark pool Additional Settings

1) Code cell 1: parameters
The first code cell is for the parameters that can be overridden by parameters from the Notebook activity in the pipeline. For more details see our post about notebook parameters. For debugging within the notebook we used real values.
# path of the data lake container
data_lake_container = 'abfss://yourbronzecontainer@yourdatalake.dfs.core.windows.net'

# The ingestion folder where your parquet files are located
ingest_folder = 'parquetstage'

# The bronze folder where your Delta Tables will be stored
bronze_folder = 'bronze'

# The name of the table
table_name = 'residences'

# The wildcard filter used within the bronze folder to find files
source_wildcard = 'residences*.parquet'

# A comma separated string of one or more key columns (for the merge)
key_columns_str = 'Id'
Parameters

2) Code cell 2: import modules and functions
The second code cell is for importing all required/useful modules. For this basic example we have three imports:
  • DeltaTable from delta.tables for handling Delta Tables
  • notebookutils for file system utilities (removing the Delta Table folder)
  • os for building the file paths
# Import modules
import os
from delta.tables import DeltaTable
from notebookutils import mssparkutils
Imports

3) Code cell 3: filling delta lake
Now the actual code for filling the delta lake tables with parquet files from the data lake. Note that the code is very basic: it checks whether the Delta Lake table already exists. If not, it creates the Delta Lake table; if it does, it merges the new data into the existing table. If you have transactional data then you could also do an append instead of a merge.

# Convert comma separated string with keys to array
key_columns = key_columns_str.split(',')  
 
# Convert array with keys to where-clause for merge statement
conditions_list = [f"existing.{key}=updates.{key}" for key in key_columns]
 
# Determine path of source files from ingest layer
source_path = os.path.join(data_lake_container, ingest_folder, source_wildcard)
 
# Determine path of Delta Lake Table 
delta_table_path = os.path.join(data_lake_container, bronze_folder, table_name)

# Read file(s) in spark data frame
sdf = spark.read.format('parquet').option("recursiveFileLookup", "true").load(source_path)
 
# Check if the Delta Table exists
if (DeltaTable.isDeltaTable(spark, delta_table_path)):
    print('Existing delta table')
    # Read the existing Delta Table
    delta_table = DeltaTable.forPath(spark, delta_table_path)
 
    # Merge new data into existing table
    delta_table.alias("existing").merge(
        source = sdf.alias("updates"),
        condition = " AND ".join(conditions_list)
         
    ).whenMatchedUpdateAll(
    ).whenNotMatchedInsertAll(
    ).execute()
 
    # For transactions you could do an append instead of a merge
    # sdf.write.format('delta').mode('append').save(delta_table_path)
 
else:
    print('New delta table')
    # Create new delta table with new data
    sdf.write.format('delta').save(delta_table_path)
Adding file to Delta Lake

4) Viewing the Delta Table in notebook
If you run the notebook with the code of the first three steps a couple of times with changed/extra/fewer records, then history will be built up in the delta table. For debugging purposes you can add an extra code cell to view the data and the various versions of the data.

To check the current version of the data you can use the following code:
display(spark.read.format('delta').load(delta_table_path))
Get current version of data

And with this code you can investigate the historical versions of the data. In this case there are two versions:
# Get all versions
delta_table = DeltaTable.forPath(spark, delta_table_path)
display(delta_table.history())
Get versions of data

To retrieve one specific version you could use something like this (where the 0 is the version from the above picture):
# Get one specific version
display(spark.read.format("delta").option("versionAsOf", "0").load(delta_table_path))
Get specific version of data

You can also use a datetime to retrieve data from the Delta Lake by using timestampAsOf instead of versionAsOf:
# Get one specific version with timestamp filter
display(spark.read.format("delta").option("timestampAsOf", "2021-12-05 19:07:00.000").load(delta_table_path))
Get specific version of data with datetime filter

To remove the entire Delta Lake table (and all history) you could use something like:
# Delete Delta Table (folder)
mssparkutils.fs.rm(delta_table_path, recurse=True)
Delete Delta Table

5) Viewing the Delta Table in Serverless SQL Pool
At the moment of writing you can query the Delta Lake in a Serverless SQL Pool, but you cannot yet use the 'time travel' feature. Please upvote this feature here.

The first option is to use an OPENROWSET query within a SQL Script in your Synapse Workspace:
-- Query the Delta Lake
SELECT TOP 10 *
FROM OPENROWSET(
    BULK 'abfss://yourcontainer@yourdatalake.dfs.core.windows.net/deltalake/places/',
    FORMAT = 'delta') as rows
ORDER BY Id;
Query the Delta Lake via an OPENROWSET query

A second option is using Polybase by creating an External Table on the Delta Lake. This does require you to create a database within the Serverless SQL Pool, because you can't do that in the master database.
-- Query the Delta Lake

-- Create database because it won't work on the master database
CREATE DATABASE MyDwh;

-- Create External Data Source
CREATE EXTERNAL DATA SOURCE DeltaLakeStorage
WITH ( location = 'abfss://yourcontainer@yourdatalake.dfs.core.windows.net/deltalake/' );

-- Create External File Format
CREATE EXTERNAL FILE FORMAT DeltaLakeFormat
WITH ( FORMAT_TYPE = DELTA );

-- Create External Table
CREATE EXTERNAL TABLE Residence (
     Id int,
     Residence VARCHAR(50)
) WITH (
        LOCATION = 'places', --> the root folder containing the Delta Lake files
        data_source = DeltaLakeStorage,
        FILE_FORMAT = DeltaLakeFormat
);

-- Get Data from your Delta Lake Table
SELECT          TOP 10 * 
FROM            Residence
ORDER BY        Id

Conclusion
In this post you learned how to create and query a Delta Lake within your Synapse Analytics Workspace. The main advantage is of course that you don't need Azure Databricks if you are already using Synapse, making your Data Platform architecture slightly clearer and easier.

A disadvantage, at the moment of writing, is the lack of time travel within the Serverless SQL Pool environment. This means you're now forced to use notebooks to create your Data Warehouse when the latest version of your data is just not enough. So please upvote this feature here. There are some more limitations and known issues in the current version, but we think at least some of them will be solved in future updates.

Thanks to colleague Jeroen Meidam for helping!


Tuesday, 29 December 2020

ADF Snack: get files of last hour

Case
I have a Data Factory pipeline that should run each hour and collect all new files added to the data lake since the last run. What is the best activity or do we need to write code?
No scripts, no loops





Solution
The Copy Data Activity has a wildcard filter which allows you to read multiple files (of the same type/format) at once, so there is no need for a ForEach Activity to process multiple files. Combine that with the start and end date filter option within that same Copy Data Activity and you can limit the files to a certain period.

Date filter
The End datetime property will be populated with the start-datetime of the current ADF pipeline. So files added during the run of the pipeline will be skipped and processed during the next run. This End datetime will also be stored afterwards for the next run.
The Start datetime will be retrieved from a database table. The previous run of the pipeline stored its End datetime as the Start datetime for the next run.
The basic setup





1) Table and stored procedures
To store (and retrieve) the datetime from the pipeline we use a database table and some Stored Procedures. To keep it easy we kept it very basic; feel free to extend it to your own needs. In this solution there will be only one record per source. The SetLastRun stored procedure will either insert a new record or update the existing record via a MERGE statement. The GetLastRun stored procedure will retrieve the datetime of the last run and return a default date if there is no record available.
-- Create runs table
CREATE TABLE [dbo].[Runs](
	[SourceName] [nvarchar](50) NOT NULL,
	[LastRun] [datetime2](7) NULL,
	CONSTRAINT [PK_Runs] PRIMARY KEY CLUSTERED 
	(
	[SourceName] ASC
	)
)

-- Save the lastrundate
CREATE PROCEDURE SetLastRun
(
    @SourceName as nvarchar(50)
,	@LastRun as datetime2(7)
)
AS
BEGIN
    -- SET NOCOUNT ON added to prevent extra result sets from
    -- interfering with SELECT statements.
    SET NOCOUNT ON

	-- Check if there already is a record
	-- Then Insert or Update a record
    MERGE dbo.Runs AS [Target]
    USING (SELECT @SourceName, @LastRun) AS [Source] ([SourceName], [LastRun])  
    ON ([Target].[SourceName] = [Source].[SourceName])  
    WHEN MATCHED THEN
        UPDATE SET [LastRun] = [Source].[LastRun]  
    WHEN NOT MATCHED THEN  
        INSERT ([SourceName], [LastRun])  
        VALUES ([Source].[SourceName], [Source].[LastRun]);  
END
GO

-- Retrieve the lastrundate
CREATE PROCEDURE GetLastRun
(
    @SourceName as nvarchar(50)
)
AS
BEGIN
	DECLARE @DefaultDate as datetime2(7) = '1-1-1900'

	-- Retrieve lastrun and provide default date in case null
	SELECT	ISNULL(MAX([LastRun]), @DefaultDate) as [LastRun]
	FROM	[Runs] 
	WHERE	[SourceName] = @SourceName
END
GO

2) Retrieve datetime last run
So the first step in the pipeline is to execute a stored procedure that retrieves the End datetime of the previous run. As mentioned, a default datetime will be returned if there is no previous run available. Because the Stored Procedure activity can't return output parameters, we will use the Lookup activity instead.
  • Add a Lookup activity to the pipeline and give it a descriptive name
  • On the Settings tab add or reuse a Source dataset (and Linked service) that points to the database containing the table and stored procedures of the previous step (don't point to a specific table).
  • Choose Stored Procedure under the Use query property
  • Select 'GetLastRun' as Stored procedure name and hit the Import button to get the parameters from the stored procedure
  • Now either use a hardcoded source name or use an expression like @pipeline().Pipeline to, for example, use the pipeline name as the source.
Execute Stored Procedure via Lookup to retrieve last rundate

3) Copy Data Activity
The second step is to retrieve the actual data from the data lake with a Copy Data activity. With two expressions it will first retrieve the datetime of the previous step and use it as the starttime filter and secondly retrieve the Start datetime of the pipeline itself and use that as Endtime filter.
  • Add the Copy Data Activity and set it up to load a specific file from the data lake to a SQL Server table (or your own destination)
  • Now on the Source tab change the File path type to Wildcard file path
  • Then set the Wildcard file name to for example read all CSV files with *.csv instead of a specific file.
  • Next set the Start time (UTC) property under Filter by last modified to the following expression:
    @activity('Get Last Run').output.firstRow.LastRun. Here 'Get Last Run' is the name of the previous activity and LastRun is the output column of the Stored Procedure (more details here).
  • Also set the End time (UTC) property with the following expression:
    @pipeline().TriggerTime (this will get the actual starttime of the pipeline)
  • You also might want to add an extra metadata column with the Filename via the Additional columns option (more details here).
Set up wildcard and datetime filters

4) Store datetime for next run
The last step is to save the Start datetime of the pipeline itself as run datetime so that it can be retrieved in the next run. Since this Stored Procedure doesn't have any output parameters we can use the standard Stored Procedure Activity.
  • Add the Stored Procedure activity and connect it to the previous activity
  • On the Settings tab reuse the same Linked service as in step 2
  • Select SetLastRun as the Stored procedure name
  • Hit the import button and set the parameters
    • LastRun should be filled with the startdatetime of the pipeline: @pipeline().TriggerTime
    • SourceName should be filled with the same expression as in step 2
Add Stored Procedure to save rundate

5) Schedule
Now just schedule your pipeline every x minutes or x hours with a trigger to keep your database table up-to-date with files from the data lake. Then keep adding files to the data lake and watch your runs table (step 1) and the actual staging table to see the result. The optional metadata column of step 3 should make debugging and testing a lot easier.


Summary
In this post you learned how to use the wildcard and filter options of the Copy Data activity to create a mechanism to keep your data up-to-date. A downside of this solution is that it will sometimes run the pipeline unnecessarily because no new files were added to the data lake. Another downside is that the process is not real time.

If you need a more (near-)real-time solution instead of running every x minutes or hours, then you can use the trigger solution and process files as soon as they arrive. However, that solution has two downsides. First of all you are running the pipeline for each file, which means you are paying for each file. Secondly there is a limit to the number of files that can be triggered per hour, especially when you don't want (or can't) process files in parallel. The execution queue has a limit of 100 executions per pipeline; after that you will receive an error and miss that file.