Showing posts with label NOTEBOOK. Show all posts

Tuesday, 1 October 2024

Synapse - Invalid notebook reference

Case
During the deployment of Synapse I'm getting an invalid reference error for my notebooks. They are referencing Spark pools that only exist in the Development workspace.























The document creation or update failed because of invalid reference 'SparkPoolJoost' 
An error occurred during execution: Error: Failed to fetch the deployment status 
{"code":"400","message":"CreateOrUpdateNotebook failed: [statusCode from ADF:BadRequest, 
ErrorMessage: {\"code\":\"BadRequest\",\"message\":\"The document creation or update failed because of invalid reference 'bitools'.\",
\"target\":\"/subscriptions/aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee/resourceGroups/rg-bitools/providers/Microsoft.Synapse/workspaces/bitools-tst/notebooks/myNotebook\",
\"details\":null,\"error\":null}, workspace: yp-tst, notebook: myNotebook, ArtifactId: a4581d64-96d3-4041-9ac4-ccc0d7235cc4]"}

Solution
This deployment error often happens when you don't have the same set of Spark pools in each environment of your DTAP. If one of your notebooks is still referencing a Spark Pool that doesn't exist in the target workspace then the deployment process will throw an invalid reference error.
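To find out up front which notebooks would cause this error, you could scan the notebook files in the artifact before deploying. The snippet below is only a minimal sketch: the function names are made up for this example, and it assumes that each notebook JSON file stores the attached pool under properties.bigDataPool.referenceName (the same property path that is used for the overrides further on).

```python
import json
from pathlib import Path


def list_pool_references(notebook_dir):
    """Return {notebook name: attached Spark pool name, or None if not set}."""
    references = {}
    for file in sorted(Path(notebook_dir).glob("*.json")):
        with open(file, encoding="utf-8") as handle:
            definition = json.load(handle)
        # Assumed layout of a notebook file: properties.bigDataPool.referenceName
        properties = definition.get("properties") or {}
        pool = (properties.get("bigDataPool") or {}).get("referenceName")
        references[file.stem] = pool
    return references


def find_invalid_references(references, target_pools):
    """Return the notebooks whose pool does not exist in the target workspace."""
    return {
        name: pool
        for name, pool in references.items()
        if pool is not None and pool not in target_pools
    }
```

Calling find_invalid_references(list_pool_references("SynapseArtifact/notebook"), {"mysparkpool"}) would then return the notebooks that still point to a development-only pool. The folder and pool name in this call are placeholders.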

We often have one general Spark Pool doing all the work and that is available in each Synapse workspace, but also have a whole bunch of Spark Pools in development letting multiple developers do some work without getting in each other's way. Nothing is more annoying than waiting for a colleague to release the Spark nodes.

This deployment error can easily be solved by changing the attached Spark Pool in the notebook before you start deploying. This of course takes a lot of extra time (and patience) each time you forget to select the right Spark Pool that exists in all workspaces.
Attached Spark Pool







You can make it a little less annoying by adding an override to the YAML task Synapse workspace deployment@2 for the notebook property bigDataPool referenceName. You can even do this manually for a couple of notebooks, but the more notebooks you have the more annoying it gets.
###################################
# Validate and Deploy Synapse
###################################
- task: Synapse workspace deployment@2
  displayName: 'Validate and Deploy Synapse'
  inputs:
    operation: validateDeploy
    ArtifactsFolder: '$(Pipeline.Workspace)/SynapseArtifact'
    azureSubscription: ${{ parameters.ServiceConnection }}
    ResourceGroupName: ${{ parameters.Synapse_ResourceGroupName }}
    TargetWorkspaceName: ${{ parameters.Synapse_WorkspaceName }}
    DeleteArtifactsNotInTemplate: true
    DeployManagedPrivateEndpoints: true
    OverrideArmParameters: '
      -LS_AKV_Secrets_properties_typeProperties_baseUrl            https://${{ parameters.KeyVault_Name }}.vault.azure.net/
      -LS_ADLS_Datalake_properties_typeProperties_url              https://${{ parameters.Datalake_Name }}.dfs.core.windows.net/
      -LS_ASQL_Metadata_connectionString                           ${{ parameters.Metadata_Connectionstring }}
      -NB_myFirstNotebook_properties_bigDataPool_referenceName     ${{ parameters.Synapse_SparkpoolName }}
      -NB_mySecondNotebook_properties_bigDataPool_referenceName    ${{ parameters.Synapse_SparkpoolName }}
      -NB_myThirdNotebook_properties_bigDataPool_referenceName     ${{ parameters.Synapse_SparkpoolName }}
      '

You can make it even easier by generating an override for each notebook in your Synapse workspace. For this we need these three steps:

1) Create variable
First create a YAML variable in your pipeline. For this example we used the name OverrideParams. The value is just an empty string.
jobs:
    - deployment: DeploymentJob${{ parameters.Env }}
      displayName: Deployment Job ${{ parameters.Env }} 
      environment: Deploy-to-${{ parameters.Env }}
      variables: 
      - name: OverrideParams
        value: ""

      strategy:
        runOnce:
          deploy:
            steps:

2) Add PowerShell task
Next is adding a PowerShell task that will be filling the above variable with an override list. The PowerShell loops through all your notebooks in the artifact. It creates one override for each notebook and stores it in a PowerShell string variable. The last step is to use the PowerShell variable to fill the YAML variable.
            ##################################
            # Edit Notebook sparkpool reference
            ##################################
            - powershell: |
                # Determine notebook subfolder in synapse artifact
                $Path = Join-Path -Path "$(Pipeline.Workspace)" -ChildPath "SynapseArtifact\notebook\"

                # Get all notebook files
                $notebooks = Get-ChildItem -Path $Path

                # Create string variable for all overrides
                [string]$overridelist = ""

                # Loop through notebook files
                foreach ($notebook in $notebooks)
                {
                  # Generate an override for each notebook, make sure to end
                  # with a space to separate each override. Don't use a line
                  # feed or carriage return, because the value should be 1 line
                  $overridelist += "-$($notebook.Basename)_properties_bigDataPool_referenceName   mysparkpool "
                }
                # Show list for debug purposes
                Write-Host "overridelist:`r`n$($overridelist)"
                # Fill the YAML variable value with the value of the PowerShell variable
                Write-Host "##vso[task.setvariable variable=OverrideParams;]$overridelist"
              displayName: 'Edit Notebook sparkpool reference'
Note: if you are using the template files instead of the artifact folder then you have to create an alternative loop.

3) Use YAML variable in Override
Last step is to add the YAML variable in the OverrideArmParameters part. Now run your deployment and see your invalid reference errors disappear!
            ###################################
            # Validate and Deploy Synapse
            ###################################
            - task: Synapse workspace deployment@2
              displayName: 'Validate and Deploy Synapse'
              inputs:
                operation: validateDeploy
                ArtifactsFolder: '$(Pipeline.Workspace)/SynapseArtifact'
                azureSubscription: ${{ parameters.ServiceConnection }}
                ResourceGroupName: ${{ parameters.Synapse_ResourceGroupName }}
                TargetWorkspaceName: ${{ parameters.Synapse_WorkspaceName }}
                DeleteArtifactsNotInTemplate: true
                DeployManagedPrivateEndpoints: true
                OverrideArmParameters: '
                  -LS_AKV_Secrets_properties_typeProperties_baseUrl            https://${{ parameters.KeyVault_Name }}.vault.azure.net/
                  -LS_ADLS_Datalake_properties_typeProperties_url              https://${{ parameters.Datalake_Name }}.dfs.core.windows.net/
                  -LS_ASQL_Metadata_connectionString                           ${{ parameters.Metadata_Connectionstring }}
                  $(OverrideParams)
                  '
Conclusion
This solution uses a small, fairly simple PowerShell script to solve all your invalid reference errors during deployment. It doesn't change the Spark Pool in the Notebook Activity, but just the default Spark Pool in the notebook itself. So you can still have multiple Spark Pools for various jobs if you set it in the Notebook Activity.

As mentioned before, if you use the template files TemplateForWorkspace.json and TemplateParametersForWorkspace.json for the deployment then you have to retrieve the JSON objects for the notebooks in those files and create a similar loop as in step 2.
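
For the template files such a loop could look roughly like the sketch below. It is not a tested replacement for step 2 and it rests on two assumptions: that TemplateParametersForWorkspace.json has the usual ARM layout with a "parameters" object, and that the Spark Pool reference of each notebook shows up there as a parameter whose name ends with _properties_bigDataPool_referenceName. Whether that second assumption holds depends on your parameterization template, so check the file first. The function name build_overrides is made up for this example.

```python
import json

# Assumed suffix of the notebook Spark Pool parameters in the parameter file
POOL_SUFFIX = "_properties_bigDataPool_referenceName"


def build_overrides(parameters_file, spark_pool_name):
    """Build a one-line override list for every notebook pool parameter."""
    with open(parameters_file, encoding="utf-8") as handle:
        template_parameters = json.load(handle)

    # Keep only the parameters that hold a notebook Spark Pool reference
    names = [
        name
        for name in template_parameters.get("parameters", {})
        if name.endswith(POOL_SUFFIX)
    ]

    # End each override with a space and never add a line feed,
    # because the value has to fit on one line
    return "".join(f"-{name} {spark_pool_name} " for name in sorted(names))
```

The returned string can be written to the OverrideParams variable with the same ##vso[task.setvariable ...] logging command as in step 2.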



Sunday, 22 September 2024

Synapse - Run all notebooks in a foreach loop

Case
I have several notebooks to create/fill Silver and Gold tables which I want to execute. Is there a way to execute several notebooks in a foreach loop? That would save me from having to create and maintain a pipeline with just lots of Notebook activities.

Notebook activity in FOREACH
















Solution
The properties of the Notebook activity can be overridden with an expression in the Dynamic Content section. This means you can use a collection of notebook names and execute them within a Foreach activity. 

The smart part of the solution is creating the collection of notebooks in such a way that you can iterate over it, while it also stays up to date. This solution uses the Synapse Rest API to get all notebooks from Synapse and then uses a filter to only get a selection of those notebooks.

The hard part is that the Rest API will return multiple pages with notebooks if you have a lot of notebooks. The entire solution looks like this. Only the last part is for looping and executing the notebooks. The first few activities are all for retrieving the collection of notebooks. It perhaps looks a bit complex, but looks can be deceiving.
The solution











1) Parameter
This solution uses one string pipeline parameter to provide a folder path that we can use to filter out a selection of notebooks. Name of the parameter is NotebookFolderPath.
String parameter to get a selection of notebooks











2) Variables
The UNTIL loop to retrieve all notebooks uses three variables. The first is a String variable containing the URL of the rest API: SynapseRestAPI. The second and third are Array variables to store the response of the Rest API: Notebooks and Notebooks_Temp.
The pipeline variables to retrieve and store the notebooks














3) Determine Rest API URL
The Rest API will return multiple pages with details of all notebooks if you exceed the max number of notebooks per page. Therefore we will first determine the initial URL for the first page before the UNTIL loop. Later on the URL of the successive pages is retrieved within the UNTIL loop by reading the response of the Rest API.

For the initial URL we need the name of the Synapse workspace. Since this can be retrieved via an expression, we can make the URL dynamic:
@concat(
    'https://'
    ,pipeline().DataFactory
    ,'.dev.azuresynapse.net/notebooks?api-version=2021-04-01'
    )
For this we will start with a Set Variable activity in front of the UNTIL loop.
Set Variable task to determine Rest API URL











4) Until loop
Within the UNTIL loop we use a WEB activity for the REST API call. The UNTIL loop uses its output to check whether it contains a property nextLink. If that property is present in the output then there is a next page and this property will contain the REST API URL for it.

The name of the WEB activity is web_GetNotebooks and therefore the expression of the UNTIL is:
@not(
    contains(
            activity('web_GetNotebooks').output
            ,'nextLink'
            )
    )
UNTIL loop









5) Web activity
The first activity in the UNTIL is the WEB activity. It's easier to add this activity to the UNTIL before writing the expression of the step above.
As mentioned above its name is web_GetNotebooks. The important settings are:
  • URL - @variables('SynapseRestAPI')
  • Method - GET
  • Authentication - System-assigned managed identity
  • Resource - https://dev.azuresynapse.net/
And make sure to select your SHIR in the advanced settings if you use one. Also make sure to give the Synapse workspace at least the Reader role to itself otherwise it can't use its own Rest API.
WEB activity for REST API



















6) Union output to temp
Next we need two Set Variable activities in the UNTIL. With the first Set Variable activity we union the output of the Rest API with the value of the Notebooks array variable and store it in the Notebooks_Temp array variable. This has to be a two step task, hence the temp variable. The expression looks like:
@union(
    activity('web_GetNotebooks').output.value
    ,variables('Notebooks')
    )

In the first iteration the Notebooks variable will still be empty, but in all following iterations it will have been filled by the next activity.
Union output to temp array variable









7) Use temp to fill variable
In the previous activity we filled the Notebooks_Temp variable. In this next step we store the value of Notebooks_Temp in the main array variable Notebooks. Then we can use this value to union it in the next iteration.
Store temp value in main variable












8) Determine URL of next Rest API call
The last activity in the UNTIL is to check whether there is a next page. If there is we will fill the string variable SynapseRestAPI with the URL and if not we will fill it with an empty string. In that last case the output of the WEB activity has no nextLink and the UNTIL loop stops.
@if(
    contains(
            activity('web_GetNotebooks').output
            , 'nextLink'
            )
    ,activity('web_GetNotebooks').output.nextLink
    ,''
    )
Retrieve URL of next page
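
If the UNTIL loop with its three variables feels abstract, the same paging logic can be written down in a few lines of Python. This is only an illustration of steps 3 to 8 and not something to run inside the pipeline. The function names are made up, and fetch_page stands for whatever does the authenticated GET call and returns the parsed JSON response.

```python
def first_page_url(workspace_name):
    """Build the initial Rest API URL, like the expression in step 3."""
    return f"https://{workspace_name}.dev.azuresynapse.net/notebooks?api-version=2021-04-01"


def collect_notebooks(fetch_page, workspace_name):
    """Follow nextLink until the last page and return all notebooks."""
    url = first_page_url(workspace_name)
    notebooks = []
    while url:
        # Step 5: call the Rest API for the current page
        page = fetch_page(url)
        # Steps 6 and 7: add the notebooks of this page to the collection
        notebooks.extend(page.get("value", []))
        # Step 8: take the URL of the next page, or stop when there is none
        url = page.get("nextLink", "")
    return notebooks
```

One difference with the pipeline: the union function in the expression language also removes duplicates, where extend simply appends. Since each notebook should only appear on one page, the result should be the same.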










9) Filter notebook array
The notebook array is now filled with all published notebooks. If you only want certain notebooks from a specific folder then we need to add a FILTER Activity. The path for filtering is retrieved from the pipeline parameter. The startswith will also retrieve notebooks from all subfolders. Replace with EQUALS if you don't want that:
@if(
    not(
        empty(item().properties.folder)
        )
    , startswith(
        item().properties.folder.name
        , pipeline().parameters.NotebookFolderPath
        )
    , false
    )
Filter notebooks by folder path
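
To make clear what this filter expression keeps and drops, here is the same condition as a small Python sketch. The function names and the sample data in the usage note are invented for the example. The only thing taken from the Rest API response is the properties.folder.name path that the expression above uses.

```python
def in_folder(notebook, folder_path):
    """True if the notebook sits in folder_path or one of its subfolders."""
    folder = (notebook.get("properties") or {}).get("folder")
    # Notebooks in the root have no folder and are never selected
    if not folder:
        return False
    return folder.get("name", "").startswith(folder_path)


def filter_notebooks(notebooks, folder_path):
    """Return only the notebooks that match the folder path."""
    return [notebook for notebook in notebooks if in_folder(notebook, folder_path)]
```

With the folder path Silver this keeps notebooks in Silver and in Silver/Sub, but it would also keep a folder called SilverArchive, because startswith only compares the beginning of the name. That is something to keep in mind when choosing folder names.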









10) Foreach notebook
Now you can use the output of the FILTER Activity in the FOREACH loop. You should also tune the FOREACH settings to the available Spark node. If you use a small node then you probably won't run 20 notebooks at a time.
Foreach Notebook









11) Execute Notebook
Last step of the solution is executing all notebooks via the Notebook Activity. The notebook name property should be filled with an expression: @item().name
Execute all notebooks












The loop will of course only work if the parameters and settings for each notebook are all equal. Otherwise you will have a lot of expression work to do, probably making the solution too difficult to maintain.

Tip
In the User property tab of the Notebook activity you can add a few properties which you can show in the output making it very handy for debugging. In this case there is a NotebookName property with the expression @{item().name} and a FolderPath property with the expression @{item().properties.folder.name}.
Adding User Properties

















Now when debugging your pipeline you can add those properties as columns. Click on the icon in the User Property column to see the available columns. This is useful for loops like UNTIL and FOREACH. Now you can for example instantly see which notebook fails and which one succeeds without checking the input of each Notebook activity.
Showing user properties in Output window










Conclusion
This solution will make it easy to iterate through a whole bunch of notebooks without adding them one by one to a pipeline. You will have to use folders for your notebooks (or a certain notebook naming convention) if you don't want them all to be executed. In development this only works if you first publish the notebooks, because the Rest API only returns published/live notebooks. Don't forget to use the User Properties tip to make debugging a lot easier.


Sunday, 5 December 2021

Delta Lake support in Azure Synapse Analytics

Case
Delta Lake is already widely used in Azure Databricks, but now it is also available in Synapse Analytics. How can I use it there to store history and do 'time travel' in my historical data?
Synapse now supports Delta Lake











Solution
Delta Lake is now Generally Available (GA) in Synapse Analytics, but at the time of writing Microsoft is still implementing new Delta Lake features in Synapse.

This example uses a Synapse Pipeline with a Copy Data Activity to ingest data from the source and then calls a Python Delta Lake script (other languages are possible) via the Notebook activity.












Prerequisites
You need to make sure that you (for debugging) and the Managed Service Identity (MSI) of your Synapse Analytics workspace have access to the Azure Data Lake with the Role Storage Blob Data Contributor.
  • In the Azure Portal go to the Storage Account used by the Synapse Analytics workspace
  • In the left menu click on Access Control (IAM)
  • Click on + Add and choose Add role assignment
  • Search for Storage Blob Data Contributor, select the role and click on Next
  • Click on + Select members and find your Synapse workspace and find yourself and click Select
  • Optionally add a description about the why. Then click on Review + assign (twice)
At time of writing our Apache Spark Pool uses version 3.1 with Delta Lake 1.0. If you are using an older version (2.4) of Spark then you get Delta Lake version 0.6 which is slightly different. If newer versions appear then just try the newest Spark Pool.
Apache Spark pool Additional Settings




















1) Code cell 1: parameters
The first code cell is for the parameters that can be overridden by parameters from the Notebook activity in the pipeline. For more details see our post about notebook parameters. For debugging within the notebook we used real values.
# path of the data lake container
data_lake_container = 'abfss://yourbronzecontainer@yourdatalake.dfs.core.windows.net'

# The ingestion folder where your parquet files are located
ingest_folder = 'parquetstage'

# The bronze folder where your Delta Tables will be stored
bronze_folder = 'bronze'

# The name of the table
table_name = 'residences'

# The wildcard filter used within the ingestion folder to find files
source_wildcard = 'residences*.parquet'

# A comma separated string of one or more key columns (for the merge)
key_columns_str = 'Id'
Parameters









2) Code cell 2: import modules and functions
The second code cell is for importing all required/useful modules. For this basic example we need two imports:
  • DeltaTable from delta.tables for handling delta tables
  • notebookutils for file system utilities (removing delta table folder)
# Import modules
from delta.tables import DeltaTable
from notebookutils import mssparkutils
Imports







3) Code cell 3: filling delta lake
Now the actual code for filling the delta lake tables with parquet files from the data lake. Note: code is very basic. It checks whether the Delta Lake table already exists. If not it creates the Delta Lake table and if it already exists it merges the new data into the existing table. If you have transactional data then you could also do an append instead of a merge.

# Needed for os.path.join below
import os

# Convert comma separated string with keys to array
key_columns = key_columns_str.split(',')

# Convert array with keys to where-clause for merge statement
conditions_list = [f"existing.{key}=updates.{key}" for key in key_columns]

# Determine path of source files from ingest layer
# (data_lake_container is the parameter from code cell 1)
source_path = os.path.join(data_lake_container, ingest_folder, source_wildcard)

# Determine path of Delta Lake Table
delta_table_path = os.path.join(data_lake_container, bronze_folder, table_name)

# Read file(s) in spark data frame
sdf = spark.read.format('parquet').option("recursiveFileLookup", "true").load(source_path)
 
# Check if the Delta Table exists
if (DeltaTable.isDeltaTable(spark, delta_table_path)):
    print('Existing delta table')
    # Read the existing Delta Table
    delta_table = DeltaTable.forPath(spark, delta_table_path)
 
    # Merge new data into existing table
    delta_table.alias("existing").merge(
        source = sdf.alias("updates"),
        condition = " AND ".join(conditions_list)
         
    ).whenMatchedUpdateAll(
    ).whenNotMatchedInsertAll(
    ).execute()
 
    # For transactions you could do an append instead of a merge
    # sdf.write.format('delta').mode('append').save(delta_table_path)
 
else:
    print('New delta table')
    # Create new delta table with new data
    sdf.write.format('delta').save(delta_table_path)
Adding file to Delta Lake
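
The merge condition is the part that changes when a table has more than one key column. The sketch below isolates that bit so you can try it without Spark. The function name build_merge_condition is made up, and stripping the spaces around each key is a small addition that the notebook code above does not do.

```python
def build_merge_condition(key_columns_str, target_alias="existing", source_alias="updates"):
    """Turn a comma separated key string into a merge condition."""
    # Strip spaces so that 'Id, Code' and 'Id,Code' give the same result
    keys = [key.strip() for key in key_columns_str.split(",") if key.strip()]
    if not keys:
        raise ValueError("At least one key column is required for a merge")
    return " AND ".join(f"{target_alias}.{key}={source_alias}.{key}" for key in keys)


print(build_merge_condition("Id"))
print(build_merge_condition("Id, Code"))
```

The first call prints existing.Id=updates.Id and the second one existing.Id=updates.Id AND existing.Code=updates.Code.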
















4) Viewing the Delta Table in notebook
If you run the notebook with the code of the first three steps a couple of times with changed/extra/fewer records then history will be built in the delta table. For debugging purposes you can add an extra code cell to view the data and the various versions of the data.

To check the current version of the data you can use the following code:
display(spark.read.format('delta').load(delta_table_path))
Get current version of data













And with this code you can investigate the history versions of the data. In this case there are two versions:
# Get all versions
delta_table = DeltaTable.forPath(spark, delta_table_path)
display(delta_table.history())
Get versions of data








To retrieve one specific version you could use something like this (where the 0 is the version from the above picture):
# Get one specific version
display(spark.read.format("delta").option("versionAsOf", "0").load(delta_table_path))
Get specific version of data












You can also use a datetime to retrieve data from the Delta Lake by using timestampAsOf instead of versionAsOf:
# Get one specific version with timestamp filter
display(spark.read.format("delta").option("timestampAsOf", "2021-12-05 19:07:00.000").load(delta_table_path))
Get specific version of data with datetime filter
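
If you want one cell that handles both ways of time travel, you could wrap the two options in a small helper. This is just a sketch and the function name time_travel_options is made up. It only builds the options, so it runs without Spark. The commented line at the bottom shows how it could be passed to the reader in the notebook.

```python
from datetime import datetime


def time_travel_options(version=None, timestamp=None):
    """Return the reader options for either a version or a datetime."""
    if (version is None) == (timestamp is None):
        raise ValueError("Provide either a version or a timestamp, not both")
    if version is not None:
        return {"versionAsOf": str(version)}
    return {"timestampAsOf": timestamp.strftime("%Y-%m-%d %H:%M:%S")}


print(time_travel_options(version=0))
print(time_travel_options(timestamp=datetime(2021, 12, 5, 19, 7)))

# In the notebook (needs spark and delta_table_path from the cells above):
# display(spark.read.format("delta").options(**time_travel_options(version=0)).load(delta_table_path))
```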













To remove the entire Delta Lake table (and all history) you could use something like:
# Delete Delta Table (folder)
mssparkutils.fs.rm(delta_table_path, recurse=True)
Delete Delta Table







5) Viewing the Delta Table in Serverless SQL Pool
At the moment of writing you can query the Delta Lake in a Serverless SQL Pool, but you cannot yet use the 'time-travel' feature. Please upvote this feature here.

The first option is to use an OPENROWSET query within a SQL Script in your Synapse Workspace:
-- Query the Delta Lake
SELECT TOP 10 *
FROM OPENROWSET(
    BULK 'abfss://yourcontainer@yourdatalake.dfs.core.windows.net/deltalake/places/',
    FORMAT = 'delta') as rows
ORDER BY Id;
Query the Delta Lake via an OPENROWSET query



















A second option is using PolyBase by creating an External Table on the Delta Lake. This does require you to create a database within the Serverless SQL Pool because you can't do that on the master database.
-- Query the Delta Lake

-- Create database because it wont work on the master database
CREATE DATABASE MyDwh;

-- Create External Data Source
CREATE EXTERNAL DATA SOURCE DeltaLakeStorage
WITH ( location = 'abfss://yourcontainer@yourdatalake.dfs.core.windows.net/deltalake/' );

-- Create External File Format
CREATE EXTERNAL FILE FORMAT DeltaLakeFormat
WITH ( FORMAT_TYPE = DELTA );

-- Create External Table
CREATE EXTERNAL TABLE Residence (
     Id int,
     Residence VARCHAR(50)
) WITH (
        LOCATION = 'places', --> the root folder containing the Delta Lake files
        data_source = DeltaLakeStorage,
        FILE_FORMAT = DeltaLakeFormat
);

-- Get Data from your Delta Lake Table
SELECT          TOP 10 * 
FROM            Residence
ORDER BY        Id

























Conclusion
In this post you learned how to create and query a Delta Lake within your Synapse Analytics Workspace. The main advantage is of course that you now don't need Azure Databricks if you are already using Synapse, which makes your Data Platform architecture slightly clearer and simpler.

A disadvantage, at the moment of writing, is the lack of time travel within the Serverless SQL Pool environment. This means you're now forced to use notebooks to create your Data Warehouse when the latest version of your data is just not enough. So please upvote this feature here. There are some more limitations and known issues in the current version, but we think at least some of them will be solved in future updates.

Thanks to colleague Jeroen Meidam for helping!


Saturday, 27 November 2021

Synapse pipeline pass parameter to notebook

Case
I have a Synapse workspace notebook that I call from a Synapse pipeline, but I want to make it more flexible by adding parameters. How do you add parameters to a notebook and fill them via a pipeline?
Adding Parameters to your Synapse Notebook
























Solution
You can add variables to a special Code cell in the notebook and then use those as parameters within the Notebook activity. At the moment there is no real GUI that retrieves the parameters from the notebook, so you have to copy the names from the notebook to the Notebook activity in the pipeline.

1) Add Code cell for parameters
We need to add a Code cell and change it into a parameter cell. Note that you can have only one parameter cell in your notebook. You want to add it somewhere at the top so that you can use its variables/parameters in the cells below this parameter cell.
  • Go to your notebook and add a new Code cell
  • Move it up. It should probably be your top code cell allowing you to use it in the cells below.
  • Click in the cell and then on the ellipsis button of that cell (button up right with three dots)
  • Choose Toggle parameter cell and you will see the word Parameters appear in the bottom right corner
Toggle parameter cell










2) Add variables to parameters cell
Next we need to add some code to the parameter cell. Here you just need to add some variables and then each variable can be overridden by the pipeline and be used in the cells below. For debugging it is useful to give the variables a value. For this example we used Python code.
Adding variables
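
As an illustration, the content of the parameter cell and a second cell could look like the sketch below. The variable names are borrowed from the Delta Lake post above and are not necessarily the ones in the screenshot. The describe_run function is made up to have something to print.

```python
# --- Parameter cell (the one marked with Toggle parameter cell) ---
# These are defaults for debugging; the pipeline can override each of them
table_name = 'residences'
key_columns_str = 'Id'


# --- A later code cell that uses the parameters ---
def describe_run(table_name, key_columns_str):
    """Return a message that shows which values the notebook received."""
    return f"Processing table {table_name} with key column(s) {key_columns_str}"


print(describe_run(table_name, key_columns_str))
```

When the notebook runs on its own it prints the default values. When it is started by the pipeline it prints the values that were set in the Notebook activity.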








3) Adjust Synapse Notebook activity
Last step is to edit the Synapse Notebook activity and add the parameters. For each variable you added to the parameters cell you can add a parameter in the Notebook activity. At the moment there is no smart interface that lets you select a parameter and set its value. You have to set the name and datatype manually.
Adding parameters














4) Testing
Now run the pipeline to see the result. For this example we added a second Code cell with a print function to show that the default values have changed. Trigger the pipeline and go to the Monitor. Then click on your pipeline and within that pipeline on the Notebook activity. If you click on the pencil icon the notebook will open and allow you to see the result.
Click on pencil to open the Notebook













Note the extra cell and the result of the third cell















Conclusion
In this short post you learned how to add parameters to your notebook and fill them via the pipeline. And as an additional bonus you saw how to check the result of the changes. Next step is for example to add the Notebook to a Foreach loop that ingests data to the data lake and then executes the notebook to create a Delta Lake table for each item in the Foreach loop.