Showing posts with label PYTHON. Show all posts

Tuesday 24 October 2023

Review Synapse notebooks with your GPT model

Case
Since the introduction of ChatGPT in late 2022, people have started to discover the various applications of Large Language Models (LLMs) in coding and beyond. LLMs can be a helpful sparring partner when developing and reviewing code. As an IT consultant, I've started using LLMs in my coding practice. This made me wonder: ‘Can I automate the use of an LLM in my development process?’, which in my case is Azure Synapse.

The short answer is: ‘yes, you can’.


 

 

 





 



Solution
This blog serves as a step-by-step guide to integrating GPT-3.5 into your Azure DevOps pipeline for automated code reviews. The proposed solution checks which files have changed between the source branch and the target branch of a pull request. If one of these changed files is a Synapse notebook, its code is sent to the GPT model deployed in your tenant with a request for feedback. The feedback given by the GPT model is then posted as a comment in the pull request.
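The file filtering described above can be sketched in plain Python. This is not the code of get_GPT_feedback.py. It is a minimal sketch that makes two assumptions: Synapse Git integration stores each notebook as a JSON file in a `notebook` subfolder of the Synapse root folder, and the cells sit under `properties.cells`. The helper names `changed_notebooks` and `notebook_code` are hypothetical.

```python
import json
from pathlib import PurePosixPath


def changed_notebooks(changed_files, synapse_root_folder):
    """Return the changed paths that look like Synapse notebooks.

    Assumption: notebooks are stored as <synapse_root_folder>/notebook/<name>.json.
    """
    notebook_dir = PurePosixPath(synapse_root_folder.strip("/")) / "notebook"
    notebooks = []
    for path in changed_files:
        # Git paths reported by Azure DevOps may start with a slash
        candidate = PurePosixPath(path.lstrip("/"))
        if candidate.parent == notebook_dir and candidate.suffix == ".json":
            notebooks.append(str(candidate))
    return notebooks


def notebook_code(notebook_json):
    """Concatenate the source of all code cells of a notebook JSON document.

    Assumption: cells live under properties.cells and each cell's source is a
    list of strings, as in the notebook files written by Synapse Git integration.
    """
    notebook = json.loads(notebook_json)
    cells = notebook.get("properties", {}).get("cells", [])
    return "\n\n".join(
        "".join(cell.get("source", []))
        for cell in cells
        if cell.get("cell_type") == "code"
    )
```

Only code cells are sent along in this sketch. Markdown cells are skipped, which keeps the prompt shorter.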

Before you start, make sure you have the following three resources set up and configured correctly. I’ll include links to other step-by-step guides to create the required resources.

Required resources:

*Access to the Microsoft Azure OpenAI service is limited at the time of writing.

1) Create a GPT model in Azure OpenAI service

Once you have gained access to the Azure OpenAI service, you need to create an Azure OpenAI resource and deploy a model. Click here to follow the comprehensive guide by Christopher Tearpak on creating and deploying your GPT model in Azure. The result should be an OpenAI service resource:

Expected result after creating your own OpenAI Service










With a GPT model deployment:
Expected result after deploying your own GPT model











2) Set up and configure Azure DevOps

Scripts
Three files must be present in your repository. Download them from the SynapseBuildValidations repository (which also contains other useful build validation scripts). Here is what each file does:

  • get_GPT_feedback.py: Retrieves and passes your code to GPT and posts the feedback to the pull request. 
  • requirements.txt: Lists the required Python packages for the above script. 
  • GPT_code_review.yml: Contains the pipeline configuration.
Expected folder/file structure for your repository

 


 



 

Create variable group
Under “Pipelines”, open “Library” and create a new variable group named “GPT_connection_data”.
Add the following variables:

  • openai_api_base
  • openai_api_key
  • openai_api_type
  • openai_api_version
  • synapse_root_folder

The variables openai_api_base and openai_api_key can be found in the “Keys and Endpoint” tab in your OpenAI resource.

Find your Key and Endpoint in "Keys and Endpoint" of your OpenAI service








Copy the “Endpoint” to openai_api_base and one of the keys to openai_api_key.
Set openai_api_type to “azure” and openai_api_version to “2023-03-15-preview”.

The variable synapse_root_folder contains the path to the root folder of the Synapse files. In my case it’s just “Synapse”, because all the Synapse files can be found in {repository_root}/Synapse.
An example of my repository, where the synapse_root_folder is "Synapse"













After you’ve set all your variables, the resulting variable group should look like this:

Expected variable group in Azure DevOps
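The script needs the values from GPT_connection_data at run time. Azure DevOps exposes non-secret pipeline variables to scripts as environment variables with upper-case names, so openai_api_base becomes OPENAI_API_BASE. A variable marked as secret is not exposed automatically and has to be mapped explicitly in the `env` section of the step. The sketch below reads and validates these values using a hypothetical helper, `load_gpt_settings`. Whether get_GPT_feedback.py reads them this way is an assumption.

```python
import os

# Names of the variables in the GPT_connection_data variable group
REQUIRED_VARIABLES = (
    "openai_api_base",
    "openai_api_key",
    "openai_api_type",
    "openai_api_version",
    "synapse_root_folder",
)


def load_gpt_settings(environ=os.environ):
    """Read the variable group values from environment variables.

    Azure DevOps upper-cases variable names when it exposes them to a script.
    """
    settings = {}
    missing = []
    for name in REQUIRED_VARIABLES:
        value = environ.get(name.upper())
        if value:
            settings[name] = value
        else:
            missing.append(name.upper())
    if missing:
        raise KeyError("Missing pipeline variables: " + ", ".join(missing))
    return settings
```

With the pre-1.0 `openai` Python package, these values map onto `openai.api_base`, `openai.api_key`, `openai.api_type` and `openai.api_version`.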












Create a pipeline

The GPT_code_review.yml file contains the pipeline code needed to configure the agent and execute the get_GPT_feedback.py script. You need to create a new pipeline based on GPT_code_review.yml.

Click here to follow the comprehensive guide by Xeladu on creating a pipeline.

The result should be as follows:

The resulting pipeline after following the guide




Disable the continuous integration (CI) trigger

Now you’ll need to disable the CI trigger of your new pipeline.
Open the pipeline in edit mode, select the three vertical dots and select “Triggers”.
Select triggers to change the trigger settings














Then select your project under “Continuous integration”, check “Override the YAML CI trigger”, check disable CI and select “save”.

Steps to disable the CI trigger






Permit pipeline access to variable group resource

After you’ve disabled the CI trigger, you’ll need to start a run. During the run you’ll get a notification that the pipeline needs permission to access a resource. Click on the “View” button and permit the access to the variable group GPT_connection_data by clicking the “Permit” button. The run will continue and eventually fail.

Permit the pipeline to access the variable group







Note that this pipeline is designed to operate only within the context of a pull request, because it depends on a few system variables that are only present on the build agent during a pull request build.
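The pull request context comes from predefined variables such as System.PullRequest.PullRequestId, System.PullRequest.SourceBranch and System.PullRequest.TargetBranch. These are available as environment variables only in builds triggered by a pull request. The sketch below shows one way to detect that context. `pull_request_context` and `short_branch` are hypothetical helpers, not functions from get_GPT_feedback.py.

```python
import os


def pull_request_context(environ=os.environ):
    """Return the pull request details, or None outside a pull request build."""
    pr_id = environ.get("SYSTEM_PULLREQUEST_PULLREQUESTID")
    if not pr_id:
        return None
    return {
        "pull_request_id": int(pr_id),
        "source_branch": environ["SYSTEM_PULLREQUEST_SOURCEBRANCH"],
        "target_branch": environ["SYSTEM_PULLREQUEST_TARGETBRANCH"],
    }


def short_branch(ref):
    """Turn 'refs/heads/main' into 'main'; other values are returned unchanged."""
    prefix = "refs/heads/"
    return ref[len(prefix):] if ref.startswith(prefix) else ref
```

Checking for None at the start of a script lets a manual run stop with a clear message instead of failing on a missing variable later on.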

Set rights for build agent
The GPT feedback is posted as comments in the pull request, so the build agent needs permission to post in the pull request. Go to “Project settings” and select “Repositories”. In “Repositories”, select “Security” and then select your “{Projectname} Build Service” user.
Steps to select the build service user




















Once the build service user is selected, you must grant this user permission to contribute to pull requests.

Set "Contribute to pull requests" to Allow





















After these actions, your build agent user has access to write comments to a pull request.
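Posting the feedback comes down to one call to the Azure DevOps REST API: creating a thread on the pull request (POST .../pullRequests/{pullRequestId}/threads). The sketch below only builds that request, using the standard library. It makes two assumptions:

- The job has access to System.AccessToken, mapped into the step (for example as SYSTEM_ACCESSTOKEN).
- The collection URI, project and repository ID come from the predefined variables System.CollectionUri, System.TeamProject and Build.Repository.ID.

`build_pr_comment_request` is a hypothetical helper. The api-version 7.0 is also an assumption that you may need to adjust.

```python
import json
import urllib.request
from urllib.parse import quote


def build_pr_comment_request(collection_uri, project, repository_id,
                             pull_request_id, access_token, comment):
    """Build (but do not send) a request that creates a pull request thread."""
    url = (
        f"{collection_uri.rstrip('/')}/{quote(project, safe='')}"
        f"/_apis/git/repositories/{quote(str(repository_id), safe='')}"
        f"/pullRequests/{pull_request_id}/threads?api-version=7.0"
    )
    body = {
        # commentType 1 is a plain text comment, status 1 is an active thread
        "comments": [{"parentCommentId": 0, "content": comment, "commentType": 1}],
        "status": 1,
    }
    return urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )
```

Sending the request is then `urllib.request.urlopen(request)`. The comment is posted under the identity of the build service user, which is why that user needs the “Contribute to pull requests” permission.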

Add the pipeline as build validation
The final step is adding your GPT_Feedback_pipeline as build validation on your desired branch. Go to “Project settings”, select “Repositories” and select the repository where you want to add the pipeline (“DemoProject” in my example). Within the repository, select “Policies”.
Steps to get to the branch you want to set the build validation to
















Select the branch where you want to have the build validation.
Select the branch you want to set the build validation to










Within this branch, select the plus icon in the build validation component.
Select the plus to add a pipeline as build validation






In the “Add build policy” pop-up, select the build pipeline: “GPT_Feedback_Pipeline”, set a display name and select “Save”

Pop-up to select a pipeline as build validation
























Now you’re good to go! When you create a pull request into the branch on which you have enabled the build validation, the GPT_Feedback_pipeline will run.

3) Testing

Now it’s time to create a pull request. You will see that the validation is queued first, so this extra validation takes a little extra time, especially when your agent is busy. The pipeline should always run without errors. When there is no feedback, there won’t be any comments: either your code is perfect :) or none of the notebooks have changed. When there is feedback, it will be posted in the comments.

Let’s see the build pipeline in action. First, we need a new branch, in my case a feature branch:
  1. Create a new branch in Synapse called Feature
  2. Create a new notebook
  3. Create some sample code in Python or SQL
Sample code in Azure Synapse


















  1. Create a pull request based on your new branch
  2. Fill out the details of your pull request and make sure you’re merging into the branch with the GPT build validation
  3. Confirm the creation of your pull request.
  4. Your pull request is created and the GPT_Feedback pipeline starts running
  5. After the pipeline has run successfully and the GPT model has given feedback for improvement, the feedback is posted in the comments of the pull request
GPT response in the comments of your pull request
























4) Altering the feedback to your situation

The prompt sent to the GPT model is pre-determined and might not suit your specific situation. At the time of writing, the prompt is included in the get_GPT_feedback.py script. This script contains a get_gpt_response function. The first lines of this function are used to set three strings. These strings contain the prompt for the “system”, “user” and “assistant” roles passed to the GPT model. More on the use of these roles can be found here. To alter the prompts passed to the GPT model, you need to alter the strings: content_system_string, content_assistant_string and/or content_user_string.
Subset of the get_GPT_feedback.py where the GPT commands are set
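The three strings end up as a list of chat messages, one per role. A minimal sketch of how such a list could be assembled is shown below. The order of the messages and the way the notebook code is appended to the user prompt are assumptions, so check get_GPT_feedback.py for the actual construction. `build_messages` is a hypothetical helper.

```python
def build_messages(notebook_code, content_system_string,
                   content_assistant_string, content_user_string):
    """Assemble chat messages for the system, assistant and user roles."""
    return [
        # The system message sets the reviewer's behaviour
        {"role": "system", "content": content_system_string},
        # An assistant message can provide an example answer or tone
        {"role": "assistant", "content": content_assistant_string},
        # The user message carries the request plus the notebook code
        {"role": "user", "content": f"{content_user_string}\n\n{notebook_code}"},
    ]
```

With the pre-1.0 `openai` package configured for Azure, such a list would be passed as `openai.ChatCompletion.create(engine=<your deployment name>, messages=messages)`.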
















Conclusion
In this post you learned how to integrate GPT into your Azure DevOps pipeline. Your pipeline will provide feedback on your changed or added Synapse notebooks. The feedback is posted as comments in the pull request in Azure DevOps. You can customize the prompt to suit your specific needs.

Sunday 26 March 2023

Synapse - Change Data Feed (CDF) on delta tables

Case
I like the data versioning of Delta Tables and I know how to get data from different versions, but how can I combine that in one query to, for example, get the changes during a year to create a nice fact table about certain changes?
Change Data Feed in Synapse




















Solution
Change Data Feed (CDF) is still a bit new. The Delta Lake version currently supported in the Synapse workspace is 2.2.0. This version does not yet support CDF for SQL queries; according to the release documentation, this should be available in Delta Lake 2.3.0. Luckily you can already use PySpark to get this information.
Current Delta Lake version in Synapse










1) Enable Change Data Feed
First you have to enable the Change Data Feed option on your Delta table. From that point in time you can use CDF. The property is called enableChangeDataFeed.

You can alter your existing tables with an Alter statement
%%sql

ALTER TABLE silver.Cities
  SET TBLPROPERTIES (delta.enableChangeDataFeed = true);
For new Delta Tables you can also do this in the Create Table command.
%%sql

CREATE TABLE Silver.Cities (Id INT, City STRING, Population INT)
  TBLPROPERTIES (delta.enableChangeDataFeed = true);
And if you used the PySpark code from our previous post, then you can add an option just in front of the save.
sdf.write.format('delta').option("delta.enableChangeDataFeed", "true").save(delta_table_path)
To check whether it is enabled on your Delta Table you can use the following command.
%%sql

SHOW TBLPROPERTIES silver.cities
CDF is enabled













2) Check available data versions
Now that we have enabled the Change Data Feed option, let's check which data versions we have with the DESCRIBE HISTORY command. In the first example you will see that CDF was enabled after table creation, in the second version (1). This means you cannot include the first version (0) in the CDF query.

You will get an error if you set the range wrong while getting CDF info:
AnalysisException: Error getting change data for range [0 , 4] as change data was not
recorded for version [0]. If you've enabled change data feed on this table,
use `DESCRIBE HISTORY` to see when it was first enabled.
Otherwise, to start recording change data, use `ALTER TABLE table_name SET TBLPROPERTIES
(delta.enableChangeDataFeed=true)`.
CDF available from version 1







In the second example it was enabled during the Delta table creation and therefore CDF is available from the first version (0).
CDF available from version 0








3) Query CDF data
When you query the CDF data you will get some extra columns:
  • _change_type: showing what action was taken to change the data - insert, update_preimage, update_postimage and delete
  • _commit_version: showing the version number of the data
  • _commit_timestamp: showing the timestamp of the data change
If you want particular version numbers when getting the data, then you can use startingVersion and endingVersion as options while reading the data. Specifying only startingVersion is also permitted.
%%pyspark

df = spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .option("endingVersion", 5) \
  .table("Silver.cities")

display(df.sort("City","_commit_version"))
Filter CDF on version numbers
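To avoid the AnalysisException shown earlier, you can check the requested range before reading. Compare it against the first version in which CDF was enabled, which you find with DESCRIBE HISTORY. The sketch below builds the reader options in plain Python. `cdf_read_options` and its validation rules are hypothetical. You would pass the result to the reader with `spark.read.format("delta").options(**options).table("Silver.cities")`.

```python
def cdf_read_options(first_cdf_version, starting_version, ending_version=None):
    """Build Delta reader options for a Change Data Feed version range."""
    # Versions before CDF was enabled have no change data recorded
    if starting_version < first_cdf_version:
        raise ValueError(
            f"CDF was enabled in version {first_cdf_version}; "
            f"startingVersion {starting_version} is too early"
        )
    if ending_version is not None and ending_version < starting_version:
        raise ValueError("endingVersion must not be lower than startingVersion")
    options = {"readChangeFeed": "true", "startingVersion": str(starting_version)}
    # endingVersion is optional, just like in the PySpark example above
    if ending_version is not None:
        options["endingVersion"] = str(ending_version)
    return options
```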













Querying date ranges is probably more useful. For that you can use startingTimestamp and endingTimestamp as options. Specifying only startingTimestamp is also permitted.
%%pyspark

df = spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2023-03-26 11:07:23.008') \
  .table("Silver.cities")

display(df.sort("City","_commit_version"))
Filter CDF on timestamps














If you want to use the _commit_timestamp column of the next record to create a new column called _end_timestamp in the current record, then you can use the lead() window function (just like LEAD() in T-SQL).
%%pyspark

from pyspark.sql.window import Window
from pyspark.sql.functions import lead 

df = spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2023-03-26 11:07:23.008') \
  .table("Silver.cities")

# Create window for lead
windowSpec  = Window.partitionBy("City").orderBy("_commit_version")

# Remove update_preimage records, add new column with Lead() and then sort
display(df.filter("_change_type != 'update_preimage'")
           .withColumn("_end_timestamp",lead("_commit_timestamp",1).over(windowSpec))
           .sort("City","_commit_version"))
Create end_timestamp with lead function
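To see what the filter and the lead() window compute, here is the same logic in plain Python on a few hand-made change rows. Per City, ordered by _commit_version, each row gets the _commit_timestamp of the next row as _end_timestamp. The last row of each City gets None (NULL in Spark). This only illustrates the semantics and is not a replacement for the PySpark code. `add_end_timestamps` is a hypothetical helper.

```python
from itertools import groupby


def add_end_timestamps(changes):
    """Plain-Python equivalent of the update_preimage filter plus lead() window."""
    # Drop the old values of updated rows, like the filter on _change_type
    rows = [row for row in changes if row["_change_type"] != "update_preimage"]
    # Same ordering as partitionBy("City").orderBy("_commit_version")
    rows.sort(key=lambda row: (row["City"], row["_commit_version"]))
    result = []
    for _, group in groupby(rows, key=lambda row: row["City"]):
        group = list(group)
        # Pair each row with the next row of the same City (None for the last one)
        for current, following in zip(group, group[1:] + [None]):
            row = dict(current)
            row["_end_timestamp"] = following["_commit_timestamp"] if following else None
            result.append(row)
    return result
```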











Conclusions
In this post you learned the basics of the Change Data Feed options in Synapse. This feature is available in Delta Lake 2.0.0 and above, but it is still in experimental support mode. For now you have to use PySpark instead of Spark SQL to query the data in Synapse.

Besides creating nice fact tables that show data changes during a certain period, this feature could also be useful to incrementally load a large fact table with only the changes from the silver layer. Creating audit trails for data changes over time could also be an interesting option. The CDF option is probably most useful when a table does not have that many changes.

In a later post, when Delta Lake 2.3.0 is available in Synapse, we will explain the Spark SQL options for CDF. Special thanks to colleagues Roelof Jonkers and Martijn Broeks for helping out.



Monday 20 March 2023

Synapse - Creating Silver Delta Tables

Case
I want to create and fill a Silver layer based on parquet files in my bronze layer. Is there a simple way to create and populate the tables automatically?
Adding files to your Silver layer












Solution
You can create a notebook for this and then call that notebook from your Synapse pipeline with some parameters (location, table name and keys). This allows you, for example, to loop through all your ingested source files from the bronze (raw/ingest) layer and call this notebook for each file to add it to the Silver layer. We can also add the silver tables directly to the Lake database for easy querying later on.

Note that this example is a technical, source-based Silver layer, so it is not really cleansed, curated or conformed.

1) Create notebook
Go to the Develop tab in Synapse and create a new Notebook. Give it a suitable name and make sure the language is PySpark. Sooner or later you will want to test this Notebook, so attach it to a Spark pool. Optionally you can add a Markdown cell to explain this notebook.
New Synapse Notebook









2) Code cell 1: parameters
The first code cell is for the parameters that can be overridden by parameters from the Notebook activity in the pipeline. Toggle the parameters option to make it a parameter cell. For more details, see our post about notebook parameters. For debugging within the notebook we used real values.

For this example everything (bronze and silver) is in the same container, so you might want to add more parameters to split those up. This example uses parquet files as a source. If you want to use, for example, CSV files, then you need to change the format in the main code that fills the Spark Data Frame.
# path of the data lake container (bronze and silver for this example)
data_lake_container = 'abfss://mysource@datalakesvb.dfs.core.windows.net'
# The ingestion folder where your parquet files are located
bronze_folder = 'Bronze'
# The silver folder where your Delta Tables will be stored
silver_folder = 'Silver'
# The name of the table
table_name = 'SalesOrderHeader'
# The wildcard filter used within the bronze folder to find files
source_wildcard = 'SalesOrderHeader*.parquet'
# A comma separated string of one or more key columns (for the merge)
key_columns_str = 'SalesOrderID'
Parameters








3) Code cell 2: import modules and functions
The second code cell is for importing all required/useful modules. For this basic example we have only one import:
# Import modules
from delta.tables import DeltaTable
Import Delta Table module








3) Code cell 3: filling delta lake
Now the actual code for filling the Delta Lake tables with parquet files from the data lake. Note: the code is very basic. It checks whether the Delta Lake table already exists. If not, it creates the Delta Lake table; if it does, it merges the new data into the existing table. If you have transactional data, you could also do an append instead of a merge.

# Convert comma separated string with keys to a list (strip spaces around each key)
key_columns = [key.strip() for key in key_columns_str.split(',')]

# Convert array with keys to where-clause for merge statement
conditions_list = [f"existing.{key}=updates.{key}" for key in key_columns]

# Determine path of source files from ingest layer
source_path = data_lake_container + '/' + bronze_folder + '/' + source_wildcard 

# Determine path of Delta Lake Table 
delta_table_path = data_lake_container + '/' + silver_folder + '/' + table_name

# Read file(s) in spark data frame
sdf = spark.read.format('parquet').option("recursiveFileLookup", "true").load(source_path)

# Check if the Delta Table exists
if (DeltaTable.isDeltaTable(spark, delta_table_path)):
    print('Existing delta table')
    # Read the existing Delta Table
    delta_table = DeltaTable.forPath(spark, delta_table_path)

    # Merge new data into existing table
    delta_table.alias("existing").merge(
        source = sdf.alias("updates"),
        condition = " AND ".join(conditions_list)
        
    ).whenMatchedUpdateAll(
    ).whenNotMatchedInsertAll(
    ).execute()

    # For transactions you could do an append instead of a merge
    # sdf.write.format('delta').mode('append').save(delta_table_path)

else:
    print('New delta table')
    # Create new delta table with new data
    sdf.write.format('delta').save(delta_table_path)
Adding data to new or existing Delta Table



















4) Code cell 4: Adding Delta Table to Lake Database
The last step is optional, but very useful: adding the Delta Table to the Lake Database. This allows you to query the Delta Table by its name instead of its path in the Data Lake. Make sure you first add a Silver layer to that Lake database. See this post for more details (step 1).
# Adding the Delta Table to the Lake Database for easy querying in other notebooks or scripts within Synapse.
spark.sql(f'CREATE TABLE IF NOT EXISTS Silver.{table_name} USING DELTA LOCATION \'{delta_table_path}\'')

# Spark SQL version
#  CREATE TABLE Silver.MyTable
#  USING DELTA
#  LOCATION 'abfss://yourcontainer@yourdatalake.dfs.core.windows.net/Silver/MyTable'
Adding Delta Table to Lake Database








5) Creating Pipeline
Now it is time to loop through your ingested files and call this new Notebook for each file to create the Silver Layer Delta Tables. You have to provide values for all parameters in the notebook. Since you need the key column(s) of each table to do the merge you probably need to store these somewhere.

For the ingestion we often store, in a metadata table, the table/file names from each source that we want to download to the data lake. In this table we also store the key column(s) of each table.
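Such a metadata row can be turned into the Base parameters of the Notebook activity. In the pipeline you would do this with dynamic content on the ForEach item, for example @item().table_name. The plain-Python sketch below only shows the mapping. Three details are assumptions: the metadata column names `table_name` and `key_columns`, and the `<table>*.parquet` naming convention. Adjust them to your own metadata table. `notebook_parameters` is a hypothetical helper.

```python
def notebook_parameters(metadata_row, data_lake_container,
                        bronze_folder="Bronze", silver_folder="Silver"):
    """Map one (hypothetical) metadata row to the notebook's parameters."""
    table_name = metadata_row["table_name"]
    key_columns = metadata_row["key_columns"]
    # The notebook expects the keys as one comma separated string
    if isinstance(key_columns, (list, tuple)):
        key_columns = ",".join(key_columns)
    return {
        "data_lake_container": data_lake_container,
        "bronze_folder": bronze_folder,
        "silver_folder": silver_folder,
        "table_name": table_name,
        "source_wildcard": f"{table_name}*.parquet",
        "key_columns_str": key_columns,
    }
```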

Call Notebook in ForEach loop













Synapse doesn't retrieve the parameters from the Notebook. You have to add them manually as Base parameters in the Settings tab.
Calling Notebook
















If you enter a column or set of columns for the key that is not unique, you will get the error below the second time you run the pipeline. The first run only creates the table, so no merge takes place.
Cannot perform Merge as multiple source rows matched and attempted to modify the same target row in the Delta table in possibly conflicting ways. By SQL semantics of Merge, when multiple source rows match on the same target row, the result may be ambiguous as it is unclear which source row should be used to update or delete the matching target row. You can preprocess the source table to eliminate the possibility of multiple matches.
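You can detect this before the merge by looking for key values that occur more than once in the source data. In PySpark that check is `sdf.groupBy(*key_columns).count().filter("count > 1")`. The plain-Python sketch below shows the same idea on a list of rows, using a hypothetical helper, `duplicate_keys`. If duplicates are expected, deduplicate the source first, for example with `sdf.dropDuplicates(key_columns)`. Keep in mind that this keeps an arbitrary row per key.

```python
from collections import Counter


def duplicate_keys(rows, key_columns):
    """Return the key values that occur more than once, sorted."""
    # Count every combination of key column values
    counts = Counter(tuple(row[key] for key in key_columns) for row in rows)
    return sorted(key for key, count in counts.items() if count > 1)
```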

6) Result
Now you can run your pipeline and check whether the silver layer of your Lake database is populated with new tables. You can also create a new notebook with Spark SQL or PySpark to check the contents of the tables and see whether Time Travel works.
Running the pipeline that calls the new Notebook












Delta Lake folders in the Data Lake





















Conclusions
In this post you learned how to create and populate a (source based) silver layer of your Lake House Delta Tables. An easy quick start for your lake house. If you have multiple sources with similar data then you should also consider creating a real cleansed, curated and conformed silver layer manually. In a later post we will show you some of those manual steps in Spark SQL or PySpark.

Special thanks to colleague Heleen Eisen for helping out with the PySpark.





Sunday 19 March 2023

Synapse - Add existing Delta Table to Lake Database

Case
How can I query my lake house files and tables in Synapse without specifying the entire data lake path for each file or table?
Querying your Lake House in Synapse













Solution
In the Data tab (left menu) of the Synapse Workspace you can create a Lake database and then add your files and tables to it. By default there is already a Lake Database present called 'default', but it won't be visible until you add tables to it or add other databases.

1) Create database
Let's create a new database for each layer: bronze, silver and gold. For the files we could create a separate database called Ingest for easy querying. The documentation mentions either a CREATE DATABASE or a CREATE SCHEMA command; they are the same thing.
  • Create a new notebook with either Spark SQL or PySpark as language.
  • Attach it to a spark pool
  • In the first Code block add the code below and execute it. The IF NOT EXISTS and the COMMENT are optional.
%%sql
CREATE DATABASE IF NOT EXISTS Ingest COMMENT 'Raw files';
CREATE DATABASE IF NOT EXISTS Bronze COMMENT 'Raw Layer';
CREATE DATABASE IF NOT EXISTS Silver COMMENT 'Validated Layer';
CREATE DATABASE IF NOT EXISTS Gold COMMENT 'Enriched Layer';
%%pyspark
spark.sql("CREATE DATABASE IF NOT EXISTS Ingest COMMENT 'Raw files'")
spark.sql("CREATE DATABASE IF NOT EXISTS Bronze COMMENT 'Raw Layer'")
spark.sql("CREATE DATABASE IF NOT EXISTS Silver COMMENT 'Validated Layer'")
spark.sql("CREATE DATABASE IF NOT EXISTS Gold COMMENT 'Enriched Layer'")
SPARK SQL Code block









Now go to the Data tab and you will see a list of Lake databases: default plus the four databases you created.
Synapse Lake databases









2) Create table on Parquet file
For the raw files we will create a table based on the parquet (or flat) files that your ingestion copied from the source into the data lake. If the file name contains a timestamp, you could even use a wildcard in the path.
  • Go back to your notebook and create a second Code block
  • Add the code below and execute it. It will create a table in the Ingest Lake database
  • Then go to the Data tab and unfold the Ingest database and then its tables (if you are too fast, you might have to refresh the tables list).
%%sql
CREATE TABLE IF NOT EXISTS Ingest.Cities
USING PARQUET
Location 'abfss://mysource@mydatalake.dfs.core.windows.net/Ingestfolder/Cities*.parquet'
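%%pyspark
table_name = 'Cities'
parquet_path = 'abfss://mysource@mydatalake.dfs.core.windows.net/Ingestfolder/Cities*.parquet'
spark.sql(f'CREATE TABLE IF NOT EXISTS Ingest.{table_name} USING PARQUET LOCATION \'{parquet_path}\'')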

Lake table based on a Parquet file from the data lake









3) Create table on Delta Table
For the Bronze (or Silver or Gold) layer we will create a table based on an existing Delta Table from the data lake.
  • Go back to your notebook and create a third Code block
  • Add the code below and execute it. It will create a table in the Bronze Lake database
  • Then go to the Data tab and unfold the Bronze (or Silver or Gold) database and then its tables (if you are too fast, you might have to refresh the tables list).
%%sql
CREATE TABLE IF NOT EXISTS Bronze.Cities
USING DELTA
Location 'abfss://mysource@mydatalake.dfs.core.windows.net/Bronze/Cities'
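%%pyspark
table_name = 'Cities'
delta_table_path = 'abfss://mysource@mydatalake.dfs.core.windows.net/Bronze/Cities'
spark.sql(f'CREATE TABLE IF NOT EXISTS Bronze.{table_name} USING DELTA LOCATION \'{delta_table_path}\'')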

Lake table based on a Delta Table from the data lake
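If you have many tables, you don't have to write each statement by hand. You can generate the statements and pass each one to spark.sql. The sketch below assumes every Delta Table is stored in its own folder, named after the table, directly under one base path. `create_table_statements` is a hypothetical helper.

```python
def create_table_statements(database, tables, base_path, using="DELTA"):
    """Generate CREATE TABLE statements for tables stored under one base path."""
    base = base_path.rstrip("/")
    return [
        # One Lake database table per folder, e.g. <base_path>/Cities
        f"CREATE TABLE IF NOT EXISTS {database}.{table} "
        f"USING {using} LOCATION '{base}/{table}'"
        for table in tables
    ]
```

In a notebook you would then run something like `for statement in create_table_statements("Bronze", ["Cities"], base_path): spark.sql(statement)`.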










4) Query the new tables
Now you can query the files and Delta Tables like regular database tables, either in a notebook with a Spark pool running or in a SQL script. There is one difference between the two: the SQL script needs the default dbo schema between the database name and the table name. This is mandatory in a SQL script, but not allowed in a notebook.
Query in notebook with Spark SQL





















Query in a SQL Script











5) Create views on new tables
You can also create views on those new tables that already contain some business logic, which is useful for colleagues working with this data. You could even create views for dimensions and facts on your silver tables and then serve them to Power BI.
Create view on new tables




















Conclusion
In this post you learned how to make your data lake files and Delta Lake tables easier to query. It takes a few extra steps for each table, but after that querying is much easier. In a future post we will show you how to do this directly during the INGEST and DELTA steps, so that you don't have any manual steps for each new file or table.

In the next post we will show you how to use time travel on those Delta Tables with Spark SQL. This post is the starting point for that post.

Special thanks to colleague Martijn Broeks for helping out.

Saturday 27 November 2021

Synapse pipeline pass parameter to notebook

Case
I have a Synapse workspace notebook that I call from a Synapse pipeline, but I want to make it more flexible by adding parameters. How do you add parameters to a notebook and fill them via a pipeline?
Adding Parameters to your Synapse Notebook
























Solution
You can add variables to a special Code cell in the notebook and then use those as parameters within the Notebook activity. At the moment there is no real GUI that retrieves the parameters from the Notebook, so you have to copy the names from the notebook to the Notebook activity in the pipeline.

1) Add Code cell for parameters
We need to add a Code cell and change it into a parameter cell. Note that you can have only one parameter cell in your notebook. You want to add it somewhere at the top so that you can use its variables/parameters in the cells below it.
  • Go to your notebook and add a new Code cell
  • Move it up. It should probably be your top code cell allowing you to use it in the cells below.
  • Click in the cell and then on the ellipsis button of that cell (button up right with three dots)
  • Choose Toggle parameter cell and you will see the word Parameters appear in the bottom right corner
Toggle parameter cell










2) Add variables to parameters cell
Next we need to add some code to the parameter cell. Here you just need to add some variables. Each variable can then be overridden by the pipeline and used in the cells below. For debugging it is useful to give the variables a value. For this example we used Python code.
Adding variables
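The variables in the parameter cell are ordinary Python assignments. The example below uses hypothetical variable names. The first part is the parameter cell, and the `print` line stands for a second cell that uses the values.

```python
# Parameter cell (marked with "Toggle parameter cell").
# Hypothetical parameters; these defaults are only used for debugging.
table_name = "SalesOrderHeader"
row_limit = 100
full_load = False

# Second cell: show the values that are actually used in this run.
print(f"table_name={table_name}, row_limit={row_limit}, full_load={full_load}")
```

When the pipeline runs the notebook, Synapse adds an extra cell directly below the parameter cell. That cell assigns the values from the Notebook activity (for example `table_name = "Customer"`), so the cells below see the pipeline values instead of the defaults. Give each parameter in the activity a matching type (string, int, float or bool).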








3) Adjust Synapse Notebook activity
The last step is to edit the Synapse Notebook activity and add the parameters. For each variable you added to the parameters cell, you can add a parameter in the Notebook activity. At the moment there is no smart interface that lets you select a parameter and set its value, so you have to set the name and data type manually.
Adding parameters














4) Testing
Now run the pipeline to see the result. For this example we added a second Code cell with a print function to show that the default values have changed. Trigger the pipeline and go to Monitor. Then click on your pipeline and, within that pipeline, on the Notebook activity. If you click on the pencil icon, the notebook will open and show you the result.
Click on pencil to open the Notebook













Note the extra cell and the result of the third cell















Conclusion
In this short post you learned how to add parameters to your notebook and fill them via the pipeline. As an additional bonus, you saw how to check the result of the changes. A next step could be, for example, to add the Notebook to a ForEach loop that ingests data into the data lake and then executes the notebook to create a Delta Lake table for each item in the ForEach loop.