Showing posts with label SYNAPSE. Show all posts

Wednesday 28 June 2023

Synapse and ADF Pipeline snack - Disable Activity

Case
Something that we already had in SSIS, but that was never implemented in Data Factory or Synapse... until now! You can disable pipeline activities with Activity State (in preview at the time of writing).
Disabling Pipeline Activities!
























Solution
In each activity (ADF and Synapse) you now have the option to change the state of an activity from Active to Inactive. After that you can provide the end status that should be used when the pipeline runs this activity: Skipped, Succeeded or Failed.
Mark activity as Skipped, Succeeded or Failed
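A disabled activity is easy to forget once the debugging is done. The sketch below (Python, not part of the original post) scans a pipeline definition for inactive activities. It assumes that the setting is stored in the pipeline JSON as the activity properties state and onInactiveMarkAs; the pipeline and activity names are made up for the example.

```python
import json

def find_inactive_activities(node):
    """Recursively collect (name, onInactiveMarkAs) for every inactive activity."""
    found = []
    if isinstance(node, dict):
        # Assumption: a disabled activity carries "state": "Inactive"
        if node.get("state") == "Inactive" and "name" in node:
            found.append((node["name"], node.get("onInactiveMarkAs")))
        for value in node.values():
            found.extend(find_inactive_activities(value))
    elif isinstance(node, list):
        for item in node:
            found.extend(find_inactive_activities(item))
    return found

# Hypothetical pipeline definition, trimmed to the relevant properties
pipeline_json = """
{
  "name": "PL_Example",
  "properties": {
    "activities": [
      {"name": "WAIT_Active", "type": "Wait"},
      {"name": "WAIT_Disabled", "type": "Wait",
       "state": "Inactive", "onInactiveMarkAs": "Succeeded"}
    ]
  }
}
"""

inactive = find_inactive_activities(json.loads(pipeline_json))
print(inactive)
```

Because the function walks the whole document, it should also pick up activities nested inside containers such as ForEach or Until.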






















This is extremely useful for debugging and testing parts of your pipeline. You can set it in the properties, but even simpler is just right-clicking the activity (just like in SSIS ♥).
Right click to change activity state





















Another great scenario is that you can now disable an activity that is not yet ready and validate the pipeline without getting errors about required fields that are not yet provided. Validation simply ignores the disabled activities.

Validating ignores disabled activities














Conclusion
It only took almost 6 years, but this seemingly simple feature could be very useful. It is still in preview but available in Data Factory and Synapse pipelines.

Disable multiple activities at the same time





Friday 5 May 2023

Synapse - Automatically check naming conventions

Case
We use naming conventions in Synapse, but sometimes it's just a lot of work to check if everything is correct. Is there a way to automatically check those naming conventions?
Automatically check Naming Conventions












Solution
You can use a PowerShell script to loop through the JSON files from Synapse that are stored in the repository. Prefixes for Linked Services, Datasets, Pipelines, Notebooks, Dataflows and scripts are easy to check since that is just a matter of checking the filename of the JSON file. If you also want to check activities, or use the Linked Service or Dataset type to check the naming conventions, then you also need to check the contents of those JSON files.
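To make the filename check concrete, here is a minimal sketch in Python. It is not the PowerShell script from the GitHub repository, only an illustration of the idea. The prefixes and file names are hypothetical; the folder names mimic the way Synapse stores its JSON files in the repository.

```python
from pathlib import Path
import tempfile

# Hypothetical prefixes per repository folder; replace with your own conventions
PREFIXES = {
    "pipeline": "PL_",
    "dataset": "DS_",
    "linkedService": "LS_",
    "notebook": "NB_",
}

def check_prefixes(repo_root, prefixes):
    """Return (folder, name) for every JSON file whose name lacks the expected prefix."""
    errors = []
    for folder, prefix in prefixes.items():
        folder_path = Path(repo_root, folder)
        if not folder_path.is_dir():
            continue  # this type of item is not used in the workspace
        for file in sorted(folder_path.glob("*.json")):
            if not file.stem.startswith(prefix):
                errors.append((folder, file.stem))
    return errors

# Self-contained demo: a temporary folder that mimics the repository
with tempfile.TemporaryDirectory() as root:
    demo_files = [("pipeline", "PL_Load"), ("pipeline", "Copy_Data"), ("dataset", "DS_Sales")]
    for folder, name in demo_files:
        Path(root, folder).mkdir(exist_ok=True)
        Path(root, folder, name + ".json").write_text("{}")
    naming_errors = check_prefixes(root, PREFIXES)

print(naming_errors)
```

Checking activities or Linked Service types would additionally require loading each file with a JSON parser and inspecting its contents.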

There are a lot of different naming conventions, but as long as you are committed to using one, it will make your Synapse workspace more readable and your logs easier to understand. At a glance you will immediately see which part of Synapse is causing the error. Above all it looks more professional and it shows you took the extra effort to make it better.

However, having a lot of different naming conventions also makes it hard to create one script to rule them all. We created a script that checks the prefixes of all the different parts of Synapse, and you can configure those prefixes in a JSON file. You can run it every once in a while to check your workspace, or run it as a validation step for a pull request in Azure DevOps. Then it acts like a gatekeeper that doesn't allow badly named items. If you make it a required step, you first have to solve the issues. If you make it an optional check, you still get the result, but you can choose to ignore it. For new projects you should make it required; for big existing projects you should probably make it optional for a while first and then change it to required.

The PowerShell script, the YAML file and the JSON config example are stored in a public GitHub repository. This allows us to easily improve the code for you and keep this blog post up-to-date. It also allows you to help out by making suggestions or even by writing some better code.


1) Folder structure repository
Just like the Synapse Deployment scripts, we store these validation files in the Synapse repository. We have a CICD and a SYN folder in the root. SYN contains the JSON files from the Synapse workspace. The CICD folder has three sub folders: JSON (for the config), PowerShell (for the actual script) and YAML (for the pipeline that is required for validation).

Download the files from the GitHub repository and store them in your own repository according to the structure described above. If you use a different structure you have to change the paths in the YAML file.
Repository structure
























2) Create pipeline
Now create a new pipeline with the existing YAML file called NamingValidation4Synapse.yml. Make sure the paths in the YAML follow the folder structure from step 1 or change them to your own structure.

If you are not sure about the folders then there is a treeview step in the YAML that will show you the structure of the agent. Just continue with the next steps and after the first run check the result of the treeview step and change the paths in the YAML and run again. You can remove or comment-out the treeview step when everything works.

This example uses the Azure DevOps repository with the following steps:
  • Go to pipelines and create a new pipeline
  • Select the Azure Repos Git
  • Select the Synapse repository
  • Choose Existing Azure Pipelines YAML file
  • Choose the right branch and select the YAML file under path
  • Save it and rename it because the default name is equal to the name of the repository
Create new pipeline with existing YAML file












3) Branch validation
Now that we have the new YAML pipeline, we can use it as a Build Validation in the branch policies. The example shows how to add them in an Azure DevOps repository.
  • In DevOps go to Repos in the left menu.
  • Then click branches to get all branches.
  • Now hover your mouse over the first branch and click on the 3 vertical dots.
  • Click Branch policies
  • Click on the + button in the Build Validation section.
  • Select the new pipeline created in step 2 (optionally change the Display name)
  • Choose the Policy requirement (Required or Optional)
  • Click on the Save button
Repeat these steps for all branches where you need the extra check. Don't add them on feature branches because it will also prevent you from making manual changes in these branches.
All branches








Required or Optional
















4) Testing
Now it's time to create a pull request. You will see that the validation is queued first, so this extra validation takes a little extra time, especially when you have a busy agent. However, you can just continue working and wait for the approval, or even press Set auto-complete to automatically complete the Pull Request when all approvals and validations have succeeded. Don't use auto-complete if you chose to make it an optional validation, because then the validation is cancelled as soon as all other validations are ready.

As soon as the validation is ready it will show you the first couple of errors.
Required Naming Convention Validation failed











You can click on those couple of errors to see the total number of errors and the first ten errors.
Get error count and first 10 errors

















And you can click on one of those errors to see the entire output including all errors and all correct items.

















And at the bottom you will find a summary with the total number of errors and percentages per Synapse part.
Summary example










Summary example











Or download your Synapse JSON files and run the PowerShell script locally with for example Visual Studio Code.
Running Naming Validations in VS Code














Conclusion
In this post you learned how you could automate your naming conventions check. This helps/forces your team to consistently use the naming conventions without you being some kind of nitpicking police officer. Everybody can just blame DevOps/themselves.

You can combine this validation with for example the branch validation to prevent accidentally choosing the wrong branch in a Pull Request.

Please submit your suggestions under Issues (and then bug report or feature request) in the GitHub repository or drop them in the comments below.




Wednesday 5 April 2023

Synapse snack - Increment counter variable

Case
I want to increment a variable to count some actions, for example to break an Until loop after 4 iterations. What is the Synapse (or ADF) equivalent of counter++ or counter = counter + 1, because you cannot self-reference in a variable expression?
The expression contains self referencing variable.
A variable cannot reference itself in the expression.



















Solution
Unfortunately you cannot self-reference a variable in the expression of a Set variable activity. If you want to increment a variable called Counter, then you can't use an expression like this:
@add(int(variables('Counter')), 1)
1) Add extra variable
The trick is to use two variables (and two Set variable activities). In this example we have, besides the Counter variable, a second 'helper' variable called NewCounterValue. The type is String for both since there isn't an integer type available.
Two variables to increment a variable value















2) Add first Set variable
Now add two Set variable activities to the canvas of your pipeline. In this case inside an UNTIL loop. The first activity will set the value of the helper variable called NewCounterValue. It retrieves the value of the original Counter variable, converts it to an Integer and then adds 1. After that it is converted back to a String:
@string(
    add(
        int(variables('Counter')),
        1
    )
)
Increment variable part 1










3) Add second Set variable
The second activity just retrieves the value of the helper variable, with @variables('NewCounterValue'), and uses it to set the value of the Counter variable. There is no need to cast the variable to another type.
Increment variable part 2















4) Until loop
Now let's test it in an Until loop that stops after 4 iterations.
The Until loop









The Until loop in action



















Conclusions
In this post we showed you how to overcome a little shortcoming in Synapse and ADF. It's a handy construction to do something at most X times, for example checking a status at most 4 times. The Until loop stops whenever the status changes or when the retry counter is 4 or higher.
@or(
    greaterOrEquals(int(variables('Counter')),4)
,   not(equals(activity('WEB_Check_Status').output.status,'PENDING'))
)
Case Example for retry
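If you want to reason about the loop outside of Synapse, the same logic can be traced in ordinary code. The Python sketch below only mimics the two Set variable activities and the Until expression above; it is not something you can run in a pipeline. The start value "0" for both variables and the list of status responses are assumptions for the example.

```python
def increment(variables):
    """Mimic the two Set variable activities; both variables are strings."""
    # Set variable 1: NewCounterValue = @string(add(int(variables('Counter')), 1))
    variables["NewCounterValue"] = str(int(variables["Counter"]) + 1)
    # Set variable 2: Counter = @variables('NewCounterValue')
    variables["Counter"] = variables["NewCounterValue"]

def until_done(variables, status, max_tries=4):
    """Mimic the Until expression: stop at max_tries or when the status is no longer PENDING."""
    return int(variables["Counter"]) >= max_tries or status != "PENDING"

def run_until(statuses):
    """Run the loop against a list of hypothetical status responses."""
    variables = {"Counter": "0", "NewCounterValue": "0"}  # assumed default values
    for status in statuses:
        # The activities inside the loop run first, then the expression is evaluated
        increment(variables)
        if until_done(variables, status):
            break
    return variables["Counter"]

print(run_until(["PENDING"] * 10))     # status never changes: the counter ends the loop
print(run_until(["PENDING", "DONE"]))  # status changes in the second iteration
```

The first call stops because the counter reaches 4, the second one because the status changed.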











Saturday 1 April 2023

Synapse - Cleanup workspace after deployment

Case
A while ago we created a script to clean up the Synapse Workspace before deploying new pipelines, datasets, etc. to Synapse. This helps you to remove old parts like for example triggers because the Synapse deployment is incremental and does not do any deletes. That script works fine, however for workspaces with a lot of items it could take up to 30 minutes to clean up everything. In that post we already mentioned some future improvements...
Cleanup Synapse Workspace after deployment



Update: use DeleteArtifactsNotInTemplate: true in the deployment task to avoid the PowerShell script.

Solution

This version of the cleanup script compares the items in the Synapse Workspace against the Artifact that you just deployed. Everything that is in the workspace but not in the artifact will now be deleted (afterwards). This will significantly shorten the cleanup period during deployment.
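The comparison itself is nothing more than a set difference: names in the workspace minus names in the artifact. The Python sketch below illustrates that idea only; the actual script further down is PowerShell and uses the Az.Synapse cmdlets. The trigger and pipeline names are made up.

```python
from pathlib import Path
import tempfile

def items_to_delete(artifact_dir, folder, workspace_names):
    """Names that exist in the workspace but have no JSON file in the artifact folder."""
    folder_path = Path(artifact_dir, folder)
    if folder_path.is_dir():
        artifact_names = {f.stem for f in folder_path.glob("*.json")}
    else:
        # Folder missing in the artifact: nothing of this type should remain
        artifact_names = set()
    return sorted(set(workspace_names) - artifact_names)

# Self-contained demo with a temporary artifact folder
with tempfile.TemporaryDirectory() as artifact:
    Path(artifact, "trigger").mkdir()
    Path(artifact, "trigger", "TR_Daily.json").write_text("{}")
    # Hypothetical names as they would be returned by the workspace
    obsolete = items_to_delete(artifact, "trigger", ["TR_Daily", "TR_Old"])
    no_folder = items_to_delete(artifact, "pipeline", ["PL_A"])

print(obsolete)
print(no_folder)
```

Items that are in the artifact but not (yet) in the workspace are deliberately left out of the result, because there is nothing to delete for them.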

You need to add this YAML step to your Synapse Deployment pipeline (preferably after the Deployment step) and change the path of the PowerShell script to your setup. The first parameter is the Synapse Workspace Name of the environment you are deploying to. The second one is the corresponding Resource Group Name. The last one is the location of the artifact. In this case the Pipeline.Workspace variable + "s\Synapse" (see screenshot below of the treeview on the agent).
###################################
# 5 Cleanup Synapse
###################################
- task: AzurePowerShell@5
  displayName: '5 Cleanup Synapse'
  inputs:
    azureSubscription: ${{ parameters.ServiceConnnection }}
    scriptType: filePath
    scriptPath: $(Pipeline.Workspace)\s\CICD\Powershell\ClearSynapse.ps1
    scriptArguments:
      -WorkspaceName ${{ parameters.TargetWorkspaceName }} `
      -ResourceGroupName ${{ parameters.ResourceGroupName }} `
      -ArtifactDirectory $(Pipeline.Workspace)\s\Synapse
    azurePowerShellVersion: latestVersion
    pwsh: true
Showing the path of the Artifact root folder















Save the PowerShell Script below in your repository and change the path in the above YAML script. We like to have a CICD folder in the root to store everything deployment related.
 
Repos folder structure























param (
   [Parameter (Mandatory = $true, HelpMessage = 'Synapse name')]
   [ValidateNotNullOrEmpty()]
   [string] $WorkspaceName,
   
   [Parameter (Mandatory = $true, HelpMessage = 'Resourcegroup name')]
   [ValidateNotNullOrEmpty()]
   [string] $ResourceGroupName,
   
   [Parameter (Mandatory = $true, HelpMessage = 'Artifact Directory')]
   [ValidateNotNullOrEmpty()]
   [string] $ArtifactDirectory
)

# Two default Linked Services we cannot remove
[string] $WorkspaceDefaultSqlServer = "$($WorkspaceName)-WorkspaceDefaultSqlServer"
[string] $WorkspaceDefaultSqlStorage = "$($WorkspaceName)-WorkspaceDefaultStorage"

# A little dummy protection to check whether you have configured the right folder
# If these folders are not available there is probably something wrong and we
# don't want to delete everything in the workspace.
if (!(Test-Path -Path (Join-Path $ArtifactDirectory "integrationRuntime")) -AND
    !(Test-Path -Path (Join-Path $ArtifactDirectory "linkedService")) -AND 
    !(Test-Path -Path (Join-Path $ArtifactDirectory "pipeline")))
    {
        Write-Output "Artifact folder $($ArtifactDirectory) "
        throw "Dummy protection - Probably not the right folder that stores your artifact"
    }


#######################################################
# 1) Checking for resource locks and removing them
#######################################################
Write-Output "==========================================="
Write-Output "1) Removing resource locks"
Write-Output "==========================================="

# Getting all locks on the Azure Synapse Workspace
$lock = Get-AzResourceLock -ResourceGroupName $ResourceGroupName -ResourceName $WorkspaceName -ResourceType "Microsoft.Synapse/workspaces"
Write-Output "Found $($lock.Count) locks"

# Check if the collection of Azure resource locks is not empty
if($null -ne $lock)
{
    # Looping through all resource locks to remove them one by one
    $lock | ForEach-Object -process {
        # Remove lock ($_ is the current lock in the loop)
        Write-Output "Removing Lock Id: $($_.LockId)"
        Remove-AzResourceLock -LockId $_.LockId -Force
    }
}
Write-Output "Step 'Removing resource locks' completed`r`n"



#######################################################
# 2) Stop and remove Triggers not in Artifact
#######################################################
Write-Output "==========================================="
Write-Output "2) Stop and remove Triggers not in Artifact"
Write-Output "==========================================="

# Check if the artifact contains any triggers
if (Test-Path -Path (Join-Path $ArtifactDirectory "trigger"))
{
    # Getting all Triggers from Artifact
    $ArtifactTriggers = Get-ChildItem -Path (Join-Path $ArtifactDirectory "trigger") -Filter "*.json" | Select-Object -ExpandProperty BaseName

    # Getting all Triggers from Synapse
    $SynapseTriggers = Get-AzSynapseTrigger -WorkspaceName $WorkspaceName | Select-Object -ExpandProperty Name

    # Getting Triggers from Synapse that are not in the Artifact
    $Triggers = Compare-Object -ReferenceObject $ArtifactTriggers -DifferenceObject $SynapseTriggers | Where-Object { $_.SideIndicator -eq "=>" } | Select-Object -ExpandProperty InputObject
} else {
    # Fill collection with all existing triggers to remove them
    # because artifact doesn't contain any triggers anymore
    Write-Output "Path not found in Artifact, removing all existing Triggers in Synapse Workspace"
    $Triggers = Get-AzSynapseTrigger -WorkspaceName $WorkspaceName | Select-Object -ExpandProperty Name
}
Write-Output "Found $($Triggers.Count) Triggers that are not in the Artifact"

# Stopping all Triggers before deleting them
$Triggers | ForEach-Object -process { 
    Write-Output "Stopping Trigger $($_)"
    try {
        # Trying to stop each Trigger
        Stop-AzSynapseTrigger -WorkspaceName $WorkspaceName -Name $($_) -ErrorAction Stop
    }
    catch {
        if ($_.Exception.Message -eq "{}")
        {
            # Ignore failures for triggers that are already stopped
            Write-Output "Trigger stopped"
        }
        else {
            # Unexpected error
            Write-Output "Something went wrong while stopping trigger!"
            Throw $_
        }
    }
    # Remove trigger
    Write-Output "Removing Trigger $($_)"
    Remove-AzSynapseTrigger -Name $_ -WorkspaceName $WorkspaceName -Force
}
Write-Output "Step 'Stop and remove Triggers not in Artifact' completed`r`n"



#######################################################
# 3) Remove Pipelines not in Artifact
#######################################################
Write-Output "==========================================="
Write-Output "3) Remove Pipelines not in Artifact"
Write-Output "==========================================="

# Check if artifact contains any pipelines
if (Test-Path -Path (Join-Path $ArtifactDirectory "pipeline"))
{
    # Getting all Pipelines from Artifact
    $ArtifactPipelines = Get-ChildItem -Path (Join-Path $ArtifactDirectory "pipeline") -Filter "*.json" | Select-Object -ExpandProperty BaseName

    # Getting all Pipelines from Synapse
    $SynapsePipelines = Get-AzSynapsePipeline -WorkspaceName $WorkspaceName | Select-Object -ExpandProperty Name

    # Getting Pipelines from Synapse that are not in the Artifact
    $Pipelines = Compare-Object -ReferenceObject $ArtifactPipelines -DifferenceObject $SynapsePipelines | Where-Object { $_.SideIndicator -eq "=>" } | Select-Object -ExpandProperty InputObject
} else {
    # Fill collection with all existing pipelines to remove them 
    # because artifact doesn't contain any pipelines anymore
    Write-Output "Path not found in Artifact, removing all existing Pipelines in Synapse Workspace"
    $Pipelines = Get-AzSynapsePipeline -WorkspaceName $WorkspaceName | Select-Object -ExpandProperty Name
}
Write-Output "Found $($Pipelines.Count) Synapse Pipelines that are not in the Artifact"

# Trying to delete all pipelines. If a pipeline is still referenced
# by another pipeline it will continue to remove other pipelines 
# before trying to remove it again... max 100 times. So don't create
# chains of pipelines that are too long
[int] $depthCount = 0
while ($Pipelines.Count -gt 0 -and $depthCount -lt 100)
{
    # Loop through collection of pipelines and try to remove them
    $Pipelines | ForEach-Object -process { 
        Write-Output "Trying to delete pipeline $($_)"
        Remove-AzSynapsePipeline -Name $_ -WorkspaceName $WorkspaceName -Force -ErrorAction SilentlyContinue
    }

    # Wait 2 seconds before retry and raise retry counter
    Start-Sleep -Seconds 2
    $depthCount += 1

    # Check if artifact contains any pipelines
    if (Test-Path -Path (Join-Path $ArtifactDirectory "pipeline"))
    {   
        # Getting all Pipelines from Synapse
        $SynapsePipelines = Get-AzSynapsePipeline -WorkspaceName $WorkspaceName | Select-Object -ExpandProperty Name
        
        # Getting Pipelines from Synapse that are not in the Artifact
        $Pipelines = Compare-Object -ReferenceObject $ArtifactPipelines -DifferenceObject $SynapsePipelines | Where-Object { $_.SideIndicator -eq "=>" } | Select-Object -ExpandProperty InputObject
    }  else {
        # Fill collection with all existing pipelines to remove them 
        # because artifact doesn't contain any pipelines anymore
        $Pipelines = Get-AzSynapsePipeline -WorkspaceName $WorkspaceName | Select-Object -ExpandProperty Name
    }
    
    # Check if there are any pipelines left for a retry
    if ($Pipelines.count -gt 0)
    {
        Write-Output "Still found $($Pipelines.Count) Synapse Pipelines that are not in the Artifact. Starting next iteration."
    } else {
        Write-Output "Deletion of Pipelines not in Artifact completed"
    }
}
# Error when you have circular pipeline links or just way too many levels
if ($depthCount -eq 100 -and $Pipelines.Count -gt 0)
{
    throw "Too many levels of child pipelines or circular relations!"
}
Write-Output "Step 'Remove Pipelines not in Artifact' completed`r`n"



#######################################################
# 4) Remove Notebooks not in Artifact
#######################################################
Write-Output "==========================================="
Write-Output "4) Remove Notebooks not in Artifact"
Write-Output "==========================================="

# Check if artifact contains any notebooks
if (Test-Path -Path (Join-Path $ArtifactDirectory "notebook"))
{
    # Getting all Notebooks from Artifact
    $ArtifactNotebooks = Get-ChildItem -Path (Join-Path $ArtifactDirectory "notebook") -Filter "*.json" | Select-Object -ExpandProperty BaseName

    # Getting all Notebooks from Synapse
    $SynapseNotebooks = Get-AzSynapseNotebook -WorkspaceName $WorkspaceName | Select-Object -ExpandProperty Name

    # Getting Notebooks from Synapse that are not in the Artifact
    $Notebooks = Compare-Object -ReferenceObject $ArtifactNotebooks -DifferenceObject $SynapseNotebooks | Where-Object { $_.SideIndicator -eq "=>" } | Select-Object -ExpandProperty InputObject
} else {
    # Fill collection with all existing notebooks to remove them 
    # because artifact doesn't contain any notebooks anymore
    Write-Output "Path not found in Artifact, removing all existing Notebooks in Synapse Workspace"
    $Notebooks = Get-AzSynapseNotebook -WorkspaceName $WorkspaceName | Select-Object -ExpandProperty Name
}    
Write-Output "Found $($Notebooks.Count) Synapse Notebooks that are not in the Artifact"

# Loop through collection of Notebooks to delete them
$Notebooks | ForEach-Object -process {
    Write-Output "Deleting Notebook $($_)"
    Remove-AzSynapseNotebook -Name $($_) -WorkspaceName $WorkspaceName -Force
}
Write-Output "Step 'Remove Notebooks not in Artifact' completed`r`n"



#######################################################
# 5) Remove SQL-Scripts not in Artifact
#######################################################
Write-Output "==========================================="
Write-Output "5) Remove SQL-Scripts not in Artifact"
Write-Output "==========================================="

# Check if artifact contains any SQL Scripts
if (Test-Path -Path (Join-Path $ArtifactDirectory "sqlscript"))
{
    # Getting all SQL-scripts from Artifact
    $ArtifactSQLScripts = Get-ChildItem -Path (Join-Path $ArtifactDirectory "sqlscript") -Filter "*.json" | Select-Object -ExpandProperty BaseName

    # Getting all SQL-scripts from Synapse
    $SynapseSQLScripts = Get-AzSynapseSqlScript -WorkspaceName $WorkspaceName | Select-Object -ExpandProperty Name

    # Getting SQL-scripts from Synapse that are not in the Artifact
    $SQLScripts = Compare-Object -ReferenceObject $ArtifactSQLScripts -DifferenceObject $SynapseSQLScripts | Where-Object { $_.SideIndicator -eq "=>" } | Select-Object -ExpandProperty InputObject
} else {
    # Fill collection with all existing SQL Scripts to remove them 
    # because artifact doesn't contain any SQL Scripts anymore
    Write-Output "Path not found in Artifact, removing all existing SQL-Scripts in Synapse Workspace"
    $SQLScripts = Get-AzSynapseSqlScript -WorkspaceName $WorkspaceName | Select-Object -ExpandProperty Name
} 
Write-Output "Found $($SQLScripts.Count) Synapse SQL-Scripts that are not in the Artifact"

# Loop through collection of SQL scripts to delete them
$SQLScripts | ForEach-Object -Process {
    Write-Output "Deleting SQL-script $($_)"
    Remove-AzSynapseSqlScript -Name $($_) -WorkspaceName $WorkspaceName -Force
}
Write-Output "Step 'Remove SQL-Scripts not in Artifact' completed`r`n"



#######################################################
# 6) Remove Datasets not in Artifact
#######################################################
Write-Output "==========================================="
Write-Output "6) Remove Datasets not in Artifact"
Write-Output "==========================================="

# Check if artifact contains any datasets
if (Test-Path -Path (Join-Path $ArtifactDirectory "dataset"))
{
    # Getting all Datasets from Artifact
    $ArtifactDatasets = Get-ChildItem -Path (Join-Path $ArtifactDirectory "dataset") -Filter "*.json" | Select-Object -ExpandProperty BaseName

    # Getting all Datasets from Synapse
    $SynapseDatasets = Get-AzSynapseDataset -WorkspaceName $WorkspaceName | Select-Object -ExpandProperty Name

    # Getting Datasets from Synapse that are not in the Artifact
    $Datasets = Compare-Object -ReferenceObject $ArtifactDatasets -DifferenceObject $SynapseDatasets | Where-Object { $_.SideIndicator -eq "=>" } | Select-Object -ExpandProperty InputObject
} else {
    # Fill collection with all existing Datasets to remove them 
    # because artifact doesn't contain any Datasets anymore
    Write-Output "Path not found in Artifact, removing all existing Datasets in Synapse Workspace"
    $Datasets = Get-AzSynapseDataset -WorkspaceName $WorkspaceName | Select-Object -ExpandProperty Name
}
Write-Output "Found $($Datasets.Count) Synapse Datasets that are not in the Artifact"

# Loop through collection of Datasets to delete them
$Datasets | ForEach-Object -process { 
    Write-Output "Deleting Dataset $($_)"
    Remove-AzSynapseDataset -Name $_ -WorkspaceName $WorkspaceName -Force
}
Write-Output "Step 'Remove Datasets not in Artifact' completed`r`n"



#######################################################
# 7) Remove Linked Services not in Artifact
#######################################################
Write-Output "==========================================="
Write-Output "7) Remove Linked Services not in Artifact"
Write-Output "==========================================="

# Check if artifact contains any Linked Services
if (Test-Path -Path (Join-Path $ArtifactDirectory "linkedService"))
{
    # Getting all Linked Services from Artifact
    $ArtifactLinkedServices = Get-ChildItem -Path (Join-Path $ArtifactDirectory "linkedService") -Filter "*.json" | Where-Object {($_ -NotLike "*WorkspaceDefaultSqlServer.json" -and  $_ -NotLike "*WorkspaceDefaultStorage.json") } | Select-Object -ExpandProperty BaseName
    
    # Getting all Linked Services from Synapse
    $SynapseLinkedServices = Get-AzSynapseLinkedService -WorkspaceName $WorkspaceName | Where-Object {($_.Name -ne $WorkspaceDefaultSqlServer -and  $_.Name -ne $WorkspaceDefaultSqlStorage) } | Select-Object -ExpandProperty Name

    # Getting Linked Services from Synapse that are not in the Artifact
    $LinkedServices = Compare-Object -ReferenceObject $ArtifactLinkedServices -DifferenceObject $SynapseLinkedServices | Where-Object { $_.SideIndicator -eq "=>" } | Select-Object -ExpandProperty InputObject
} else {
    # Fill collection with all existing Linked Services to remove them 
    # because artifact doesn't contain any Linked Services anymore
    Write-Output "Path not found in Artifact, removing all existing Linked Services in Synapse Workspace"
    $LinkedServices = Get-AzSynapseLinkedService -WorkspaceName $WorkspaceName | Where-Object {($_.Name -ne $WorkspaceDefaultSqlServer -and  $_.Name -ne $WorkspaceDefaultSqlStorage) } | Select-Object -ExpandProperty Name
}
Write-Output "Found $($LinkedServices.Count) Synapse Linked Services that are not in the Artifact"

# Trying to delete all linked services. If a linked service is still
# referenced by another linked service it will continue to remove 
# other linked services before trying to remove it again... 
# max 100 times. Example: KeyVault linked services
$depthCount = 0
while ($LinkedServices.Count -gt 0 -and $depthCount -lt 100)
{
     # Loop through collection of Linked Services and try to remove them
    $LinkedServices | ForEach-Object -process { 
        Write-Output "Trying to delete Linked Service $($_)"
        Remove-AzSynapseLinkedService -Name $_ -WorkspaceName $WorkspaceName -Force -ErrorAction Continue
    }

    # Wait 2 seconds before retry and raise retry counter
    Start-Sleep 2 
    $depthCount += 1

    # Check if artifact contains any Linked Services
    if (Test-Path -Path (Join-Path $ArtifactDirectory "linkedService"))
    {
        # Getting all Linked Services from Synapse
        $SynapseLinkedServices = Get-AzSynapseLinkedService -WorkspaceName $WorkspaceName | Where-Object {($_.Name -ne $WorkspaceDefaultSqlServer -and  $_.Name -ne $WorkspaceDefaultSqlStorage) } | Select-Object -ExpandProperty Name

        # Getting Linked Services from Synapse that are not in the Artifact
        $LinkedServices = Compare-Object -ReferenceObject $ArtifactLinkedServices -DifferenceObject $SynapseLinkedServices | Where-Object { $_.SideIndicator -eq "=>" } | Select-Object -ExpandProperty InputObject
    } else {
        # Fill collection with all existing Linked Services to remove them 
        # because artifact doesn't contain any Linked Services anymore
        $LinkedServices = Get-AzSynapseLinkedService -WorkspaceName $WorkspaceName | Where-Object {($_.Name -ne $WorkspaceDefaultSqlServer -and  $_.Name -ne $WorkspaceDefaultSqlStorage) } | Select-Object -ExpandProperty Name
    }
    
    # Check if there are any Linked Services left for a retry
    if ($LinkedServices.count -gt 0)
    {
        Write-Output "Still found $($LinkedServices.Count) Synapse Linked Services that are not in the Artifact. Starting next iteration."
    } else {
        Write-Output "Deletion of Linked Services not in Artifact completed"
    }
}

# Error when you have circular Linked Services links or just way too many levels
if ($depthCount -eq 100 -and $LinkedServices.Count -gt 0)
{
    throw "Too many levels of references to other Linked Services!"
}
Write-Output "Step 'Remove Linked Services not in Artifact' completed"
Write-Output "==========================================="


There is a check at the start of the script that verifies whether it can find some expected sub folders in the supplied artifact path. Without that check, a wrong path would make the script assume that you don't have any datasets, pipelines, etc. and it would clean up your entire Synapse Workspace.






If nothing has to be cleaned/removed the script is ready within seconds.
No cleanup necessary



















If items do have to be removed it is still ready in a few minutes instead of half an hour. Notice the iterations for cleaning up the pipelines. It will try to delete a pipeline, but if it is still used by another pipeline then it will continue with the next pipeline and try again afterwards. The same construction is used for Linked Services where for example a Key Vault Linked Service can still be used by another Linked Service.
Some cleanup was necessary
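The retry construction can be shown in isolation. The sketch below is a Python model of the idea, not a translation of the PowerShell script: a delete only succeeds when no remaining item still references the item, and the loop keeps going round until everything is gone or the maximum number of rounds is reached. The pipeline names and the referenced_by mapping are hypothetical.

```python
def delete_with_retries(items, referenced_by, max_rounds=100):
    """Delete items in rounds; an item can only go once nothing left refers to it.

    referenced_by maps an item to the set of items that still use it.
    Returns the order of deletion and the number of rounds that were needed.
    """
    remaining = set(items)
    deleted_order = []
    rounds = 0
    while remaining and rounds < max_rounds:
        rounds += 1
        for item in sorted(remaining):
            # The delete "fails" while another remaining item references this one
            if referenced_by.get(item, set()) & remaining:
                continue
            deleted_order.append(item)
            remaining.discard(item)
    if remaining:
        # Circular references or more levels than max_rounds
        raise RuntimeError(f"Could not delete: {sorted(remaining)}")
    return deleted_order, rounds

# PL_Parent executes PL_Child, which executes PL_Grandchild
references = {"PL_Child": {"PL_Parent"}, "PL_Grandchild": {"PL_Child"}}
order, rounds = delete_with_retries(["PL_Child", "PL_Grandchild", "PL_Parent"], references)
print(order, rounds)
```

In the first round only PL_Parent can be deleted; the other two follow in the second round. Two items that reference each other can never be deleted this way, which is why the script throws an error after 100 rounds.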




















Conclusions
In this post you learned how to clean your Synapse Workspace much more efficiently by comparing the Workspace and the Artifact. This way you don't have to delete each item in your workspace, but only the ones you deleted during development.

Note that not all parts of Synapse are available in this cleanup script. For example KQL scripts and Dataflows are still missing. They will be added later on. If you want to add those yourself, make sure they are in the right place within the script: Dataflows should probably be added before the pipelines, and the KQL scripts before or after the SQL Scripts. Feel free to let us know if you have any improvements for this script that you would like to share with the community.

Special thanks to colleague Joan Zandijk for helping out.

Sunday 26 March 2023

Synapse - Change Data Feed (CDF) on delta tables

Case
I like the data versioning of Delta Tables and I know how to get data from different versions, but how can I combine that in one query to get, for example, the changes during a year to create a nice fact table about certain changes?
Change Data Feed in Synapse




















Solution
Change Data Feed (CDF) is still a bit new. The currently supported Delta Lake version in the Synapse workspace is 2.2.0. This version does not yet support CDF for SQL queries. This should be available in Delta Lake 2.3.0 according to the release documentation. Luckily you can already use PySpark to get this information.
Current Delta Lake version in Synapse










1) Enable Change Data Feed
First you have to enable the Change Data Feed option on your Delta table. From that point in time you can use CDF. The property is called enableChangeDataFeed.

You can alter your existing tables with an ALTER TABLE statement:
%%sql

ALTER TABLE silver.Cities
  SET TBLPROPERTIES (delta.enableChangeDataFeed = true);
For new Delta Tables you can also do this in the Create Table command.
%%sql

CREATE TABLE Silver.Cities (Id INT, City STRING, Population INT)
  TBLPROPERTIES (delta.enableChangeDataFeed = true);
And if you used the PySpark code from our previous post, then you can add an option just in front of the save.
sdf.write.format('delta').option("delta.enableChangeDataFeed", "true").save(delta_table_path)
To check whether it is enabled on your Delta Table you can use the following command.
%%sql

SHOW TBLPROPERTIES silver.cities
CDF is enabled













2) Check available data versions
Now that we have the Change Data Feed option available, let's check which data versions we have with the DESCRIBE HISTORY command. In the first example you will see that CDF was enabled after table creation, in the second version (1). This means you cannot include the first version (0) in the CDF command.

You will get an error if you set the range wrong while getting CDF info:
AnalysisException: Error getting change data for range [0 , 4] as change data was not
recorded for version [0]. If you've enabled change data feed on this table,
use `DESCRIBE HISTORY` to see when it was first enabled.
Otherwise, to start recording change data, use `ALTER TABLE table_name SET TBLPROPERTIES
(delta.enableChangeDataFeed=true)`.
CDF available from version 1







In the second example it was enabled during the Delta table creation and therefore CDF is available from the first version (0).
CDF available from version 0
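If you want to find that first usable version in code instead of reading the history by eye, the following is a minimal pure-Python sketch. It assumes that the history rows expose changed table properties as a JSON string under operationParameters.properties; that layout and the sample rows below are assumptions for illustration, so verify them against the DESCRIBE HISTORY output of your own table.

```python
import json


def first_cdf_version(history_rows):
    """Return the lowest version whose commit enabled CDF, or None."""
    for row in sorted(history_rows, key=lambda r: r["version"]):
        props = row.get("operationParameters", {}).get("properties")
        if not props:
            continue
        if json.loads(props).get("delta.enableChangeDataFeed") == "true":
            return row["version"]
    return None


def safe_starting_version(requested, first_enabled):
    """Never start reading before the version where CDF was enabled."""
    return max(requested, first_enabled)


# Hypothetical history rows (assumed shape, not copied from a real table)
history = [
    {"version": 2, "operation": "WRITE",
     "operationParameters": {"mode": "Append"}},
    {"version": 1, "operation": "SET TBLPROPERTIES",
     "operationParameters": {
         "properties": '{"delta.enableChangeDataFeed":"true"}'}},
    {"version": 0, "operation": "CREATE TABLE",
     "operationParameters": {"properties": "{}"}},
]

first_enabled = first_cdf_version(history)
start = safe_starting_version(0, first_enabled)

# In a Synapse notebook the rows could come from something like:
# history = [r.asDict() for r in spark.sql("DESCRIBE HISTORY silver.cities").collect()]
```

Note that this sketch ignores the case where CDF is switched off again later on.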








3) Query CDF data
When you query the CDF data you will get some extra columns:
  • _change_type: showing what action was taken to change the data - insert, update_preimage, update_postimage and delete
  • _commit_version: showing the version number of the data
  • _commit_timestamp: showing the timestamp of the data change
If you want particular version numbers when getting the data, then you can use startingVersion and endingVersion as options while reading the data. Specifying only startingVersion is also permitted; in that case you get all changes up to the latest version.
%%pyspark

df = spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .option("endingVersion", 5) \
  .table("Silver.cities")

display(df.sort("City","_commit_version"))
Filter CDF on version numbers













Querying date ranges is probably more useful. For that you can use startingTimestamp and endingTimestamp as options. Specifying only startingTimestamp is also permitted.
%%pyspark

df = spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2023-03-26 11:07:23.008') \
  .table("Silver.cities")

display(df.sort("City","_commit_version"))
Filter CDF on timestamps
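Since the version and timestamp variants only differ in their options, you could collect them in one small helper. The sketch below is our own illustration: the function name and its validation rules (exactly one start, at most one end) are assumptions of this example, only the option names come from the code above.

```python
def cdf_read_options(starting_version=None, ending_version=None,
                     starting_timestamp=None, ending_timestamp=None):
    """Build the reader options for a Change Data Feed query."""
    # Rule of this sketch: exactly one kind of start must be given
    if (starting_version is None) == (starting_timestamp is None):
        raise ValueError(
            "Provide either starting_version or starting_timestamp")
    # Rule of this sketch: the end is optional, but only one kind
    if ending_version is not None and ending_timestamp is not None:
        raise ValueError(
            "Provide either ending_version or ending_timestamp, not both")

    options = {"readChangeFeed": "true"}
    if starting_version is not None:
        options["startingVersion"] = str(starting_version)
    if starting_timestamp is not None:
        options["startingTimestamp"] = starting_timestamp
    if ending_version is not None:
        options["endingVersion"] = str(ending_version)
    if ending_timestamp is not None:
        options["endingTimestamp"] = ending_timestamp
    return options


version_options = cdf_read_options(starting_version=0, ending_version=5)
timestamp_options = cdf_read_options(
    starting_timestamp="2023-03-26 11:07:23.008")

# In a notebook the dictionary can be passed to the reader:
# df = spark.read.format("delta").options(**version_options).table("Silver.cities")
```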














If you want to use the new column _commit_timestamp from the next record to create a new column called _end_timestamp in the current record, then you need to play with the lead() function (just like in T-SQL).
%%pyspark

from pyspark.sql.window import Window
from pyspark.sql.functions import lead 

df = spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2023-03-26 11:07:23.008') \
  .table("Silver.cities")

# Create window for lead
windowSpec  = Window.partitionBy("City").orderBy("_commit_version")

# Remove update_preimage records, add new column with Lead() and then sort
display(df.filter("_change_type != 'update_preimage'")
           .withColumn("_end_timestamp",lead("_commit_timestamp",1).over(windowSpec))
           .sort("City","_commit_version"))
Create end_timestamp with lead function
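If the window function feels a bit abstract, the sketch below mimics the same steps in plain Python on a handful of rows: drop the update_preimage records, sort per City on version and copy the timestamp of the next record. The sample rows are made up for this illustration and the function is not a replacement for the PySpark code above.

```python
from itertools import groupby
from operator import itemgetter


def add_end_timestamp(rows):
    """Mimic lead(_commit_timestamp) over (partition by City order by version)."""
    kept = [r for r in rows if r["_change_type"] != "update_preimage"]
    kept.sort(key=itemgetter("City", "_commit_version"))
    result = []
    for _, group in groupby(kept, key=itemgetter("City")):
        group = list(group)
        # Pair every record with the next one; the last record gets None
        for current, nxt in zip(group, group[1:] + [None]):
            end = nxt["_commit_timestamp"] if nxt else None
            result.append({**current, "_end_timestamp": end})
    return result


# Hypothetical CDF rows for one city
sample = [
    {"City": "Amsterdam", "_change_type": "insert",
     "_commit_version": 1, "_commit_timestamp": "2023-03-26 11:07:23"},
    {"City": "Amsterdam", "_change_type": "update_preimage",
     "_commit_version": 2, "_commit_timestamp": "2023-03-26 11:10:02"},
    {"City": "Amsterdam", "_change_type": "update_postimage",
     "_commit_version": 2, "_commit_timestamp": "2023-03-26 11:10:02"},
    {"City": "Amsterdam", "_change_type": "delete",
     "_commit_version": 4, "_commit_timestamp": "2023-03-26 11:15:40"},
]

with_end = add_end_timestamp(sample)
```

The last record of each city keeps an empty _end_timestamp, just like lead() returns NULL when there is no next record.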











Conclusions
In this post you learned the basics of the Change Data Feed options in Synapse. This feature is available in Delta Lake 2.0.0 and above, but it is still in experimental support mode. For now you have to use PySpark instead of Spark SQL to query the data in Synapse.

Besides creating nice fact tables that show data changes during a certain period, this feature could also be useful to incrementally load a large fact table with only the changes from the silver layer. Creating audit trails for data changes over time could also be an interesting option. The CDF option is probably most useful when there are not that many changes in a table.
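For the incremental load scenario you usually only care about the last change per key. The plain Python sketch below shows that idea on a few made-up CDF rows; the function name and the sample data are assumptions for illustration, and it does not handle a key that is changed more than once within the same version.

```python
def latest_change_per_key(cdf_rows, key_column):
    """Keep only the most recent change per key, ignoring pre-images."""
    latest = {}
    for row in cdf_rows:
        if row["_change_type"] == "update_preimage":
            continue
        key = row[key_column]
        if (key not in latest
                or row["_commit_version"] > latest[key]["_commit_version"]):
            latest[key] = row
    return latest


# Hypothetical CDF rows
changes = [
    {"City": "Amsterdam", "Population": 880000,
     "_change_type": "insert", "_commit_version": 1},
    {"City": "Amsterdam", "Population": 880000,
     "_change_type": "update_preimage", "_commit_version": 2},
    {"City": "Amsterdam", "Population": 900000,
     "_change_type": "update_postimage", "_commit_version": 2},
    {"City": "Utrecht", "Population": 360000,
     "_change_type": "insert", "_commit_version": 1},
    {"City": "Utrecht", "Population": 360000,
     "_change_type": "delete", "_commit_version": 3},
]

net_changes = latest_change_per_key(changes, "City")
```

The result tells you that Amsterdam needs an update with the new population and that Utrecht should be removed from (or closed in) the target table.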

In a later post, when Delta Lake 2.3.0 is available in Synapse, we will explain the Spark SQL options for CDF. Special thanks to colleagues Roelof Jonkers and Martijn Broeks for helping out.



Monday 20 March 2023

Synapse - Creating Silver Delta Tables

Case
I want to create and fill a Silver layer based on parquet files in my bronze layer. Is there a simple way to create and populate the tables automatically?
Adding files to your Silver layer












Solution
You can create a notebook for this and then call that notebook from your Synapse pipeline with some parameters (location, table name and keys). This allows you to for example loop through all your ingested source files from the bronze (raw/ingest) layer and then call this notebook for each file to add them to the Silver layer. We can also add the silver tables directly to the Lake database for easy querying later on.

Note that this example is a technical, source-based Silver layer, so it is not really cleansed, curated or conformed.

1) Create notebook
Go to the Develop tab in Synapse and create a new Notebook. Give it a suitable name and make sure the language is PySpark. Sooner or later you will want to test this Notebook, so attach it to a Spark pool. Optionally you can add a Markdown cell to explain this notebook.
New Synapse Notebook









2) Code cell 1: parameters
The first code cell is for the parameters that can be overridden by parameters from the Notebook activity in the pipeline. Toggle the parameters option to make it a parameter cell. For more details see our post about notebook parameters. For debugging within the notebook we used real values.

For this example everything (bronze and silver) is in the same container, so you might want to add more parameters to split those up. This example uses parquet files as a source. If you want to use, for example, CSV files then you need to change the format in the main code that fills the Spark Data Frame with data.
# path of the data lake container (bronze and silver for this example)
data_lake_container = 'abfss://mysource@datalakesvb.dfs.core.windows.net'
# The ingestion folder where your parquet files are located
bronze_folder = 'Bronze'
# The silver folder where your Delta Tables will be stored
silver_folder = 'Silver'
# The name of the table
table_name = 'SalesOrderHeader'
# The wildcard filter used within the bronze folder to find files
source_wildcard = 'SalesOrderHeader*.parquet'
# A comma separated string of one or more key columns (for the merge)
key_columns_str = 'SalesOrderID'
Parameters








3) Code cell 2: import modules and functions
The second code cell is for importing all required/useful modules. For this basic example we have only one import:
# Import modules
from delta.tables import DeltaTable
Import Delta Table module








3) Code cell 3: filling delta lake
Now the actual code for filling the Delta Lake tables with parquet files from the data lake. Note that this code is very basic: it checks whether the Delta Lake table already exists. If not, it creates the Delta Lake table, and if it already exists it merges the new data into the existing table. If you have transactional data then you could also do an append instead of a merge.

# Convert comma separated string with keys to array (trimming spaces around each key)
key_columns = [key.strip() for key in key_columns_str.split(',')]

# Convert array with keys to where-clause for merge statement
conditions_list = [f"existing.{key}=updates.{key}" for key in key_columns]

# Determine path of source files from ingest layer
source_path = data_lake_container + '/' + bronze_folder + '/' + source_wildcard 

# Determine path of Delta Lake Table 
delta_table_path = data_lake_container + '/' + silver_folder + '/' + table_name

# Read file(s) in spark data frame
sdf = spark.read.format('parquet').option("recursiveFileLookup", "true").load(source_path)

# Check if the Delta Table exists
if (DeltaTable.isDeltaTable(spark, delta_table_path)):
    print('Existing delta table')
    # Read the existing Delta Table
    delta_table = DeltaTable.forPath(spark, delta_table_path)

    # Merge new data into existing table
    delta_table.alias("existing").merge(
        source = sdf.alias("updates"),
        condition = " AND ".join(conditions_list)
        
    ).whenMatchedUpdateAll(
    ).whenNotMatchedInsertAll(
    ).execute()

    # For transactions you could do an append instead of a merge
    # sdf.write.format('delta').mode('append').save(delta_table_path)

else:
    print('New delta table')
    # Create new delta table with new data
    sdf.write.format('delta').save(delta_table_path)
Adding data to new or existing Delta Table
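The part of this cell that is easiest to get wrong is the merge condition for tables with more than one key column. Below is a small standalone sketch of that string building, so you can test it without a Spark pool. The function names are ours and the second key column is a made-up example; the aliases match the ones used in the merge above.

```python
def build_merge_condition(key_columns_str,
                          target_alias="existing", source_alias="updates"):
    """Turn 'KeyA, KeyB' into a join condition for the Delta merge."""
    keys = [k.strip() for k in key_columns_str.split(",") if k.strip()]
    if not keys:
        raise ValueError("At least one key column is required")
    return " AND ".join(
        f"{target_alias}.{k} = {source_alias}.{k}" for k in keys)


def build_delta_table_path(data_lake_container, silver_folder, table_name):
    """Compose the folder of the Delta Table, avoiding double slashes."""
    parts = [data_lake_container.rstrip("/"),
             silver_folder.strip("/"),
             table_name.strip("/")]
    return "/".join(parts)


# Hypothetical composite key
condition = build_merge_condition("SalesOrderID, SalesOrderDetailID")
path = build_delta_table_path(
    "abfss://mysource@datalakesvb.dfs.core.windows.net/",
    "Silver", "SalesOrderHeader")
```

Trimming the keys means a parameter value with spaces after the commas still results in valid column names.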



















4) Code cell 4: Adding Delta Table to Lake Database
The last step is optional, but very useful: adding the Delta Table to the Lake Database. This allows you to query the Delta Table by its name instead of its path in the Data Lake. Make sure you first add a Silver layer to that Lake database. See this post for more details (step 1).
# Adding the Delta Table to the Lake Database for easy querying in other notebooks or scripts within Synapse.
spark.sql(f'CREATE TABLE IF NOT EXISTS Silver.{table_name} USING DELTA LOCATION \'{delta_table_path}\'')

# Spark SQL version
#  CREATE TABLE Silver.MyTable
#  USING DELTA
#  LOCATION 'abfss://yourcontainer@yourdatalake.dfs.core.windows.net/Silver/MyTable'
Adding Delta Table to Lake Database
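Because the table name and path arrive as pipeline parameters, it can be worth validating them before they end up in a SQL statement. The sketch below is one possible way to do that; the function name and the validation rules are assumptions of this example and not part of the notebook above.

```python
import re


def build_create_table_sql(table_name, delta_table_path, database="Silver"):
    """Build the CREATE TABLE statement after some basic sanity checks."""
    # Only allow simple identifiers (letters, digits and underscores)
    if not re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]*", table_name):
        raise ValueError(f"Unexpected table name: {table_name!r}")
    # A quote in the path would break the string literal in the statement
    if "'" in delta_table_path:
        raise ValueError("Path must not contain a single quote")
    return (f"CREATE TABLE IF NOT EXISTS {database}.{table_name} "
            f"USING DELTA LOCATION '{delta_table_path}'")


statement = build_create_table_sql(
    "MyTable",
    "abfss://yourcontainer@yourdatalake.dfs.core.windows.net/Silver/MyTable")

# In the notebook you would then run: spark.sql(statement)
```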








5) Creating Pipeline
Now it is time to loop through your ingested files and call this new Notebook for each file to create the Silver Layer Delta Tables. You have to provide values for all parameters in the notebook. Since you need the key column(s) of each table to do the merge you probably need to store these somewhere.

For the ingestion we often store the table/file names from each source that we want to download to the data lake in a metadata table. In this table we also store the key column(s) of each table.

Call Notebook in ForEach loop













Synapse doesn't retrieve the parameters from the Notebook. You have to add them manually as Base parameters in the Settings tab.
Calling Notebook
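To get a feeling for which value goes into which Base parameter, the sketch below maps one metadata row to the six notebook parameters from the parameter cell. The layout of the metadata row (table_name and key_columns) and the function name are assumptions for illustration; in the pipeline itself you would of course do this mapping with expressions on the ForEach item.

```python
def to_base_parameters(metadata_row, data_lake_container,
                       bronze_folder="Bronze", silver_folder="Silver"):
    """Map a (hypothetical) metadata row to the notebook parameters."""
    table = metadata_row["table_name"]
    return {
        "data_lake_container": data_lake_container,
        "bronze_folder": bronze_folder,
        "silver_folder": silver_folder,
        "table_name": table,
        # Same naming pattern as the wildcard in the parameter cell
        "source_wildcard": f"{table}*.parquet",
        # The notebook expects one comma separated string
        "key_columns_str": ",".join(metadata_row["key_columns"]),
    }


# Hypothetical metadata rows
metadata = [
    {"table_name": "SalesOrderHeader", "key_columns": ["SalesOrderID"]},
    {"table_name": "SalesOrderDetail",
     "key_columns": ["SalesOrderID", "SalesOrderDetailID"]},
]

parameter_sets = [
    to_base_parameters(
        row, "abfss://mysource@datalakesvb.dfs.core.windows.net")
    for row in metadata
]
```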
















If you enter a column or set of columns for the key that is not unique, you will get the following error the second time you run the notebook (the first time the table is created and the merge is not used):
Cannot perform Merge as multiple source rows matched and attempted to modify the same target row in the Delta table in possibly conflicting ways. By SQL semantics of Merge, when multiple source rows match on the same target row, the result may be ambiguous as it is unclear which source row should be used to update or delete the matching target row. You can preprocess the source table to eliminate the possibility of multiple matches.
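A quick way to find the cause of this error is to look for key values that occur more than once in the source. The plain Python sketch below shows the check on a few made-up rows; the function name and the sample data are only for illustration.

```python
from collections import Counter


def find_duplicate_keys(rows, key_columns):
    """Return the key values that occur more than once, with their count."""
    counts = Counter(tuple(row[k] for k in key_columns) for row in rows)
    return {key: n for key, n in counts.items() if n > 1}


# Hypothetical source rows where SalesOrderID alone is not unique
rows = [
    {"SalesOrderID": 1, "SalesOrderDetailID": 1},
    {"SalesOrderID": 1, "SalesOrderDetailID": 2},
    {"SalesOrderID": 2, "SalesOrderDetailID": 3},
]

duplicates_single_key = find_duplicate_keys(rows, ["SalesOrderID"])
duplicates_both_keys = find_duplicate_keys(
    rows, ["SalesOrderID", "SalesOrderDetailID"])

# A comparable check on the Spark Data Frame in the notebook could be:
# sdf.groupBy(*key_columns).count().filter("count > 1")
```

With only SalesOrderID as key the first order shows up twice, while the combination of both columns is unique and would be safe to use in the merge.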

6) Result
Now you can run your pipeline and check whether the silver layer of your Lake database is populated with new tables. You can also create a new notebook with Spark SQL or PySpark to check the contents of the tables and to see whether Time Travel works.
Running the pipeline that calls the new Notebook












Delta Lake folders in the Data Lake





















Conclusions
In this post you learned how to create and populate a (source based) silver layer of your Lake House Delta Tables. An easy quick start for your lake house. If you have multiple sources with similar data then you should also consider creating a real cleansed, curated and conformed silver layer manually. In a later post we will show you some of those manual steps in Spark SQL or PySpark.

Special thanks to colleague Heleen Eisen for helping out with the PySpark.