Showing posts with label LAKE_HOUSE. Show all posts

Tuesday 26 December 2023

Give Power BI access to Synapse Serverless SQL Pool

Case
I have a Service Account for my data source in the Power BI Gateway, but how do I give this account access to the (External) Tables located in the Synapse Serverless SQL Pool database?
Give Power BI Access to Delta Tables








Solution
If you create a Data Lakehouse with Synapse, then at the moment you still need to expose the datamart as a set of external tables in the Synapse Serverless SQL Pool to make it easily accessible for Power BI. Giving access to it is very similar to the solution we showed for giving read-only access to an Azure SQL Database when you have a Data Warehouse Architecture. For a Data Lakehouse Architecture we use External Tables that point to a Delta Table instead of regular tables. This requires an extra step with a Database Scoped Credential.

The starting position is that you have a database in the Synapse Serverless SQL Pool with some external tables pointing to the Delta Tables within your 'golden' container. Below is the basic T-SQL code for setting up those External Tables.
  1. First step is to create a Master Key. We need this for the second step. You can make it more secure by adding a strong password for encryption.
  2. Second step is to create a Database Scoped Credential with the Managed Service Identity of the underlying Synapse Workspace. This allows us to give the underlying Synapse Workspace access to the 'golden' container instead of giving all the separate users access. There are alternatives, so study the options to see which solution fits the security policy of your organization.
  3. Third step is creating an External Data Source that points to your 'golden' container where the Delta Tables are stored. Notice that it uses the credential from the previous step. The location is the URI pointing to a container in a Gen2 Storage Account. ABFSS is short for Azure Blob File System Secure. The format of the URI is:  abfss://[containername]@[storageaccountname].dfs.core.windows.net/
  4. Fourth step is creating an External File Format where you can specify options for your source type (CSV/Parquet/JSON/Delta/etc). In this example the format for the Delta Table is very basic.
  5. The fifth and last step is creating an External Table that points to the Delta Table in your data lake. This means the data stays in the data lake and only the table structure is stored in the database. Notice the reference to your data source and file format from the previous steps. The location in the table is a folder path that starts in the root of your container and points to the Delta Table folder.
-- 1. Create Master Key for using Managed Service Identity access
CREATE MASTER KEY;

-- 2. Create Credential to use Managed Service Identity of Synapse
CREATE DATABASE SCOPED CREDENTIAL [SynapseIdentity]
WITH
    IDENTITY = 'Managed Service Identity'
;

-- 3. Create a Data source pointing to the container of your Delta tables
CREATE EXTERNAL DATA SOURCE [DeltaLocation]
	WITH (
	LOCATION = N'abfss://gold@mydevstorage.dfs.core.windows.net/',
    CREDENTIAL = [SynapseIdentity]
	);

-- 4. Create a File format for Delta tables
CREATE EXTERNAL FILE FORMAT [DeltaFormat]
    WITH (
    FORMAT_TYPE = DELTA,
    DATA_COMPRESSION = N'org.apache.hadoop.io.compress.SnappyCodec'
    );

-- 5. Create an external table
CREATE EXTERNAL TABLE [dbo].[dim_abcdef] (
    [integerfield] INT NULL,
    [stringfield] NVARCHAR (400) NULL,
    [datefield] DATETIME2 (7) NULL
)
    WITH (
    LOCATION = N'/delta/dim_abcdef',
    DATA_SOURCE = [DeltaLocation],
    FILE_FORMAT = [DeltaFormat]
    );
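
The URI format from step 3 is easy to get wrong when you switch between environments. As a minimal sketch (the helper name build_abfss_uri is our own and not part of any Azure SDK), the following Python function assembles the location from a container name and a storage account name:

```python
def build_abfss_uri(container: str, storage_account: str, path: str = "") -> str:
    """Return an ABFSS URI for a container in a Gen2 Storage Account."""
    if not container or not storage_account:
        raise ValueError("container and storage_account are required")
    # Format from the post:
    # abfss://[containername]@[storageaccountname].dfs.core.windows.net/
    root = f"abfss://{container}@{storage_account}.dfs.core.windows.net/"
    # An optional folder path is appended relative to the container root
    return root + path.lstrip("/")


print(build_abfss_uri("gold", "mydevstorage"))
print(build_abfss_uri("gold", "mydevstorage", "/delta/dim_abcdef"))
```

The first call gives the value used for LOCATION in the External Data Source. The second call only illustrates how the table folder relates to the container root.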

When you have created a whole set of those External Tables for your datamart, then you can add a user to the Serverless SQL Pool database which can be used by Power BI to create the Data Source in the Gateway.
  1. First you need to create a user. For this example we used a user from Microsoft Entra ID (Azure Active Directory). That is why you see FROM EXTERNAL PROVIDER. Our Synapse Workspace is created with the option 'Microsoft Entra authentication only', so database users or Shared Access Signatures are not allowed.
  2. The second step is to give this newly added user some access. In this simplified example we give the new user db_datareader access to the entire database. If you have a larger or more complex database, then you can make this much more sophisticated, for example by giving reader access to only a specific schema or table.
  3. The last step is giving the user permission to use the Database Scoped Credential that is referenced by the External Data Source behind the External Tables. This is done with GRANT and the REFERENCES permission. Without this step you will not be able to query the External Tables.
-- 6. Register the Service Account in the datamart database
CREATE USER [sa-pbi-workspace-d@xyz.com] FROM EXTERNAL PROVIDER;

-- 7. Give the Service Account read permissions on the tables
ALTER ROLE [db_datareader] ADD MEMBER [sa-pbi-workspace-d@xyz.com];

-- 8. Give the Service Account REFERENCES permission on the Synapse credential
GRANT REFERENCES ON DATABASE SCOPED CREDENTIAL::SynapseIdentity TO [sa-pbi-workspace-d@xyz.com];
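
Step 2 mentions that reader access can be narrowed down to a specific schema. As a hedged sketch, the Python helper below (the names quote_name and access_statements are hypothetical) generates the three statements for an account and swaps db_datareader for a schema-level GRANT SELECT when a schema is supplied. Treat the schema variant as an assumption and verify the generated T-SQL against your own security policy before running it.

```python
from typing import List, Optional


def quote_name(name: str) -> str:
    """Wrap an identifier in square brackets, escaping any closing bracket."""
    return "[" + name.replace("]", "]]") + "]"


def access_statements(user: str, credential: str, schema: Optional[str] = None) -> List[str]:
    """Build the T-SQL that gives a Microsoft Entra user read access."""
    quoted_user = quote_name(user)
    statements = [f"CREATE USER {quoted_user} FROM EXTERNAL PROVIDER;"]
    if schema is None:
        # Same as the post: read access to the entire database
        statements.append(f"ALTER ROLE [db_datareader] ADD MEMBER {quoted_user};")
    else:
        # Assumption: read access limited to a single schema
        statements.append(f"GRANT SELECT ON SCHEMA::{quote_name(schema)} TO {quoted_user};")
    # Needed in both cases to query the External Tables
    statements.append(
        f"GRANT REFERENCES ON DATABASE SCOPED CREDENTIAL::{quote_name(credential)} TO {quoted_user};"
    )
    return statements


for statement in access_statements("sa-pbi-workspace-d@xyz.com", "SynapseIdentity", schema="dbo"):
    print(statement)
```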
Conclusion
In this post we showed you how to give a user that was created for Power BI read-only access to the tables in your Azure Synapse Serverless SQL Pool database. This is very similar to giving access to a regular database. Now you can use this user for your Power BI gateway data source.

In the future we can hopefully use the Power BI Direct Lake mode on regular Azure Storage Accounts or write with Synapse to the Fabric One Lake.

Sunday 24 December 2023

Deploying Synapse Serverless SQL pool with DevOps

Case
The external tables and other objects in my Synapse Serverless SQL pool are not stored in the GIT repository of Synapse. How do I deploy those objects through the DTAP street?
Release Synapse Serverless SQL Pools with DevOps











Solution
You can use the good old database project in Visual Studio with the SQL Server Data Tools (SSDT) add-on to store the External Tables, File Formats and Data Sources in a Git repository and then use Azure DevOps with YAML pipelines to release the database objects through the DTAP street. This is very similar to deploying a regular Azure SQL Database, with some minor differences.


1) Download Visual Studio
First download and install Visual Studio 2022, then make sure to add the SQL Server Data Tools (SSDT) extension, which adds the Database project option. If you already have Visual Studio 2022, then make sure to update it, because versions before 17.7 don't support SSDT for Serverless SQL pools.
Visual Studio 2022 with SQL Server Data Tools

















2) Create repository
This example is for Azure DevOps, but feel free to use Github instead. Create a new repository. In the root we have two folders:
  • CICD: to store all release-related files such as the YAML files. In fact there is a subfolder called YAML to store the .yml files
  • SQL: to store the database project from Visual Studio.
The repository folder structure








After you have created the initial folder structure you need to clone the repository to Visual Studio by hitting the clone button in the upper right corner.
Clone repository to Visual Studio











3) Create new database project
Once in Visual Studio you now have a cloned repos folder. In the SQL folder we will create a new database project.
Cloned DevOps repository in VS2022














Create a new project via the File menu and search for SQL in the upper search textbox.
Create new project
















Create the new SQL Server Database Project in the SQL folder from your repository. Since there will be only one project in the solution, the solution and project are stored in the same folder. Otherwise you will get an additional subfolder level.
Create new SQL Server Database Project
























4) Create Azure DevOps Service Connection
Within your Azure DevOps project click on Project settings and under Service connections create a new Service connection of the type Azure Resource Manager. You will need a Microsoft Entra Service Principal for this. Depending on the organization, project size and number of environments, create one or more Service Connections. Ideally, one per DTAP environment.
Create Service Connection













5) Give Service Principal access to database
Go to Synapse and open a new SQL Script. Then either create a new database in your Serverless SQL pool to store your external tables or use an existing one. Our example database is called datamart since it will host external tables from our gold layer a.k.a. the datamart with facts and dimensions for Power BI.

Then switch to your datamart database, either via the USE command or via the select box. Once you are in your datamart database, create an (external) user for the Service Principal (SP) that you used in the Azure DevOps Service connection from the previous step. After that we need to give the SP enough rights to deploy all objects to this database: db_owner.
Give Service Principal access to Serverless SQL pool DB









-- First create a new database (if you don't have one)
CREATE DATABASE datamart;

-- Once created switch to your (new) database
USE datamart;

-- Create a new database user for the Service Principal
-- used in the DevOps Service Connection
CREATE USER [myserviceprincipal] FROM EXTERNAL PROVIDER;

-- Give the Service Principal enough rights to create
-- external resources and a master key
ALTER ROLE [db_owner] ADD MEMBER [myserviceprincipal];
You have to repeat this for all the Serverless SQL pool databases in your DTAP environment. Note that if you do the SQL development within Synapse Studio, and not in Visual Studio, then you don't need to deploy to the Development environment and you also don't need to give an SP access to your development environment. In that case only do this for Test, Acceptance and Production.

Tip: you can also give the SP access within Synapse by making it either Synapse Administrator or Synapse SQL Administrator. However, that gives access to everything within Synapse or to all Serverless SQL Pool databases within Synapse. If you are already using the same SP to roll out Synapse with Infrastructure as Code (with Bicep or Terraform), then it already has the Synapse Administrator role.

6) Master Key and External Resources
For this example we will use the following basic code. Note that the DeltaLocation is pointing to the Development environment of our data platform. During the deployment we will need to override this, since each DTAP environment has its own Azure Storage Account.
Create external table on delta table (for Power BI)















-- Create Master Key for using Managed Service Identity access
CREATE MASTER KEY;

-- Create Credential to use Managed Service Identity of Synapse
CREATE DATABASE SCOPED CREDENTIAL [SynapseIdentity]
WITH
    IDENTITY = 'Managed Service Identity'
;

-- Create a File format for Delta tables
CREATE EXTERNAL FILE FORMAT [DeltaFormat]
    WITH (
    FORMAT_TYPE = DELTA,
    DATA_COMPRESSION = N'org.apache.hadoop.io.compress.SnappyCodec'
    );

-- Create a Data source pointing to the container of your Delta tables
CREATE EXTERNAL DATA SOURCE [DeltaLocation]
	WITH (
	LOCATION = N'abfss://gold@mydevstorage.dfs.core.windows.net/',
    CREDENTIAL = [SynapseIdentity]
	);

-- Create an external table
CREATE EXTERNAL TABLE [dbo].[dim_abcdef] (
    [integerfield] INT NULL,
    [stringfield] NVARCHAR (400) NULL,
    [datefield] DATETIME2 (7) NULL
)
    WITH (
    LOCATION = N'/delta/dim_abcdef',
    DATA_SOURCE = [DeltaLocation],
    FILE_FORMAT = [DeltaFormat]
    );
You can encrypt the master key with an additional password. The credential with the Managed Service Identity (MSI) ensures that this specific Synapse identity is used to connect to the data source (the Delta table in your Azure Storage Account a.k.a. Datalake a.k.a. Delta Lake). This means that if you have access to the Serverless SQL Pool and its credentials, you also have access to the underlying storage account.

7) Schema Compare in Visual Studio
The datamart database located in the Serverless SQL pool of Synapse in the development environment is filled with all the above SQL objects, like the external table. Now we want to get those objects into our database project. For this we need to do a schema compare in the database project by right-clicking the project in the Solution Explorer. The schema compare is between the development database and the database project.
Schema Compare














In the Schema Compare window click on Options to exclude Role Memberships and Users, because these are different for each environment in your DTAP street. As a result they will not show up in the list of differences. Otherwise you have to uncheck them each time you do a schema compare.
Application-scoped
























You also want to add Database Scoped Credentials and Master Key in the Non-Application-scoped section. These are the first two items in the script from the previous step. Without these your external tables won't work in the test/acceptance/production database. Click OK to confirm the settings.
Non-Application-scoped
























Now make sure your dev database is on the left side in the schema compare and the database project is on the right side. You can find the URL of your Serverless SQL pool in the Azure portal on the overview page of your Synapse. It's called Serverless SQL endpoint and looks like [synapsename]-ondemand.sql.azuresynapse.net.
Azure Synapse Workspace overview page










Then hit the Compare button to see all the new objects that are not yet in your project. Tip: save this schema compare (including the changed options) in your database project. Then you can reuse it.
Result of schema compare














Verify the objects and hit the Update button to include them in your database project. 
Result of the update














8) Override location of External Data Source
If you open the external data source, you will notice the hardcoded URL of the gold container in the development environment. Replace that URL by $(DeltaLocation)
Replace hardcoded URL by variable









Go to the properties of your project (not the solution) and then go to the SQLCMD Variables. Enter a new variable called $(DeltaLocation) and enter the URL of your Development environment (the URL you just replaced). You only need to fill in the Default column. Save it.
Add variable to SQLCMD Variables
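
To make the idea of the SQLCMD variable more concrete: at publish time every $(DeltaLocation) reference in the script is replaced by the value supplied for that environment. The Python sketch below only mimics that substitution for illustration. It is not how the deployment tooling is implemented, and the function name and the test storage account name are assumptions.

```python
import re
from typing import Dict

# Matches SQLCMD-style references such as $(DeltaLocation)
SQLCMD_VARIABLE = re.compile(r"\$\((\w+)\)")


def substitute_sqlcmd_variables(script: str, values: Dict[str, str]) -> str:
    """Replace every $(Name) in the script with the supplied value."""

    def replace(match):
        name = match.group(1)
        if name not in values:
            raise KeyError(f"No value supplied for SQLCMD variable {name}")
        return values[name]

    return SQLCMD_VARIABLE.sub(replace, script)


data_source_script = (
    "CREATE EXTERNAL DATA SOURCE [DeltaLocation] WITH ("
    "LOCATION = N'$(DeltaLocation)', CREDENTIAL = [SynapseIdentity]);"
)
# Hypothetical storage account name for the Test environment
print(substitute_sqlcmd_variables(
    data_source_script,
    {"DeltaLocation": "abfss://gold@myteststorage.dfs.core.windows.net/"},
))
```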







Now we need to create a Publish Profile file, which we can override during deployment in the YAML pipeline from Azure DevOps. Right click the project (not the solution) and click Publish. In this window fill in the values from your development environment (we will replace them later on): target database connection and the value of the SQLCMD variable. Then hit the Create Profile button which will add the Publish Profile file to your database project. After that Cancel the window because we will not publish via Visual Studio.
Publish Profile
















9) Stage, Commit and Sync repository
The changes in Visual Studio need to go back to the repository in Azure DevOps. Branch strategy and branch policies are ignored in this explanation to keep things short and simple.
Commit and sync changes to Azure DevOps repos




















Now all changes are stored in the Azure DevOps repository. Next step is setting up the CICD process.
Azure DevOps repository














10) Pipeline Library - Variable Groups
To make the YAML scripts reusable for multiple Serverless SQL pools of your DTAP street we need to create some Variable Groups in Azure DevOps. You can find them under Pipelines - Library. You need one for each of your DTAP environments and optionally one for general variables that don't change between the environments.
DevOps Variable Groups
























In this case we will store the database project name and the database name in the general variable group. You could use just one variable, because they will often have the same value.
  • SqlDatabaseName - Name of the database within the Serverless SQL Pool
  • SqlProjectName - Name of the database project within Visual Studio
For the environment-specific variable groups we have the SQL server name, which is equal to the Synapse workspace name. So if you are also deploying the Synapse workspace, then you could reuse that one. The other one is the storage account location.
  • ADLSLocation - For replacing the storage account URL between environments
  • SqlServerName - For storing the name of the server (equal to the Synapse workspace name)
11) YAML pipelines
Last development step is setting up the YAML files. This example has two YAML files located in the CICD\YAML folder of the repository. Tip: Visual Studio Code has some nice YAML editors, but you can also just use the Azure DevOps website to create and edit the YAML files.

BuildSqlServerless.yml
The YAML starts with some general settings, like reading the variable group from the previous step. Then the trigger shows that the pipeline runs when a change happens in the SQL folder in one of those four branches.
Step 1, checkout, is to get the repository content to the agent.
Step 2, treeview, is just for debugging and shows you all files on the agent. Useful for step 3.
Step 3, build, builds the database project that was just retrieved from the repository.
Step 4, copy, copies the files required for deployment to an artifact staging folder.
Step 5, publish, publishes the artifact so that it can be used in the next YAML file.
###################################
# General Variables
###################################
variables:
  - group: SQLServerlessParamsGen
  
###################################
# When to create a pipeline run
###################################
trigger:
  branches:
    include:
    - development
    - test
    - acceptance
    - main
  paths:
    include:
    - SQL/*
 
stages:
###################################
# Create DacPac Artifact
###################################
- stage: CreateSQLArtifact
  displayName: Create SQL Artifact (dacpac)
 
  jobs:
  - job: Job
    displayName: 'Build DacPac'
    workspace:
      clean: all
    pool:
      vmImage: 'windows-latest'
      # name: my-agentpool
    steps:
 
    ###################################
    # 1 Retrieve Repository
    ###################################
    - checkout: self
      displayName: '1 Retrieve Repository'
      clean: true
 
    ###################################
    # 2 Show treeview of agent
    ###################################
    - powershell: |
        Write-Output "This is the folder structure within Pipeline.Workspace"
        tree "$(Pipeline.Workspace)" /F
      displayName: '2 Treeview Workspace'
 
    ###################################
    # 3 Build Visual Studio project
    ###################################
    - task: MSBuild@1
      displayName: '3. Creating Artifact'
      inputs:
        solution: '$(System.DefaultWorkingDirectory)/SQL/$(SqlProjectName)/$(SqlProjectName).sqlproj'
        msbuildArchitecture: x86
        msbuildVersion: latest
 
    ###################################
    # 4 Stage dacpac and publish.xml
    ###################################
    - task: CopyFiles@2
      displayName: '4. Copy Artifact'
      inputs:
        contents: |
          **\*.dacpac
          **\*.publish.xml
        TargetFolder: '$(build.artifactstagingdirectory)'
 
    ###################################
    # 5 Publish dacpac and xml artifact
    ###################################
    - task: PublishPipelineArtifact@1
      displayName: '5. Publish Artifact'
      inputs:
        targetPath: '$(Build.ArtifactStagingDirectory)'
        artifact: 'SQL_Dacpac'
        publishLocation: 'pipeline'
After that it calls the second YAML file that does the actual deployment. Because we want to reuse it for all environments, you need to pass some parameter values that come from the variable groups from the previous step. The hardcoded parameter is the name of the Azure DevOps Service Connection that uses the SP to connect to the database. The env parameter is just for logging/debugging purposes to show to which environment you are deploying.
###################################
# Deploy Test environment
###################################
- stage: DeployTst
  displayName: Deploy TST
  variables:
    - group: SQLServerlessParamsTst
  pool:
    vmImage: 'windows-latest'
    # name: my-agentpool
  condition: and(succeeded(), eq(variables['Build.SourceBranchName'], 'test'))
  dependsOn: CreateSQLArtifact
  jobs:
    - template: DeploySqlServerless.yml
      parameters:
        env: TST
        ServiceConnection: SC-Synapse-T
        SqlServerName: $(SqlServerName)
        SqlDatabaseName: $(SqlDatabaseName)
        SqlProjectName: $(SqlProjectName)
        ADLSLocation: $(ADLSLocation)

###################################
# Deploy Acceptance environment
###################################
- stage: DeployAcc
  displayName: Deploy ACC
  variables:
    - group: SQLServerlessParamsAcc
  pool:
    vmImage: 'windows-latest'
    # name: my-agentpool
  condition: and(succeeded(), eq(variables['Build.SourceBranchName'], 'acceptance'))
  dependsOn: CreateSQLArtifact
  jobs:
    - template: DeploySqlServerless.yml
      parameters:
        env: ACC
        ServiceConnection: SC-Synapse-A
        SqlServerName: $(SqlServerName)
        SqlDatabaseName: $(SqlDatabaseName)
        SqlProjectName: $(SqlProjectName)
        ADLSLocation: $(ADLSLocation)

###################################
# Deploy Production environment
###################################
- stage: DeployPrd
  displayName: Deploy PRD
  variables:
    - group: SQLServerlessParamsPrd
  pool:
    vmImage: 'windows-latest'
    # name: my-agentpool
  condition: and(succeeded(), eq(variables['Build.SourceBranchName'], 'main'))
  dependsOn: CreateSQLArtifact
  jobs:
    - template: DeploySqlServerless.yml
      parameters:
        env: PRD
        ServiceConnection: SC-Synapse-P
        SqlServerName: $(SqlServerName)
        SqlDatabaseName: $(SqlDatabaseName)
        SqlProjectName: $(SqlProjectName)
        ADLSLocation: $(ADLSLocation)

Build and publish artifact
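
The condition on each deploy stage ties a branch to an environment. The Python sketch below only mirrors that logic for illustration (the names BRANCH_TO_STAGE and stage_for_branch are ours). The real evaluation is done by Azure DevOps, where Build.SourceBranchName holds the last path segment of the branch ref.

```python
from typing import Optional

# Branch name -> deploy stage, as configured in the stage conditions
BRANCH_TO_STAGE = {
    "test": "DeployTst",
    "acceptance": "DeployAcc",
    "main": "DeployPrd",
}


def stage_for_branch(source_branch: str, build_succeeded: bool = True) -> Optional[str]:
    """Return the deploy stage that would run, or None if no stage matches."""
    if not build_succeeded:
        # Mirrors succeeded(): no deployment after a failed build stage
        return None
    # refs/heads/test -> test
    branch_name = source_branch.rsplit("/", 1)[-1]
    return BRANCH_TO_STAGE.get(branch_name)


for ref in ("refs/heads/development", "refs/heads/test", "refs/heads/main"):
    print(ref, "->", stage_for_branch(ref))
```

Note that a change on the development branch still triggers the pipeline and builds the artifact, but it matches none of the deploy stages.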












DeploySqlServerless.yml
The second YAML file starts with the parameters that are required to call this script. Then comes the environment name in the job, which you can use to set up approvals. The first step is just for debugging: it shows the parameter values and a treeview of the agent. You should be able to see the artifact folder, which is useful for setting up the deployment task where you need the paths of the dacpac and publish profile. In the AdditionalArguments property you can override the value of the Storage Account location. If you have multiple variables, just repeat the entire /Variables:name=value argument with a space in between.
parameters:
  - name: env
    displayName: Environment
    type: string
    values:
    - DEV
    - TST
    - ACC
    - PRD
  - name: ServiceConnection
    displayName: Service Connection
    type: string
  - name: SqlServerName
    displayName: Sql Server Name
    type: string
  - name: SqlDatabaseName
    displayName: Sql Database Name
    type: string
  - name: SqlProjectName
    displayName: Sql Project Name
    type: string
  - name: ADLSLocation
    displayName: Azure Data Lake Location
    type: string
 
 
jobs:
    - deployment: deploymentjob${{ parameters.Env }}
      displayName: Deployment Job ${{ parameters.Env }}
      environment: deploy-to-${{ parameters.Env }}
      strategy:
        runOnce:
          deploy:
            steps:
            ###################################
            # 1 Show environment and treeview
            ###################################
            - powershell: |
                Write-Output "Deploying ${{ parameters.SqlProjectName }} to DB ${{ parameters.SqlDatabaseName }} and server ${{ parameters.SqlServerName }} in the ${{ parameters.env }} environment"
                Write-Output "Changing SQLCMD variable DeltaLocation to value ${{ parameters.ADLSLocation }}"
                tree "$(Pipeline.Workspace)" /F
              displayName: '1 Show environment and treeview Pipeline_Workspace'
 
            ###################################
            # 2 Deploy DacPac
            ###################################            
            - task: SqlAzureDacpacDeployment@1
              displayName: '2 Deploy DacPac'
              inputs:
                azureSubscription: '${{ parameters.ServiceConnection }}'
                AuthenticationType: 'servicePrincipal'
                ServerName: '${{ parameters.SqlServerName }}-ondemand.sql.azuresynapse.net'
                DatabaseName: '${{ parameters.SqlDatabaseName }}'
                deployType: 'DacpacTask'
                DeploymentAction: 'Publish'
                DacpacFile: '$(Pipeline.Workspace)/SQL_Dacpac/SQL/${{ parameters.SqlProjectName }}/bin/debug/${{ parameters.SqlProjectName }}.dacpac'
                PublishProfile: '$(Pipeline.Workspace)/SQL_Dacpac/SQL/${{ parameters.SqlProjectName }}/${{ parameters.SqlProjectName }}.publish.xml'
                AdditionalArguments: /Variables:DeltaLocation=${{ parameters.ADLSLocation }}
                IpDetectionMethod: 'AutoDetect'

Deploy Serverless SQL pool database
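
When a database project has more than one SQLCMD variable, the AdditionalArguments value becomes a space-separated list of /Variables:name=value arguments. A minimal Python sketch of how that string could be assembled is shown below. The function name and the second variable (ArchiveLocation) are hypothetical and only there to illustrate the repetition.

```python
from typing import Mapping


def sqlcmd_variable_arguments(variables: Mapping[str, str]) -> str:
    """Render one /Variables:name=value argument per SQLCMD variable."""
    arguments = []
    for name, value in variables.items():
        if any(character.isspace() for character in value):
            # Keep the sketch simple: values with whitespace would need quoting
            raise ValueError(f"Value for {name} contains whitespace")
        arguments.append(f"/Variables:{name}={value}")
    return " ".join(arguments)


print(sqlcmd_variable_arguments({
    "DeltaLocation": "abfss://gold@mydevstorage.dfs.core.windows.net/",
    # Hypothetical second variable
    "ArchiveLocation": "abfss://archive@mydevstorage.dfs.core.windows.net/",
}))
```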












Conclusion
In this post you learned to apply an 'old-fashioned' solution to a relatively new service. This solution doesn't only work for the Synapse Serverless SQL Pool, but also for the Synapse Dedicated SQL Pool.

There is still one bug/problem. For the Serverless SQL Pool it seems that updating the External Data Source is not possible. The first deployment is no problem, but altering statements are ignored, probably because there are already external tables referencing this object. You are probably not updating this external data source often enough for this to be a big issue, but if you want to do it, you can use a pre-deployment script in the database project to first drop those objects.

In a next post we will show how to give Power BI access to this Synapse Serverless SQL Pool database.

Thanks to colleagues Ralph Koumans and Bart van Es for helping out setting it up.

Sunday 26 March 2023

Synapse - Change Data Feed (CDF) on delta tables

Case
I like the data versioning of the Delta Tables and I know how to get data from different versions, but how can I combine that in one query to get, for example, the changes during a year to create a nice fact table about certain changes?
Change Data Feed in Synapse




















Solution
Change Data Feed (CDF) is still a bit new. The currently supported Delta Lake version in the Synapse workspace is 2.2.0. This version does not yet support CDF for SQL queries. This should be available in Delta Lake 2.3.0 according to the release documentation. Luckily you can already use PySpark to get this information.
Current Delta Lake version in Synapse










1) Enable Change Data Feed
First you have to enable the Change Data Feed option on your Delta table. From that point in time you can use CDF. The property is called enableChangeDataFeed.

You can alter your existing tables with an Alter statement
%%sql

ALTER TABLE silver.Cities
  SET TBLPROPERTIES (delta.enableChangeDataFeed = true);
For new Delta Tables you can also do this in the Create Table command.
%%sql

CREATE TABLE Silver.Cities (Id INT, City STRING, Population INT)
  TBLPROPERTIES (delta.enableChangeDataFeed = true);
And if you used the PySpark code from our previous post, then you can add an option just in front of the save.
sdf.write.format('delta').option("delta.enableChangeDataFeed", "true").save(delta_table_path)
To check whether it is enabled on your Delta Table you can use the following command.
%%sql

SHOW TBLPROPERTIES silver.cities
CDF is enabled
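
If you want to do this check in code instead of reading the output by eye, a small helper could look like the sketch below. It assumes you have already turned the key and value columns of the SHOW TBLPROPERTIES result into a dictionary. The function name is our own.

```python
from typing import Mapping


def is_cdf_enabled(table_properties: Mapping[str, str]) -> bool:
    """Return True when the Change Data Feed property is set to true."""
    value = table_properties.get("delta.enableChangeDataFeed", "false")
    return str(value).strip().lower() == "true"


# In a Synapse notebook the dictionary could be filled like this:
# rows = spark.sql("SHOW TBLPROPERTIES silver.cities").collect()
# properties = {row["key"]: row["value"] for row in rows}
properties = {"delta.enableChangeDataFeed": "true"}  # example values only
print(is_cdf_enabled(properties))
```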













2) Check available data versions
Now that we have the Change Data Feed option available, let's check which data versions we have with the DESCRIBE HISTORY command. In the first example you will see that CDF was enabled after table creation, in the second version (1). This means you cannot include the first version (0) in the CDF command.

You will get an error if you set the range wrong while getting CDF info:
AnalysisException: Error getting change data for range [0 , 4] as change data was not
recorded for version [0]. If you've enabled change data feed on this table,
use `DESCRIBE HISTORY` to see when it was first enabled.
Otherwise, to start recording change data, use `ALTER TABLE table_name SET TBLPROPERTIES
(delta.enableChangeDataFeed=true)`.
CDF available from version 1







In the second example it was enabled during the Delta table creation and therefore CDF is available from the first version (0).
CDF available from version 0








3) Query CDF data
When you query the CDF data you will get some extra columns:
  • _change_type: showing what action was taken to change the data - insert, update_preimage, update_postimage and delete
  • _commit_version: showing the version number of the data
  • _commit_timestamp: showing the timestamp of the data change
If you want particular version numbers when getting the data, then you can use startingVersion and endingVersion as options while reading the data. Specifying only startingVersion is also permitted.
%%pyspark

df = spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .option("endingVersion", 5) \
  .table("Silver.cities")

display(df.sort("City","_commit_version"))
Filter CDF on version numbers













Probably more useful is to query date ranges. For that you can use startingTimestamp and endingTimestamp as options. Specifying only startingTimestamp is also permitted.
%%pyspark

df = spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2023-03-26 11:07:23.008') \
  .table("Silver.cities")

display(df.sort("City","_commit_version"))
Filter CDF on timestamps
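
Because the version and timestamp variants only differ in their options, you could collect them in one small helper and pass the result to the reader. The sketch below is an assumption on our side (the function name and the validation rules are not from Delta Lake itself); it only builds the option dictionary, so it runs without a Spark session.

```python
from typing import Dict, Optional


def cdf_read_options(
    starting_version: Optional[int] = None,
    ending_version: Optional[int] = None,
    starting_timestamp: Optional[str] = None,
    ending_timestamp: Optional[str] = None,
) -> Dict[str, str]:
    """Build the reader options for a Change Data Feed query."""
    # This sketch requires exactly one way of specifying the start
    if (starting_version is None) == (starting_timestamp is None):
        raise ValueError("Provide exactly one of starting_version or starting_timestamp")
    if ending_version is not None and ending_timestamp is not None:
        raise ValueError("Provide at most one of ending_version or ending_timestamp")

    options = {"readChangeFeed": "true"}
    if starting_version is not None:
        options["startingVersion"] = str(starting_version)
    if starting_timestamp is not None:
        options["startingTimestamp"] = starting_timestamp
    if ending_version is not None:
        options["endingVersion"] = str(ending_version)
    if ending_timestamp is not None:
        options["endingTimestamp"] = ending_timestamp
    return options


print(cdf_read_options(starting_version=0, ending_version=5))
# In a Synapse notebook the dictionary could be passed to the reader like this:
# df = spark.read.format("delta").options(**cdf_read_options(starting_version=0)).table("Silver.cities")
```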














If you want to use the new column _commit_timestamp from the next record to create a new column called end_timestamp in the current record, then you need to play with the lead() function (just like in TSQL).
%%pyspark

from pyspark.sql.window import Window
from pyspark.sql.functions import lead 

df = spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2023-03-26 11:07:23.008') \
  .table("Silver.cities")

# Create window for lead
windowSpec  = Window.partitionBy("City").orderBy("_commit_version")

# Remove update_preimage records, add new column with Lead() and then sort
display(df.filter("_change_type != 'update_preimage'")
           .withColumn("_end_timestamp",lead("_commit_timestamp",1).over(windowSpec))
           .sort("City","_commit_version"))
Create end_timestamp with lead function
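
If the window function feels abstract, the plain-Python sketch below shows what the filter plus lead() combination computes: per City, each remaining change row gets the commit timestamp of the next version as its end timestamp, and the latest row gets None. The sample rows and the function name are made up for illustration and do not come from a real table.

```python
from collections import defaultdict


def add_end_timestamp(rows):
    """Mimic the filter + lead() logic on a list of change-row dictionaries."""
    # Remove update_preimage records, just like the PySpark filter
    kept = [dict(row) for row in rows if row["_change_type"] != "update_preimage"]

    # Equivalent of Window.partitionBy("City")
    by_city = defaultdict(list)
    for row in kept:
        by_city[row["City"]].append(row)

    result = []
    for city in sorted(by_city):
        # Equivalent of orderBy("_commit_version")
        ordered = sorted(by_city[city], key=lambda row: row["_commit_version"])
        for current, following in zip(ordered, ordered[1:] + [None]):
            current["_end_timestamp"] = following["_commit_timestamp"] if following else None
            result.append(current)
    return result


# Hypothetical change rows
sample_rows = [
    {"City": "Amsterdam", "_change_type": "insert", "_commit_version": 0, "_commit_timestamp": "2023-03-26 11:07:23"},
    {"City": "Amsterdam", "_change_type": "update_preimage", "_commit_version": 1, "_commit_timestamp": "2023-03-26 11:09:10"},
    {"City": "Amsterdam", "_change_type": "update_postimage", "_commit_version": 1, "_commit_timestamp": "2023-03-26 11:09:10"},
    {"City": "Berlin", "_change_type": "insert", "_commit_version": 0, "_commit_timestamp": "2023-03-26 11:07:23"},
]

for row in add_end_timestamp(sample_rows):
    print(row["City"], row["_commit_version"], row["_change_type"], row["_end_timestamp"])
```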











Conclusions
In this post you learned the basics of the Change Data Feed options in Synapse. This feature is available in Delta Lake 2.0.0 and above, but it is still in experimental support mode. For now you have to use PySpark instead of Spark SQL to query the data in Synapse.

Besides creating nice fact tables to show data changes during a certain period, this feature could also be useful to incrementally load a large fact table with only the changes from the silver layer. Creating audit trails for data changes over time could also be an interesting option. The CDF option is probably the most useful when there are not that many changes in a table.

In a later post, when Delta Lake 2.3.0 is available in Synapse, we will explain the Spark SQL options for CDF. Special thanks to colleagues Roelof Jonkers and Martijn Broeks for helping out.



Monday 20 March 2023

Synapse - Creating Silver Delta Tables

Case
I want to create and fill a Silver layer based on parquet files in my bronze layer. Is there a simple way to create and populate the tables automatically?
Adding files to your Silver layer












Solution
You can create a notebook for this and then call that notebook from your Synapse pipeline with some parameters (location, table name and keys). This allows you, for example, to loop through all your ingested source files from the bronze (raw/ingest) layer and call this notebook for each file to add it to the Silver layer. We can also add the silver tables directly to the Lake database for easy querying later on.

Note that this example is a technical, source-based Silver layer, so it is not really cleansed, curated or conformed.

1) Create notebook
Go to the Develop tab in Synapse and create a new Notebook. Give it a suitable name and make sure the language is PySpark. Sooner or later you will want to test this Notebook, so attach it to a Spark pool. Optionally you can add a Markdown cell to explain this notebook.
New Synapse Notebook

2) Code cell 1: parameters
The first code cell is for the parameters that can be overridden by parameters from the Notebook activity in the pipeline. Toggle the parameters option to make it a parameter cell. For more details see our post about notebook parameters. For debugging within the notebook we used real values.

For this example everything (bronze and silver) is in the same container, so you might want to add more parameters to split those up. This example uses parquet files as a source. If you want to use CSV, for example, then you need to change the format in the main code that fills the Spark Data Frame with data.
# path of the data lake container (bronze and silver for this example)
data_lake_container = 'abfss://mysource@datalakesvb.dfs.core.windows.net'
# The ingestion folder where your parquet files are located
bronze_folder = 'Bronze'
# The silver folder where your Delta Tables will be stored
silver_folder = 'Silver'
# The name of the table
table_name = 'SalesOrderHeader'
# The wildcard filter used within the bronze folder to find files
source_wildcard = 'SalesOrderHeader*.parquet'
# A comma separated string of one or more key columns (for the merge)
key_columns_str = 'SalesOrderID'
Parameters

3) Code cell 2: import modules and functions
The second code cell is for importing all required/useful modules. For this basic example we have only one import:
# Import modules
from delta.tables import DeltaTable
Import Delta Table module

3) Code cell 3: filling delta lake
Now the actual code for filling the Delta Lake tables with parquet files from the data lake. Note that the code is very basic. It checks whether the Delta Lake table already exists. If not, it creates the Delta Lake table, and if it already exists it merges the new data into the existing table. If you have transactional data then you could also do an append instead of a merge.

# Convert comma separated string with keys to array (ignoring spaces around the commas)
key_columns = [key.strip() for key in key_columns_str.split(',')]

# Convert array with keys to where-clause for merge statement
conditions_list = [f"existing.{key}=updates.{key}" for key in key_columns]

# Determine path of source files from ingest layer
source_path = data_lake_container + '/' + bronze_folder + '/' + source_wildcard 

# Determine path of Delta Lake Table 
delta_table_path = data_lake_container + '/' + silver_folder + '/' + table_name

# Read file(s) in spark data frame
sdf = spark.read.format('parquet').option("recursiveFileLookup", "true").load(source_path)

# Check if the Delta Table exists
if (DeltaTable.isDeltaTable(spark, delta_table_path)):
    print('Existing delta table')
    # Read the existing Delta Table
    delta_table = DeltaTable.forPath(spark, delta_table_path)

    # Merge new data into existing table
    delta_table.alias("existing").merge(
        source = sdf.alias("updates"),
        condition = " AND ".join(conditions_list)
        
    ).whenMatchedUpdateAll(
    ).whenNotMatchedInsertAll(
    ).execute()

    # For transactions you could do an append instead of a merge
    # sdf.write.format('delta').mode('append').save(delta_table_path)

else:
    print('New delta table')
    # Create new delta table with new data
    sdf.write.format('delta').save(delta_table_path)
Adding data to new or existing Delta Table
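The merge condition is the part that is easiest to get wrong when a table has a composite key, so below a small sketch in plain Python that only builds the condition string. The function name build_merge_condition and the second key column SalesOrderDetailID are assumptions for this illustration; the aliases existing and updates are the ones used in the notebook code.

```python
def build_merge_condition(key_columns_str, target_alias="existing", source_alias="updates"):
    """Turn a comma separated string of key columns into a merge condition."""
    # Split on commas and ignore surrounding spaces and empty entries
    key_columns = [key.strip() for key in key_columns_str.split(",") if key.strip()]
    if not key_columns:
        raise ValueError("At least one key column is required for the merge")
    # One equality per key column, combined with AND
    return " AND ".join(
        f"{target_alias}.{key}={source_alias}.{key}" for key in key_columns
    )


# Single key column, as in the parameter cell
print(build_merge_condition("SalesOrderID"))
# Hypothetical composite key
print(build_merge_condition("SalesOrderID, SalesOrderDetailID"))
```

Printing the condition before running the merge is a cheap way to check what the notebook will send to Delta Lake for each table.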



















4) Code cell 4: Adding Delta Table to Lake Database
The last step is optional, but very useful: adding the Delta Table to the Lake Database. This allows you to query the Delta Table by its name instead of its path in the Data Lake. Make sure you first add a Silver layer to that Lake database. See this post for more details (step 1).
# Add the Delta Table to the Lake Database for easy querying in other notebooks or scripts within Synapse.
spark.sql(f"CREATE TABLE IF NOT EXISTS Silver.{table_name} USING DELTA LOCATION '{delta_table_path}'")

# Spark SQL version
#  CREATE TABLE Silver.MyTable
#  USING DELTA
#  LOCATION 'abfss://yourcontainer@yourdatalake.dfs.core.windows.net/Silver/MyTable'
Adding Delta Table to Lake Database

5) Creating Pipeline
Now it is time to loop through your ingested files and call this new Notebook for each file to create the Silver Layer Delta Tables. You have to provide values for all parameters in the notebook. Since you need the key column(s) of each table to do the merge you probably need to store these somewhere.

For the ingestion we often store the table/file names from each source that we want to download to the data lake in a metadata table. In this table we also store the key column(s) of each table.
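As a rough sketch of that idea, the snippet below turns a few metadata records into the parameter values the notebook expects. The structure of the metadata, the second table and the function name to_notebook_parameters are assumptions for this illustration; in a real pipeline these rows would typically come from your metadata table (for example via a Lookup activity) and be passed to the Notebook activity inside the ForEach loop.

```python
# Hypothetical metadata records: one per table, including its key column(s)
metadata = [
    {"table_name": "SalesOrderHeader", "key_columns": ["SalesOrderID"]},
    {"table_name": "SalesOrderDetail", "key_columns": ["SalesOrderID", "SalesOrderDetailID"]},
]


def to_notebook_parameters(entry):
    """Map one metadata record to the parameters of the notebook."""
    return {
        "table_name": entry["table_name"],
        # Assumes the bronze files start with the table name, as in the example
        "source_wildcard": f"{entry['table_name']}*.parquet",
        # The notebook expects the keys as one comma separated string
        "key_columns_str": ",".join(entry["key_columns"]),
    }


for entry in metadata:
    print(to_notebook_parameters(entry))
```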

Call Notebook in ForEach loop

Synapse doesn't retrieve the parameters from the Notebook. You have to add them manually as Base parameters in the Settings tab.
Calling Notebook

If the key column or set of columns you enter is not unique, you will get the following error the second time you run the notebook (the first run creates the table, so the merge is not used yet).
Cannot perform Merge as multiple source rows matched and attempted to modify the same target row in the Delta table in possibly conflicting ways. By SQL semantics of Merge, when multiple source rows match on the same target row, the result may be ambiguous as it is unclear which source row should be used to update or delete the matching target row. You can preprocess the source table to eliminate the possibility of multiple matches.
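
To find out which key values cause this error you can count how often each key combination occurs in the source. Below a minimal sketch of that check in plain Python on a list of row dictionaries. The function name find_duplicate_keys and the sample rows are made up for this illustration; on a Spark Data Frame the equivalent check would be a groupBy on the key columns followed by a count.

```python
from collections import Counter


def find_duplicate_keys(rows, key_columns):
    """Return the key combinations that occur more than once, with their count."""
    counts = Counter(tuple(row[key] for key in key_columns) for row in rows)
    return {key: number for key, number in counts.items() if number > 1}


# Hypothetical source rows where SalesOrderID 1 occurs twice
sample_rows = [
    {"SalesOrderID": 1, "Status": "A"},
    {"SalesOrderID": 1, "Status": "B"},
    {"SalesOrderID": 2, "Status": "A"},
]

print(find_duplicate_keys(sample_rows, ["SalesOrderID"]))

# Spark equivalent (not executed here):
# sdf.groupBy(*key_columns).count().filter("count > 1")
```

If this returns any rows then either the key is incomplete or the source needs to be deduplicated before the merge.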

6) Result
Now you can run your pipeline and check whether the silver layer of your Lake database is populated with new tables. You can also create a new notebook with Spark SQL or PySpark to check the contents of the tables and to see whether Time Travel works.
Running the pipeline that calls the new Notebook

Delta Lake folders in the Data Lake

Conclusions
In this post you learned how to create and populate a (source based) silver layer of your Lake House Delta Tables. An easy quick start for your lake house. If you have multiple sources with similar data then you should also consider creating a real cleansed, curated and conformed silver layer manually. In a later post we will show you some of those manual steps in Spark SQL or PySpark.

Special thanks to colleague Heleen Eisen for helping out with the PySpark.





Sunday 19 March 2023

Synapse - Using Spark SQL to time travel Delta Tables

Case
In a previous blog post you showed how to create and query Delta Tables with PySpark for a Lake House. However, most Data Warehouse people are more familiar with the SQL language. How can you query a Delta Table with good old SQL?
Using Spark SQL to time travel Delta Tables

Solution
In that previous blog post we showed that you can query Delta Tables in, for example, a Serverless SQL Pool by creating External Tables on those Delta Tables. This allows you to use TSQL to query Delta Tables, but it doesn't allow you to use time travel: you always get the latest version of the data.
External Tables on Delta in Serverless SQL Pool

However, we can use Synapse Notebooks with Spark SQL, a language very similar to TSQL, to query Delta Tables. This allows you to time travel the data in a familiar language.

1) Add Delta Table to Lake Database
To easily query Delta Tables you first need to make them visible in Synapse by adding them to the Lake Database. We explained this in the previous blog post.
Adding Delta Table to Lake Database

Once the Delta Table is available in the Lake Database you can query it like a regular table. By default you will see the latest version of the data.
%%sql
SELECT * FROM silver.cities
The alternative is to use the entire path:
%%sql
SELECT * FROM delta.`abfss://mysource@mydatalake.dfs.core.windows.net/silver/cities`
2) Show historical versions
You can check which historical versions are available with the DESCRIBE HISTORY command.
%%sql
DESCRIBE HISTORY silver.cities
Show versions of the Delta Table

Besides showing the history you can also check where the Delta Table is stored in your Data Lake with the DESCRIBE EXTENDED command. It will give you various details like the location of the Delta Table.
See details of Delta Table

3) Show specific version by version number
With the DESCRIBE HISTORY command you get a table listing the versions of your table. The first column shows the version number, which starts at 0 for the initial version of the table.

When you query a Delta Table you can add VERSION AS OF X behind the query where you replace the X by the version number. In this example we take version 2 (the third version of the table).
%%sql
SELECT * FROM silver.cities VERSION AS OF 2
Showing version 2 of the Delta Table

4) Show specific version by date
Time traveling with a specific version number is cumbersome because you first need to determine the version you need. Luckily you can also get the version that was active on a specific date by adding TIMESTAMP AS OF "2022-01-01" behind the query.
%%sql
SELECT * FROM silver.cities TIMESTAMP AS OF "2022-01-01"
Showing version of a specific date
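
If you want to switch between both time travel options from a PySpark cell, you could build the query text with a small helper. This is only a sketch: the function name time_travel_query is made up for this illustration and it does nothing more than assemble the two query forms shown above.

```python
def time_travel_query(table, version=None, timestamp=None):
    """Build a time travel query for either a version number or a timestamp."""
    # Exactly one of the two options must be provided
    if (version is None) == (timestamp is None):
        raise ValueError("Provide exactly one of version or timestamp")
    if version is not None:
        return f"SELECT * FROM {table} VERSION AS OF {int(version)}"
    return f'SELECT * FROM {table} TIMESTAMP AS OF "{timestamp}"'


print(time_travel_query("silver.cities", version=2))
print(time_travel_query("silver.cities", timestamp="2022-01-01"))

# In a Synapse notebook you could then run it with:
# display(spark.sql(time_travel_query("silver.cities", version=2)))
```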

















Conclusions
In this post you learned how to time travel a Delta Table with Spark SQL. It offers the same options as PySpark, but for some people it is just a little bit more readable. In a next post we will discuss Change Data Feed to get data changes between versions.