Showing posts with label NOTEBOOK. Show all posts
Showing posts with label NOTEBOOK. Show all posts

Sunday 5 December 2021

Delta Lake support in Azure Synapse Analytics

Case
Delta Lake is already widely used in Azure Data Bricks, but now it is also available in Synapse Analytics. How can I use it there to store history and do 'time travel' in my historical data?
Synapse now supports Delta Lake











Solution
Delta Lake is now so called General Available (GA) in Synapse Analytics, but at the time of writing Microsoft is still implementing new Delta Lake features in Synapse. 

This example uses a Synapse Pipeline with a Copy Data Activity to ingest data from the source and then calls a Python Delta Lake script (other languages are possible) via the Notebook activity.












Prerequisites
You need to make sure that you (for debugging) and the Managed Service Identity (MSI) of your Synapse Analytics workspace have access to the Azure Data Lake with the Role Storage Blob Data Contributor.
  • In the Azure Portal go to the Storage Account used by the Synapse Analytics workspace
  • In the left menu click on Access Control (IAM)
  • Click on + Add and choose Add role assignment
  • Search for Storage Blob Data Contributor, select the role and click on Next
  • Click on + Select members and find your Synapse workspace and find yourself and click Select
  • Optionally add an description about the why. Then click on Review + assign (twice)
At time of writing our Apache Spark Pool uses version 3.1 with Delta Lake 1.0. If you are using an older version (2.4) of Spark then you get Delta Lake version 0.6 which is slightly different. If newer versions appear then just try the newest Spark Pool.
Apache Spark pool Additional Settings




















1) Code cell 1: parameters
The first code cell is for the parameters that can be overridden by parameters from the Notebook activity in the pipeline. For more details see our post about notebook parameters. For debugging within the notebook we used real values.
# path of the data lake container
data_lake_container = 'abfss://yourbronzecontainer@yourdatalake.dfs.core.windows.net'

# The ingestion folder where your parquet file are located
ingest_folder = 'parquetstage'

# The bronze folder where your Delta Tables will be stored
bronze_folder = 'bronze'

# The name of the table
table_name = 'residences'

# The wildcard filter used within the bronze folder to find files
source_wildcard = 'residences*.parquet'

# A comma separated string of one or more key columns (for the merge)
key_columns_str = 'Id'
Parameters









2) Code cell 2: import modules and functions
The second code cell is for importing all required/useful modules. For this basic example we two import s:
  • DeltaTable.delta.tables for handling delta tables
  • notebookutils for file system utilities (removing delta table folder)
# Import modules
from delta.tables import DeltaTable
from notebookutils import mssparkutils
Imports







3) Code cell 3: filling delta lake
Now the actual code for filling the delta lake tables with parquet files from the data lake. Note: code is very basic. It checks whether the Delta Lake table already exists. If not it creates the Delta Lake table and if it already exists it merges the new data into the existing table. If you have transactional data then you could also do an append instead of a merge.

# Convert comma separated string with keys to array
key_columns = key_columns_str.split(',')  
 
# Convert array with keys to where-clause for merge statement
conditions_list = [f"existing.{key}=updates.{key}" for key in key_columns]
 
# Determine path of source files from ingest layer
source_path = os.path.join(data_lake_container_bronze, ingest_folder, source_wildcard)
 
# Determine path of Delta Lake Table 
delta_table_path = os.path.join(data_lake_container_bronze, bronze_folder, table_name)

# Read file(s) in spark data frame
sdf = spark.read.format('parquet').option("recursiveFileLookup", "true").load(source_path)
 
# Check if the Delta Table exists
if (DeltaTable.isDeltaTable(spark, delta_table_path)):
    print('Existing delta table')
    # Read the existing Delta Table
    delta_table = DeltaTable.forPath(spark, delta_table_path)
 
    # Merge new data into existing table
    delta_table.alias("existing").merge(
        source = sdf.alias("updates"),
        condition = " AND ".join(conditions_list)
         
    ).whenMatchedUpdateAll(
    ).whenNotMatchedInsertAll(
    ).execute()
 
    # For transactions you could do an append instead of a merge
    # sdf.write.format('delta').mode('append').save(delta_table_path)
 
else:
    print('New delta table')
    # Create new delta table with new data
    sdf.write.format('delta').save(delta_table_path)
Adding file to Delta Lake
















4) Viewing the Delta Table in notebook
If you run the notebook with the code of the first three steps a couple of times with changed/extra/less records then history will be build in the delta table. For debugging purposes you can add an extra code cell to view the data and the various versions of the data.

To check the current version of the data you can use the following code:
display(spark.read.format('delta').load(delta_table_path))
Get current version of data













And with this code you can investigage the history versions of the data. In this case there are two versions:
# Get all versions
delta_table = DeltaTable.forPath(spark, delta_table_path)
display(delta_table.history())
Get versions of data








To retrieve one specific version you could use something like this (where the 0 is the version from the above picture):
# Get one specific version
display(spark.read.format("delta").option("versionAsOf", "0").load(delta_table_path))
Get specific version of data












