Showing posts with label SPARK_SQL. Show all posts
Showing posts with label SPARK_SQL. Show all posts

Sunday, 26 March 2023

Synapse - Change Data Feed (CDF) on delta tables

Case
I like the data versioning of the Delta Tables and I know how to get data from different versions, but how can I combine that in one query to get for example the changes during a year to create a nice fact table about certain changes.
Change Data Feed in Synapse




















Solution
Change Data Feed (CDF) is still a bit new. The currently supported Delta Lake version in the Synapse workspace is 2.2.0. This version does not yet not support CDF for SQL queries. This shoud be available in Delta Lake 2.3.0 according to the release documentation. Luckily you can already use PySpark to get this information.
Current Delta Lake version in Synapse










1) Enable Change Data Feed
First you have to enable the Change Data Feed option on your Delta table. From that point in time you can use CDF. The property is called enableChangeDataFeed.

You can alter your existing tables with an Alter statement
%%sql

ALTER TABLE silver.Cities
  SET TBLPROPERTIES (delta.enableChangeDataFeed = true);
For new Delta Tables you can also do this in the Create Table command.
%%sql

CREATE TABLE Silver.Cities (Id INT, City STRING, Population INT)
  TBLPROPERTIES (delta.enableChangeDataFeed = true);
And if you used the PySpark code from our previous post, then you can add an option just in front of the save.
sdf.write.format('delta').option("delta.enableChangeDataFeed", "true").save(delta_table_path)
To check whether it is enabled on your Delta Table you can use the following command.
%%sql

SHOW TBLPROPERTIES silver.cities
CDF is enabled













2) Check available data versions
Now that we have the Change Data Feed option available, lets check which data versions we have with the DESCRIBE HISTORY command. In the first example you will see that CDF is enabled after table creation in the second version (1). This means you can not include the first version (0) in the CDF command.

You will get an error if you set the range wrong while getting CDF info:
AnalysisException: Error getting change data for range [0 , 4] as change data was not
recorded for version [0]. If you've enabled change data feed on this table,
use `DESCRIBE HISTORY` to see when it was first enabled.
Otherwise, to start recording change data, use `ALTER TABLE table_name SET TBLPROPERTIES
(delta.enableChangeDataFeed=true)`.
CDF available from version 1







In the second example it was enabled during the Delta table creation and therefore CDF is available from the first version (0).
CDF available from version 0








3) Query CDF data
When you query the CDF data you will get some extra columns:
  • _change_type: showing what action was taken to change the data - insert, update_preimage, update_postimage and delete
  • _commit_version: showing the version number of the data
  • _commit_timestamp: showing the timestamp of the data change
If you want particular versionnumbers when getting the data, then you can use startingVersion and endingVersion as an option while reading the data. Only startingVersion is also permitted.
%%pyspark

df = spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .option("endingVersion", 5) \
  .table("Silver.cities")

display(df.sort("City","_commit_version"))
Filter CDF on version numbers













Probably more useful is to query date ranges, then you can use startingTimestamp and endingTimestamp as an option. Only startingTimestamp is also permitted.
%%pyspark

df = spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2023-03-26 11:07:23.008') \
  .table("Silver.cities")

display(df.sort("City","_commit_version"))
Filter CDF on timestamps














If you want to use the new column _commit_timestamp from the next record to create a new column called end_timestamp in the current record, then you need to play with the lead() function (just like in TSQL).
%%pyspark

from pyspark.sql.window import Window
from pyspark.sql.functions import lead 

df = spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2023-03-26 11:07:23.008') \
  .table("Silver.cities")

# Create window for lead
windowSpec  = Window.partitionBy("City").orderBy("_commit_version")

# Remove update_preimage records, add new column with Lead() and then sort
display(df.filter("_change_type != 'update_preimage'")
           .withColumn("_end_timestamp",lead("_commit_timestamp",1).over(windowSpec))
           .sort("City","_commit_version"))
