
Sunday 19 March 2023

Synapse - Using Spark SQL to time travel Delta Tables

Case
In a previous blog post you showed how to create and query Delta Tables with PySpark for a Lake House. However, most Data Warehouse people are more familiar with the SQL language. How can you query a Delta Table with the good old SQL language?
Using Spark SQL to time travel Delta Tables

Solution
In that previous blog post we showed that you can query the Delta Tables in, for example, a Serverless SQL pool by creating External Tables on those Delta Tables. This allows you to use T-SQL to query Delta Tables, but it doesn't allow you to use time travel: you always get the latest version of the data.
External Tables on Delta in Serverless SQL Pool

However, we can use Synapse Notebooks with Spark SQL, a language which is very similar to T-SQL, to query Delta Tables. This allows you to time travel through the data in a familiar language.

1) Add Delta Table to Lake Database
To easily query Delta Tables you first need to make the Delta Tables visible in Synapse by adding them to the Lake Database. We explained this in the previous blog post.
Adding Delta Table to Lake Database

Once the Delta Table is available in the Lake Database you can query it like a regular table. By default you will see the latest version of the data.
%%sql
SELECT * FROM silver.cities
The alternative is to use the entire path:
%%sql
SELECT * FROM delta.`abfss://mysource@mydatalake.dfs.core.windows.net/silver/cities`
2) Show historical versions
You can check which historical versions are available with the DESCRIBE HISTORY command.
%%sql
DESCRIBE HISTORY silver.cities
Show versions of the Delta Table

Besides showing the history, you can also check where the Delta Table is stored in your Data Lake with the DESCRIBE EXTENDED command. It returns various details, like the location of the Delta Table.
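A minimal sketch, reusing the silver.cities table from above:
%%sql
DESCRIBE EXTENDED silver.cities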
See details of Delta Table

3) Show specific version by version number
With the DESCRIBE HISTORY command you get a table with the various versions of your table. The first column shows the version number, which starts with 0 for the initial version of the table.

When you query a Delta Table you can add VERSION AS OF X behind the query, where you replace the X with the version number. In this example we take version 2 (the third version of the table).
%%sql
SELECT * FROM silver.cities VERSION AS OF 2
Showing version 2 of the Delta Table

4) Show specific version by date
Time traveling with a specific version number is cumbersome because you first need to determine the version you need. Luckily you can also get the version that was active on a specific date by adding TIMESTAMP AS OF "2022-01-01" behind the query.
%%sql
SELECT * FROM silver.cities TIMESTAMP AS OF "2022-01-01"
Showing version of a specific date

Conclusions
In this post you learned how to time travel a Delta Table with Spark SQL. It offers the same options as PySpark, but for some people it is just a little bit more readable. In a next post we will discuss the Change Data Feed to get the data changes between versions.

Synapse - Add existing Delta Table to Lake Database

Case
How can I query my lake house files and tables in Synapse without specifying the entire data lake path for each file or table?
Querying your Lake House in Synapse

Solution
In the Data tab (left menu) of the Synapse Workspace you can create a Lake database and then add your files and tables to it. By default there is already a Lake Database present called 'default', but it won't be visible until you add tables to it or add other databases.

1) Create database
Let's create a new database. We will create one for Bronze, Silver and Gold. For the files we could create a separate database called Ingest for easy querying. The documentation mentions either a CREATE DATABASE or a CREATE SCHEMA command; they are the same thing.
  • Create a new notebook with either SPARK SQL or PySpark as language.
  • Attach it to a spark pool
  • In the first Code block add the code below and execute it. The IF NOT EXISTS and the COMMENT are optional.
%%sql
CREATE DATABASE IF NOT EXISTS Ingest COMMENT 'Raw files';
CREATE DATABASE IF NOT EXISTS Bronze COMMENT 'Raw Layer';
CREATE DATABASE IF NOT EXISTS Silver COMMENT 'Validated Layer';
CREATE DATABASE IF NOT EXISTS Gold COMMENT 'Enriched Layer';
%%pyspark
spark.sql("CREATE DATABASE IF NOT EXISTS Ingest COMMENT 'Raw files'")
spark.sql("CREATE DATABASE IF NOT EXISTS Bronze COMMENT 'Raw Layer'")
spark.sql("CREATE DATABASE IF NOT EXISTS Silver COMMENT 'Validated Layer'")
spark.sql("CREATE DATABASE IF NOT EXISTS Gold COMMENT 'Enriched Layer'")
SPARK SQL Code block

Now go to the Data tab and you will see a Lake database list with the databases you just created (plus the default one).
Synapse Lake databases

2) Create table on Parquet file
For the raw files we will create a table based on the Parquet (or flat) files from your ingestion from the source into the data lake. If the file has a timestamp in the name you could even use a wildcard in the path.
  • Go back to your notebook and create a second Code block
  • Add the code below and execute it. It will create a table in the Ingest Lake database
  • Then go to the Data tab, unfold the Ingest database and then its tables (if you are too fast you might have to refresh the tables list).
%%sql
CREATE TABLE IF NOT EXISTS Ingest.Cities
USING PARQUET
LOCATION 'abfss://mysource@mydatalake.dfs.core.windows.net/Ingestfolder/Cities*.parquet'
%%pyspark
spark.sql(f'CREATE TABLE IF NOT EXISTS Ingest.{table_name} USING PARQUET LOCATION \'{parquet_path}\'')

Lake table based on a Parquet file from the data lake

3) Create table on Delta Table
For the Bronze (or Silver or Gold) layer we will create a table based on an existing Delta Table from the data lake.
  • Go back to your notebook and create a third Code block
  • Add the code below and execute it. It will create a table in the Bronze Lake database
  • Then go to the Data tab, unfold the Bronze (or Silver or Gold) database and then its tables (if you are too fast you might have to refresh the tables list).
%%sql
CREATE TABLE IF NOT EXISTS Bronze.Cities
USING DELTA
LOCATION 'abfss://mysource@mydatalake.dfs.core.windows.net/Bronze/Cities'
%%pyspark
spark.sql(f'CREATE TABLE IF NOT EXISTS Bronze.{table_name} USING DELTA LOCATION \'{delta_table_path}\'')

Lake table based on a Delta Table from the data lake

4) Query the new tables
Now you can query the files and Delta Tables like a regular database table, in either a notebook with a Spark pool running or a SQL script. There is one difference between those two: in the SQL script you get the default dbo schema between the database name and the table name. This is mandatory in the SQL script, but not allowed in a notebook, as shown in the sketch below.
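As a minimal sketch (assuming the Bronze.Cities table from step 3), the query looks like this in a notebook:
%%sql
SELECT * FROM Bronze.Cities
And like this in a SQL script, where the dbo schema is mandatory:
SELECT * FROM Bronze.dbo.Cities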
Query in notebook with SPARK SQL

Query in a SQL Script

5) Create views on new tables
You can also create views on those new tables in which you already add some business logic that is useful for other colleagues working with this data, or you can create views for dimensions and facts. You could even create views for dimensions and facts on your Silver tables and then serve them to Power BI.
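A minimal sketch of such a view (the view and column names below are just placeholders for your own tables and business logic):
%%sql
-- hypothetical example: column names depend on your own Silver table
CREATE OR REPLACE VIEW Silver.vw_DimCity AS
SELECT  CityCode    AS CityKey
,       CityName
,       CountryName
FROM    Silver.Cities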
Create view on new tables

Conclusion
In this post you learned how to make your data lake files and Delta Lake tables easier to query. It takes a few extra steps for each table, but after that querying is much easier. In a future post we will show you how to do this directly during the INGEST and DELTA steps. Then you don't have any manual steps for each new file or table.

In the next post we will show you how to use time travel on those Delta Tables with Spark SQL. This post is the starting point for that post.

Special thanks to colleague Martijn Broeks for helping out.

Sunday 5 March 2023

Synapse snack - Get child pipeline value to parent

Case
We can pass values from the parent pipeline to the child pipeline via parameters, but how do we get return values from the child pipeline to the parent pipeline?
Pipeline return value

Solution
In the past we used a Webhook activity to call a child pipeline via the REST API and then a Web activity in the child pipeline to return a value via the callBackUri. This workaround was way too complex and you ended up with two different executions that are not related to each other.

Last month Microsoft introduced, for both the Synapse Workspace and Azure Data Factory, the new Pipeline return value option in the Set Variable activity. This allows you to return one or more HARDCODED values from the child pipeline to the parent pipeline.

1) Child pipeline - Set Variable
First we need to create a child pipeline that we will be calling from a parent pipeline in the next step. The only required activity is the Set Variable activity.
  • So first create a new pipeline. We called it PL_Child
  • Add a Set Variable activity to the canvas of your child pipeline. Ours is called Return Value
  • In the Settings tab of the activity set the Variable type to Pipeline return value (preview)
  • Now you can create a new string variable with a hardcoded return value. For expressions you need to change the type to, for example, Expression. Note that compared to pipeline variables you have way more types to choose from.
Return value

Note that this is not an existing pipeline variable, so other activities cannot change its value. Also note that the Value field cannot be overruled with an expression, so only hardcoded values are possible.

2) Parent pipeline - Execute Pipeline
Now we need the parent pipeline that will be calling the child pipeline to get the return value.
  • Create a new pipeline. Ours is called PL_Parent
  • Add an Execute Pipeline activity to the canvas of your new pipeline
  • Set it to execute the child pipeline of step 1
  • Make sure Wait on completion is checked!
Execute Pipeline to retrieve return value

Note that if you execute this parent pipeline, you won't see the return value in the Output window. This means we need another activity to read that return value.
Output with no visible return value

3) Get return value
To get the return value you can use an expression that reads pipelineReturnValue from the output of your Execute Pipeline activity, followed by the name of your return value: @activity('EPL_GetAnswer').output.pipelineReturnValue.MyAnswer

In this example we will store the return value in a pipeline variable. Note that if you used anything other than String, Boolean or Array, you need to add a type conversion in the expression:
@string(activity('EPL_GetAnswer').output.pipelineReturnValue.MyAnswer)
Read return value from output

Output showing return value

Note that you cannot have multiple Set Variable activities running that all return values (even if they use different names); only one of them will be returned, causing an error in your expression. You can have multiple Set Variable activities returning values if you put them in, for example, an if construction so that only one of them runs.

Conclusions
In this little snack you learned about the new preview(!) feature in ADF and Synapse to return values from the child to the parent pipeline. It is a very nice feature that we have been waiting for a long time, but it could use some small improvements, like the expressions (why do we need a separate Expression type?) and an option to see the output of the Execute Pipeline activity.






Thursday 23 February 2023

Streaming data Azure & Power BI - Streaming dataset

Case
I want to send streaming data to Power BI for real-time reporting purposes. This post is part of a series on Streaming Data in Azure and Power BI. It focuses on showing the live data in Power BI.
Power BI Streaming dataset

Solution
In the previous Stream Analytics post we pushed the data from the Event Hub to a new Streaming dataset in Power BI. In this post we will create a Power BI report and dashboard to see the streaming data actually moving on our screen.

1) Check dataset
First make sure your Stream Analytics job is running and that data is sent to the Event Hub. Only then will a new Power BI dataset be created. Note that the icon for a streaming dataset differs from a normal dataset. You can edit the dataset to see which columns are available, but don't change the columns or settings because Stream Analytics is managing it.
New Streaming dataset in Power BI

2) Create report
Now create a new Power BI report on the newly created dataset, publish it and then view it in Power BI online.
Creating and publishing our beautiful report

The workspace with dataset and report

The live report

Now, while viewing the report, make sure data is still streaming into the Power BI dataset. You will probably notice that nothing is changing or moving in your report. Once you hit the refresh visual button you will see the new data. However, continuously hitting the refresh button is probably not an option for you.

3) Pin report visual to Dashboard
Hover your mouse over your report visual and look for the little push pin button. Click on it to create a new dashboard. Repeat this for all the visuals you want to see in live mode.
Pin Report Visual to Dashboard

The workspace with the new dashboard

Now open the newly created dashboard and watch the data streaming live into your visuals. You can adjust the size of the visuals to see more details. We left the bottom right empty to show a PowerShell ISE window pushing data to the Event Hub. Even with the lowest and slowest settings (basic Event Hub, 1 streaming unit in Stream Analytics and a maximum of 1 message a second) it takes just a few seconds for the data to appear in Power BI.
Streaming live data into Power BI

Conclusions
In this post we showed you the end of the hot path by creating a very basic report and pinning its visuals to a dashboard to see the data streaming live into your dashboard. You can enrich the dashboard with visuals from your cold path reports to also compare the live data to, for example, daily averages. The storage part of the cold path will be explained in a next post of this series.

Tip: during testing you can empty the streaming dataset by temporarily switching off Historic data analysis in the edit screen of the dataset. You won't see new data until you switch it on again.


Sunday 19 February 2023

Streaming data Azure & Power BI - Stream Analytics

Case
I want to send streaming data to Power BI for real-time reporting purposes. This post is part of a series on Streaming Data in Azure and Power BI and focuses on getting the streaming data from the Event Hub and sending it to a Power BI streaming dataset with Azure Stream Analytics.
Azure Stream Analytics

Solution
In the previous post we sent messages to the Event Hub and in this post we will read those messages and send them to Power BI. For this we need a service that can handle streaming data. Azure Stream Analytics is the query tool for streaming data in Azure. It uses the regular SQL language (so it is easy to learn). However, it has some extras, like the windowing option for the GROUP BY, because you cannot aggregate the entire stream (it never ends), but you can aggregate within a certain time window.

Let's first explain the three most important items of a Stream Analytics job.

Streaming inputs and reference inputs
There are three types of input sources in the query for streaming data: Azure Event Hubs, Azure IoT Hub and Azure Data Lake. But there is also a reference input, where you can use an Azure SQL Database table or a blob storage file to enrich or translate your streaming data. Streaming messages are often kept small, by using IDs or codes instead of long strings, to speed up everything. With the reference data you can translate them for reporting purposes, as sketched below.
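A minimal sketch of such a translation, assuming a reference input named refCountries that maps a CountryCode from the stream to a readable CountryName (these names are just examples, not part of the original setup):
SELECT      s.CallId
,           s.DurationInSeconds
,           r.CountryName       -- looked up from the reference data
FROM        InputStream s TIMESTAMP BY CreatedAt
JOIN        refCountries r      -- hypothetical reference input
ON          s.CountryCode = r.CountryCode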

Streaming Outputs
There are several streaming outputs, like Storage Accounts, a SQL database or even a Cosmos DB, but the one we are using for this example is the Power BI output. The Power BI output has a limitation that it can be called roughly once every second and the messages can't be larger than 15 KB. This means that you will need a windowing function to aggregate the data, to make sure you don't flood Power BI.

Query
The query language in Stream Analytics is a subset of T-SQL, so it is very similar and easy to learn if you already know how to query a Microsoft SQL Server database. The most common query pattern is a SELECT INTO with a GROUP BY.
SELECT      SomeColumn, Count(*) as Count
INTO        OutputStream
FROM        InputStream TIMESTAMP BY CreatedAt
GROUP BY    SomeColumn, TumblingWindow(second, 30)

Let's create a new Azure Stream Analytics Job and configure a query to push data to Power BI.


1) Create Stream Analytics Job
Creating the new SA Job is very straightforward. For this example the region and the number of streaming units are the important parts.
  • Go to the Azure portal and create a new Stream Analytics job
  • Select the correct subscription and resource group
  • Come up with a good descriptive name for your job. Keep in mind that there is only one query window, but you can run multiple queries within it.
  • Choose the correct region. The same region as your Event Hub and your Power BI tenant is best for performance.
  • Hosting environment is Cloud (unless you have an on-premises Edge environment).
  • Last thing is the number of Streaming Units. This is where you start paying. The default is 3 SUs, but for testing purposes or small jobs 1 SU is more than enough.
  • Under storage you can set up a secure storage account. For this example you can leave that empty.
  • Optionally add some Tags and then review and create the new SA job.
Create Streaming Analytics Job

2) Create Input stream
Now that we have a SA job we first need to create an input to connect to our Event Hub.
  • Go to your SA job in the Azure portal and click on Inputs in the left menu under Job topology
  • In the upper left corner click on + Add stream input and choose Event Hub
  • A new pane appears on the right side. First enter a descriptive name for your Event Hub.
  • Now select the correct Subscription and Event Hub Namespace.
  • Select the existing Event Hub name that we created in the previous blog post.
  • Select the existing Event Hub consumer group, which we left default in the previous blog post.
  • For the Authentication mode the easiest way is to select Create system assigned managed identity. This means this specific SA job will get access to the selected Event Hub.
  • Partition key is for optimizing performance if your input is indeed partitioned. You can leave it empty for this example.
  • We used a JSON structure for our test messages. Therefore select JSON as Event serialization format.
  • Select UTF-8 as encoding (the only option at this moment)
  • Leave the Event compression type to None for this example

Create new Input

After saving you will see a couple of notifications to create and test your new input. This takes about a minute to complete.
Notifications for new Input

3) Create Output stream
After the input it's now time to add the output to Power BI. For this you need a Power BI workspace where you have admin rights. Once the job is running for the first time and new events are streaming in, Stream Analytics will create a Streaming Dataset in your workspace.
  • Go to your SA job in the Azure portal and click on Outputs in the left menu under Job topology
  • In the upper left corner click on + Add and choose Power BI
  • A new pane appears on the right side. First enter a descriptive name for your Power BI output.
  • Next select the Power BI workspace where your streaming dataset will appear.
  • For Authentication mode choose Managed Identity-System assigned. This specific job will then be added as a Contributor.
  • At last enter a Dataset name and a Table name. Note that you can only have one table in a streaming dataset, so don't create a second output to the same Power BI streaming dataset with a different table (we tried).
Create new Output

After saving you will see a couple of notifications to create and test your new output. This takes about a minute to complete.
Notifications for new Output

In Power BI you will see your new Stream Analytics job as a Contributor in your workspace. However, the streaming dataset will only appear once the job is running and pushing data.
Stream Analytics Job as Contributor

4) Create query
With the new Input and Output we will now create a very basic query to push the test data from Azure Event Hub via Azure Stream Analytics to Power BI.
  • Go to your SA job in the Azure portal and click on Query in the left menu under Job topology
  • A basic but working query will already be created for you if you have an input and an output.
  • Once opened, wait a few moments for the Azure Event Hub to refresh. You will see a spinning circle icon behind your input. If there is data in your Event Hub it will appear after a few moments.
Default query and waiting for input data

Default query with data from input

  • Now you can adjust your query by only selecting the columns you need in Power BI (less is more). The windowing function is not required if the number of messages doesn't exceed the limits of Power BI.
  • After editing the query hit the Test query button and check the Test results.
Test result of basic query without time window

Test result of basic query with TumblingWindow

  • Once you are satisfied with the query result hit the Save Query button above the editor
Save query once you're ready

The query for testing (without windowing).
SELECT
    CallId
,   DurationInSeconds
,   EventEnqueuedUtcTime as CallTimeStampStr
INTO
    [pbbitools]
FROM
    [ehbitools]
If you send more requests to Power BI than it can handle, Stream Analytics will try batching multiple messages into one request. For small messages that occasionally exceed the maximum number of messages this could be a 'workaround'. However, batching multiple messages could also cause you to exceed another limit: the maximum message size. A better solution is to use a windowing function to slow down the stream, as sketched below.
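A minimal sketch of the same query with a TumblingWindow, aggregating the calls per 30 seconds so the output stays well within the Power BI limits (the output column names are just examples):
SELECT
    COUNT(*)                AS CallCount            -- example name
,   AVG(DurationInSeconds)  AS AvgDurationInSeconds -- example name
,   System.Timestamp()      AS WindowEndUtc         -- end of the 30 second window
INTO
    [pbbitools]
FROM
    [ehbitools]
GROUP BY
    TumblingWindow(second, 30)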

5) Start SA job
Now go back to the overview page of your Stream Analytics job and hit the Start button to start your SA job. The first time you can choose between Now or Custom as a start time to receive new messages. The second time you can also choose When last stopped. For this example we choose Now. It will take a few moments to change the status from Created to Starting to Running.
SA Job is running

Once it is started AND new data is sent to the Event Hub, the streaming dataset will appear in your Power BI workspace. Notice that the red icon is different compared to a regular dataset.
A new dataset appeared for the streaming data

Conclusion
In this post you learned how to create a (very basic) Stream Analytics job. In a follow-up post we will explain the Window Functions in more detail. For now you can read our old post about Window Functions, but compared to 6.5 years ago we now have 2 new windowing functions. However, you will probably end up using the old TumblingWindow. The next post in this series will show the live data in Power BI with automatically changing visuals when new data is collected.




Monday 13 February 2023

Show dataset parameters in Azure Synapse Dataflow

Case
I have a parameterized dataset. The parameter is showing in the pipeline's Copy Data Activity, but it's not showing in the Synapse (or ADF) dataflow under source or sink. When going to the Data preview it shows me an error message: No value provided for Parameter 'MyFileName'.
No value provided for Parameter 'abc'

Solution
You added a parameter to your dataset so that you can, for example, use it in a ForEach construction in your pipeline.
Dataset with parameter

In your pipeline's Copy Data Activity you can see the dataset parameter as soon as you select your dataset with the parameter.
Parameter showing in Copy Data Activity

However, when you select that same dataset as a source (or sink) you won't see the parameter appearing in the editor. On this screen there is no way to provide the parameter, and when you debug the source you will get an error stating that you need to provide a value for that parameter: No value provided for Parameter 'MyFileName'.
Dataset selected, but no parameter

The first option is to provide a default value for your dataset parameter in the dataset itself. Now you won't get that error. This is perhaps suitable in some cases, but in most cases it isn't.
Default value for parameter in dataset

Now go to your pipeline and add a Dataflow Activity for your Dataflow. You will see the parameter appear in the Settings tab (not in the Parameters tab, which is for Dataflow parameters only). This is handy when you want to debug the pipeline and the Dataflow at the same time, but not when you just want to debug your Dataflow to see the data preview.

Parameter is back again

Now go to your Dataflow and click on Debug Settings. Within the Debug Settings go to the Parameters tab. Find your source under Dataset parameters and provide a value for debugging.
Provide Value for parameter in Dataflow Debug Settings

Go to your source (or sink) in the Dataflow and then go to Data preview to see the actual data.
There is data in our preview

Conclusion
In this post you learned how to debug your Dataflow when using a parameterized Dataset. Compared to other parts of Synapse (or ADF) it would probably make more sense to first create a Dataflow Parameter (with a default value for debugging) and then show the Dataset Parameter in the Source (or Sink) settings page, where you override it with the Dataflow Parameter.