You can also use a datetime to retrieve data from the Delta Lake by using timestampAsOf instead of versionAsOf:
# Get one specific version with timestamp filter
display(spark.read.format("delta").option("timestampAsOf", "2021-12-05 19:07:00.000").load(delta_table_path))
Get specific version of data with datetime filter













To remove the entire Delta Lake table (and all history) you could use something like:
# Delete Delta Table (folder)
mssparkutils.fs.rm(delta_table_path, recurse=True)
Delete Delta Table







4) Viewing the Delta Table in Serverless SQL Pool
At the moment of writing you can query the Detla Lake in a Serverless SQL Pool, but you cannot yet use the 'time-travel' feature. Please upvote this feature here.

The first option is to use an OPENROWSET query within a SQL Script in your Synapse Workspace:
-- Query the Delta Lake
SELECT TOP 10 *
FROM OPENROWSET(
    BULK 'abfss://yourcontainer@yourdatalake.dfs.core.windows.net/deltalake/places/',
    FORMAT = 'delta') as rows
ORDER BY Id;
Query the Delta Lake via an OPENROWSET query



















A second option is using Polybase by creating an External Table on the Delta Lake. This does requery you to create a database within the Serverless SQL Pool because you can't do that on the master database.
-- Query the Delta Lake

-- Create database because it wont work on the master database
CREATE DATABASE MyDwh;

-- Create External Data Source
CREATE EXTERNAL DATA SOURCE DeltaLakeStorage
WITH ( location = 'abfss://yourcontainer@yourdatalake.dfs.core.windows.net/deltalake/' );

-- Create External File Format
CREATE EXTERNAL FILE FORMAT DeltaLakeFormat
WITH ( FORMAT_TYPE = DELTA );

-- Create External Table
CREATE EXTERNAL TABLE Residence (
     Id int,
     Residence VARCHAR(50)
) WITH (
        LOCATION = 'places', --> the root folder containing the Delta Lake files
        data_source = DeltaLakeStorage,
        FILE_FORMAT = DeltaLakeFormat
);

-- Get Data from your Delta Lake Table
SELECT          TOP 10 * 
FROM            Residence
ORDER BY        Id

























Conclusion
In this post you learned how to create and query a Delta Lake within your Synapse Analytics Workspace. The main advantage is of course that you now don't need Azure Data Bricks if you are already using Synapse. Making your Data Platform architecture just slightly more clearer and easier. 

A disadvantage, at the moment of writing, is the lack of time-traveling withing the Serverless SQL Pool environment. This means you're now forced to use notebooks to create your Data Warehouse when the latest version of your data is just not enough. So please upvote this feature here. There are some more limitations and know issues in the current version, but we think at least some of them will be solved in feature updates.

Thanks to colleague Jeroen Meidam for helping!


Saturday 27 November 2021

Synapse pipeline pass parameter to notebook

Case
I have a Synapse workspace notebook that I call from a Synapse pipeline, but I want to make it more flexible by adding parameters. How do you add parameters to a notebook and fill them via a pipeline?
Adding Parameters to your Synapse Notebook
























Solution
You can add variables to a special Code cell in the notebook and then use those as parameters within the Notebook activity. At the moment there is no real gui retrieving the parameters from the Notebook so you have to copy the names from the notebook to the Notebook activity in the pipeline.

1) Add Code cell for parameters
We need to add a Code cell and change it in to a parameter cell. Note that you can have only one parameter cell in your notebook. You want to add it somewhere at the top so that you can use its variables/parameters in the cells below this parameter cell.
  • Go to your notebook and add a new Code cell
  • Move it up. It should probably be your top code cell allowing you to use it in the cells below.
  • Click in the cell and then on the ellipsis button of that cell (button up right with three dots)
  • Choose Toggle parameter cell and you will see the word Parameters appear in the bottom right corner
Toggle parameter cell










2) Add variables to parameters cell
Next we need to add some code to the parameter cell. Here you just need to add some variables and then each variable can be overridden by the pipeline and be used in the cells below. For debugging it is usefull to give the variables a value. For this example we used python code.
Adding variables








3) Adjust Synapse Notebook activity
Last step is to edit the Synapse Notebook activity and add the parameters. For each variable you added to the parameters cell you can add a paramater in the notebook activity. At the moment there is no smart interface that lets you select a parameter and set its value. You have to set the name and datatype manually.
Adding parameters














4) Testing
Now run the pipeline to see the result. For this example we added a second Code cell with a print function to show that the default values have changed. Trigger the pipeline and go to the Monitor. Then click on your pipeline and within that pipeline on the Notebook activity. If you click on the pencil icon the notebook will open and allow you to see the result.
Click on pencil te open the Notebook













Note the extra cell and the result of the third cell















Conclusion
In this short post you learned how to add parameters to your notebook and fill them via the pipeline. And as an additional bonus you saw how to check the result of the changes. Next step is forexample to add the Notebook to a Foreach loop that ingest data to the datalake and then execute the notebook to create a Delta Lake table for each item in the Foreach loop.