Create end_timestamp with lead function











Conclusions
In this post you learned the basics of the Change Data Feed options in Synapse. This feature is available in Delta Lake 2.0.0 and above, but it is still in experimental support mode. For now you have to use PySpark instead of Spark SQL to query the data in Synapse.

Besides creating nice fact tables to show data changes during a certain periode this feature could also be useful to incremental load a large fact table with only changes from the silver layer. Creating audit trails for data changes over time could also be an interesting option. The CDF option is probably the most useful when there are not that many changes in a table.

In a later post, when Delta Lake 2.3.0 is available in Synapse, we will explain the Spark SQL options for CDF. Special thanks to colleagues Roelof Jonkers and Martijn Broeks for helping out.



Sunday, 19 March 2023

Synapse - Using Spark SQL to time travel Delta Tables

Case
In a previous blog post you showed how to create and query Delta Tables with PySpark for a Lake House, however most Data Warehouse people are more familiar with the SQL language. How can you query a Delta Table with the good old SQL language?
Using Spark SQL to time travel Delta Tables
















Solution
In that previous blog post we showed you that you can query the Delta Tables in for example a SQL Serverless pool by creating External Tables on those Delta Tables. This allows you to use TSQL to query Delta Tables, but it doesn't allow you to use time travel. You always get the latest version of the data.
External Tables on Delta in Serverless SQL Pool
























However we can use Synapse Notebooks with Spark SQL as a language which is very similar to TSQL to query Delta Tables. This allows you to time travel the data in a familiar language.

1) Add Delta Table to Lake Database
For easily querying Delta Tables you first need make the Delta Tables visible in Synapse by adding them to the Lake Database. We explained this in the previous blog post.
Adding Delta Table to Lake Database









Once the Delta Table is available in the Lake Database you can query it like a regular table. By default you will see the latest version of the data.
%%sql
SELECT * FROM silver.cities
The alternative is to use the entire path:
%%sql
SELECT * FROM delta.`abfss://mysource@mydatalake.dfs.core.windows.net/silver/cities`
2) Show historical versions
You can check which historical versions are available with the DESCRIBE HISTORY command.
%%sql
DESCRIBE HISTORY silver.cities
Show versions of the Delta Table

















Besides showing the history you can also check where the Delta Table is stored in your Data Lake with the DESCRIBE EXTENDED command. It will give you various details like the location of the Delta Table.
See details of Delta Table
















3) Show specifict version by version number
With the DESCRIBE HISTORY command you get a table with various versions of your table. The fist column shows the version number that starts with 0 for the initial version of the table.

When you query a Delta Table you can add VERSION AS OF X behind the query where you replace the X by the version number. In this example we take version 2 (the third version of the table).
%%sql
SELECT * FROM silver.cities VERSION AS OF 2
Showing version 2 of the Delta Table















4) Show specifict version by date
Time traveling with a specific version number is cumbersome because you first need to determine the version you need. Lucily you can also get a version that was active on a specific date by adding TIMESTAMP AS OF "2022-01-01" behind the query.
%%sql
SELECT * FROM silver.cities TIMESTAMP AS OF "2022-01-01"
Showing version of a specific date

















Conclusions
In this post you learned how to time travel a Delta Table with Spark SQL. The same options as with PySpark, but for some people just a little bit more readable. In a next post we will discus Change Data Feed to get data changes between versions.

Synapse - Add existing Delta Table to Lake Database

Case
How can I query my lake house files and tables in Synapse without specifying the entire data lake path for each file or table?
Querying your Lake House in Synapse













Solution
In the Data tab (left menu) of the Synapse Workspace you can create a Lake database and then add your files and tables to it. By default there is already a Lake Database present called 'default', but it won't be visible until you add tables to it or add other databases.

1) Create database
Let's create a new database. We will create one for bronze, silver and gold. For the files we could create a separate database called Ingest for easy querying. In the documentation they will either mention a CREATE DATABASE or a CREATE SCHEMA command. They are the same thing.
  • Create a new notebook with either SPARK SQL or PySpark as language.
  • Attach it to a spark pool
  • In the first Code block add the code below and execute it. The IF NOT EXISTS and the COMMENT are optional.
%%sql
CREATE DATABASE IF NOT EXISTS Ingest COMMENT 'Raw files';
CREATE DATABASE IF NOT EXISTS Bronze COMMENT 'Raw Layer';
CREATE DATABASE IF NOT EXISTS Silver COMMENT 'Validated Layer';
CREATE DATABASE IF NOT EXISTS Gold COMMENT 'Enriched Layer';
%%pyspark
spark.sql(f'CREATE DATABASE IF NOT EXISTS Ingest COMMENT \'Raw files\';');
spark.sql(f'CREATE DATABASE IF NOT EXISTS Bronze COMMENT \'Raw Layer\';');
spark.sql(f'CREATE DATABASE IF NOT EXISTS Silver COMMENT \'Validated Layer\';');
spark.sql(f'CREATE DATABASE IF NOT EXISTS Gold COMMENT \'Enriched Layer\';');
SPARK SQL Code block









Now go to the Data and then you will see a Lake database list with 4 databases (default + the three you created)
Synapse Lake databases









2) Create table on Parquet file
For the raw files we will create a table base on the parquet (or flat) files from your ingestion from the source into the data lake. If the file has timestamp in the name you could even use a wildcard in the path.
  • Go back to your notebook and create a second Code block
  • Add the code below and execute it. It will create a table in the Ingest Lake database
  • Then go to the Data tab and then unfold the Ingest database and then its tables (if you are to fast then you might have to refresh the tables list.
%%sql
CREATE TABLE IF NOT EXISTS Ingest.Cities
USING PARQUET
Location 'abfss://mysource@mydatalake.dfs.core.windows.net/Ingestfolder/Cities*.parquet'
%%pyspark
spark.sql(f'CREATE TABLE IF NOT EXISTS Ingest.{table_name} USING PARQUET LOCATION \'{parquet_path}\'')

Lake table based on a Parquet file from the data lake









3) Create table on Delta Table
For the Bronze (or Silver or Gold) layer we will create a table based on an existing Delta Table from the data lake.
  • Go back to your notebook and create a third Code block
  • Add the code below and execute it. It will create a table in the Bronze Lake database
  • Then go to the Data tab and then unfold the Bronze (or Silver or Gold) database and then its tables (if you are to fast then you might have to refresh the tables list.
%%sql
CREATE TABLE IF NOT EXISTS Bronze.Cities
USING DELTA
Location 'abfss://mysource@mydatalake.dfs.core.windows.net/Bronze/Cities'
$$pyspark
spark.sql(f'CREATE TABLE IF NOT EXISTS Bronze.{table_name} USING DELTA LOCATION \'{delta_table_path}\'')

Lake table based on a Delta Table from the data lake










4) Query the new tables
Now you can query the files and delta tables like a regular database table in either a notebook with a Spark pool running or a SQL script. There is one difference between those two. In the SQL Script you get the default dbo schema between the database name and the table name. This in mandatory for the SQL script, but not allowed in a notebook.
Query in notenbook with SPARK SQL





















Query in a SQL Script











5) Create views on new tables
You can also create views on those new tables where you already add some business logic that is usefull for other colleagues working with this data or you can create views for dimensions and facts. You could even create views for dimensions and facts on your silver table and then serve them to Power BI
Create view on new tables




















Conclusion
In this post you learned how to make your data lake files and delta lake tables easier to query. A few extra steps for each table, but after that querying is much easier. In a future post we will show you how to do this directy during the INGEST and DELTA steps. Then you don't have any manual steps for each new file or table.

In the next post we will show you how to use timetravel on those Delta Tables with SPARK SQL. This post is the start position of that post.

Special thanks to colleague Martijn Broeks for helping out.