Tuesday, 13 September 2022

Sending test messages to Azure Event Hubs

Case
I want to send dummy messages to Azure Event Hubs to test the streaming process before the actual service starts sending real messages.  Is there an easy way to send a bulk load of test messages to Azure Event Hubs?
Sending messages to Azure Event Hubs











Solution
Yes we could use for example a little bit of PowerShell code to send dummy messages to Azure Event Hubs via a Rest API. To do that we first need to collect some names and a key from Azure Event Hubs.

1)  Namespace and Event Hub name
Go to your Event Hubs Namespace in the Azure Portal and click on Event Hubs on the left side. In the top left corner you will find the Event Hubs Namespace (1) and in the list in the center of the page you will find the name of your Event Hub (2). Copy these names to your Powershell editor.
Namespace and Event Hub name


















2) Shared access policies Name and Key
Now click on your Event Hub in the list above and then click on Shared access policies in the left menu. If there is no policy then create one (send is enough for testing). Then click on the policy to reveal the keys. Copy the Name (3) and one of the keys (4) to your Powershell editor.
Shared access policies name and key








3) The script
Now you have all the things you need from your event hub. Time to do some PowerShell coding. The top part of the script is to create a Shared Access Signature token (SAS token). This token is needed for authorization in the Rest API. In this part of the script you will also need to specify the names and key from the previous two steps under EventHubs Parameters.

The second part is sending a messsage via a Rest API to your event hub. To make it a little more usefull there is a loop to send multiple messages with a pause between each message. You must adjust the dummy message to your own needs by changing the column names and values. You can also specify the number of messages and the pause between each message.
####################################################################
# Create SAS TOKEN FOR AZURE EVENT HUBS
####################################################################
# EventHubs Parameters
$EventHubsNamespace = "bitools"
$EventHubsName = "myeventhub"
$SharedAccessPolicyName = "SendOnly"
$SharedAccessPolicyPrimaryKey = "1fhvzfOkVs+MxsZ/fakeZwrHTImD3YCCN7CGqYCAFN8kU="

# Create SAS Token
[Reflection.Assembly]::LoadWithPartialName("System.Web")| out-null
$URI = "$($EventHubsNamespace).servicebus.windows.net/$($EventHubsName)"
$Expires = ([DateTimeOffset]::Now.ToUnixTimeSeconds())+3600
$SignatureString = [System.Web.HttpUtility]::UrlEncode($URI)+ "`n" + [string]$Expires
$HMACSHA256 = New-Object System.Security.Cryptography.HMACSHA256
$HMACSHA256.key = [Text.Encoding]::ASCII.GetBytes($SharedAccessPolicyPrimaryKey)
$SignatureBytes = $HMACSHA256.ComputeHash([Text.Encoding]::ASCII.GetBytes($SignatureString))
$SignatureBase64 = [Convert]::ToBase64String($SignatureBytes)
$SASToken = "SharedAccessSignature sr=" + [System.Web.HttpUtility]::UrlEncode($URI) + "&sig=" + [System.Web.HttpUtility]::UrlEncode($SignatureBase64) + "&se=" + $Expires + "&skn=" + $SharedAccessPolicyName


####################################################################
# SEND DUMMY MESSAGES
####################################################################
# Message Parameters
$StartNumber = 1
$NumberOfMessages = 10
$MillisecondsToWait = 1000

# Determine URL and header
$RestAPI = "https://$($EventHubsNamespace).servicebus.windows.net/$($EventHubsName)/messages"

# API headers
$Headers = @{
            "Authorization"=$SASToken;
            "Content-Type"="application/atom+xml;type=entry;charset=utf-8";
            }

# Screenfeedback
Write-Host "Sending $($NumberOfMessages) messages to event hub [$($EventHubsName)] within [$($EventHubsNamespace)]"

# Loop to create X number of dummy messages
for($i = $StartNumber; $i -lt $NumberOfMessages+$StartNumber; $i++)
{
    # Create dummy message to sent to Azure Event Hubs
    $Body = "{'CallId':$($i), 'DurationInSeconds':$(Get-Random -Maximum 1000)}"

    # Screenfeedback
    Write-Host "Sending message nr $($i) and then waiting $($MillisecondsToWait) milliseconds"

    # execute the Azure REST API
    Invoke-RestMethod -Uri $RestAPI -Method "POST" -Headers $Headers -Body $Body

    # Wait a couple of milliseconds before sending next dummy message
    Start-Sleep -Milliseconds $MillisecondsToWait
}
When you run the PowerShell script with these parameters then 10 messages will be sent to your Event Hub.

Executing the Powershell script

















4) Check messages in the Event Hub
Now we can check the messages in your event hub. Go to your eventhub (myeventhub in our case) and click on Process data. Then find the Stream Analytics Query editor and execute the query.
Stream Analytics Query editor























Here can see the contents of your 10 dummy messages with a very basic query.
The result












Conclusion
In this post you learned how to test the setup of your Event Hub and if you for example also connect to Azure Stream Analytics and a Power BI streaming dataset with a report and dashboard then you can also see the messages arriving live in your Power BI Dashboard. This will be shown in a separate post.

Note that the script is sending the messages one by one with an even period between each message. You could for example also make the pause period random or even execute the script in multiple PowerShell ISE editors at once to simulate a more random load of arriving messages.

Do you have an easier way to send test messages or a more sophisticated script then please share your knowledge in the comments below.

Wednesday, 31 August 2022

DevOps snack: If condition in YAML code

Case
Can you have an IF condition in your YAML code to make it more flexible? I want to do for example something different for my Data Factory deployment in Development, Test and Production.
IF conditions in YAML?
















Solution
Yes the expression language also supports Conditional insertion (IF, ELSE, ELSEIF) which you can use to make your YAML code more flexible. However use it with moderation because it also makes is a bit less readable. The expressions won't get any color formatting in the browser.

If for example you want to adjust a parameter value depending on which branch is triggering the pipeline. Without the IF you would start with something like this:
jobs:
  - template: DeployADF.yml
    parameters:
      environment: tst
Between ${{ and }}: you can add an if construction. In this example it checks whether the name if the branch (that is triggering the pipeline) is equals to 'Development', 'Test' or 'Main'. Note the extra indention on the line below the IF.
jobs:
  - template: DeployADF.yml
    parameters:
      ${{ if eq(variables['Build.SourceBranchName'], 'Development' }}:
        environment: dev
      ${{ if eq(variables['Build.SourceBranchName'], 'Test' }}:
        environment: tst
      ${{ if eq(variables['Build.SourceBranchName'], 'Main' }}:
        environment: Prd

When the line below the IF would normally start with a minus sign then the if should also start with a minus sign. For example with the variable groups. Without the if you would start like below:
variables:
- group: ADFParamsTst
If you want to make the groupname depending on the branch name triggering the pipeline then you can add the IF between ${{ and }}: but the line should then start with a minus and also the line below the IF should start with a minus. Also note the extra indentions.
variables:
- ${{ if eq(variables['Build.SourceBranchName'], 'Development' }}:
  - group: ADFParamsDev
- ${{ if eq(variables['Build.SourceBranchName'], 'Test' }}:
  - group: ADFParamsTst
- ${{ if eq(variables['Build.SourceBranchName'], 'Main' }}:
  - group: ADFParamsPrd

Conclusion
In this blog you learned how to add an IF statement in your YAML code. Don't forget the extra indention on the next line below the IF and add the extra minus symbol before the IF when the line after the IF requires one. And don't make it too complex for your colleagues.

Thanks Collin Mezach for helping out.

Sunday, 31 July 2022

Skipped dependency between pipeline activities

Case
Where is the Skipped dependency between ADF or Synapse pipeline activities useful for?
Skipped dependency




















Solution
The Skipped dependency will execute the next activity if the previous activity is not executed. For example because the activity before that previous activity failed.

You could for example use this to create a general event handler for the entire pipeline by connecting the end of each 'line' to a dummy activity and then to an event handler or notification activity. The dummy activity is required because it will be skipped if anything fails. The (1 sec) Wait activity is probably the easiest to use.
If something fails the Web activity will execute














If any of these activities above fails then the 'Dummy' Wait activity will not be executed and as a result the Web activity with the Skipped dependency will be executed. If all activities are succesful then the 'Dummy' Wait activity will be executed, but the Web activity won't .

Conclusion
In this post you learned one of the usages of the Skipped dependency, but please post your usage of the Skipped dependency in the comments below.

Note that it will wait for all 'lines' to be finished before the Dummy activity is skipped. So if Stored Procedure 1 fails then the Dummy activity still needs to wait for Stored Procedure 2, 3 and 4 to be ready (failed or succeeded). This is because the dependencies is ADF are a logial AND (in SSIS you could also change it to be a logical OR).


Wednesday, 8 June 2022

Break or stop ForEach loop in ADF and Synapse

Case
I have a pipeline with a parallel Foreach loop to execute for example multiple Stored Procedures, but if one of those fails I want to stop the Foreach executing more Stored Procedures. Can I break the Foreach loop in case of an error without turning it from parallel into sequential? 
Can you stop a ForEach Loop? Nope!















Solution
You can't stop the foreach loop itself on error, but if an iteration fails then you can cancel the entire pipeline that is running the foreach loop. When the pipeline is running it gets a RUN ID and you can use that to call a Rest API to cancel the current pipeline run if one of the Stored Procedures inside the ForEach fails.
Cancel the entire pipeline














1) Give ADF/Synapse access to its own Rest APIs
To use the Rest APIs of ADF or Synapse within a pipeline you first need to give ADF or Synapse access to its own Rest APIs. This can be done in the Azure Portal on the overview page of the service.
  • Open the Azure Portal in your browser and go to the overview page of your Data Factory or Synapse Workspace.
  • In the left menu click on Access control (IAM)
  • Click on +Add and then choose Add role assignment
  • Select the role Contributor or for ADF Data Factory Contributor and click on Next
  • Under Assign access to select Manged identity
  • Under Members click on +Select members
  • Now you need to select Synapse workspace or Data Factory (V2) and search for your ADF or Synapse
  • Click on your Synapse or ADF and click on the Select button
  • Optionally enter a description: "Give ADF/Synapse access to its own Rest APIs"
  • Click on the Review + assign button (twice) 
The animated GIF is from a Synapse workspace, but it is identical to Data Factory. Just select Data Factory as Managed identity type.
Giving Synapse access to its own Rest APIs
















2) Add Web Activity in Foreach on fail
In the ForEach loop construction we need to add a Web Activity to call the Rest API that cancels the Pipeline run. Data Factory and Synapse have a different URL to cancel the pipeline. Within the example URL you need to replace the red parts with the curly brackets by either a hard code value, a parameter or a System Variable. For Data Factory and Synapse you can use the same System Variabes to retrieve the factoryName/workspaceName and the runId: pipeline().DataFactory / pipeline().RunId

Azure Data Factory:
https://management.azure.com/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DataFactory/factories/{factoryName}/pipelineruns/{runId}/cancel?api-version=2018-06-01

@concat('https://management.azure.com/subscriptions/',
pipeline().parameters.subscriptionId
'/resourceGroups/',
pipeline().parameters.resourceGroup,
'/providers/Microsoft.DataFactory/factories/',
pipeline().DataFactory,
'/pipelineruns/',
pipeline().RunId,
'/cancel?api-version=2018-06-01')

Azure Synapse Workspace
{endpoint}/pipelineruns/{runId}/cancel?api-version=2020-12-01

or a little easier:
https://{workspaceName}.dev.azuresynapse.net/pipelineruns/{runId}/cancel?api-version=2020-12-01

@concat('https://',
pipeline().DataFactory,
'.dev.azuresynapse.net/pipelineruns/',
pipeline().RunId,
'/cancel?api-version=2020-12-01')

  • In the ForEach add a Web Activity and connect it to your existing activity
  • Make sure to change the type of the line from Success to Failure
  • Give the Web Activty a useful name and go to the Settings tab
  • For URL use and expression like above (don't forget to add parameters if you use them)
  • Set the Method to POST
  • Enter a fake/dummy JSON message in the body. We don't need it, but it is required
  • Set the Authentication to System Assigned Managed Identity
  • And last set the Resource to https://management.azure.com/
Web Activity to Cancel the pipeline run





















3) Testing
For this example we used 10 Stored Procedures of which one of them fails on purpose. First make sure to publish the pipeline and then trigger the pipeline (don't use Debug). Then check the monitor to see the result.
One Stored Procedure Failed, rest is cancelled




















Conclusion
In this post you learned how to stop a running pipeline when one of the activities within a foreach loop fails so that is doesn't unnecessary executes new activities. One of the downsides is that in the logs the status of your pipeline will be 'Cancelled' instead of 'Failed', but it saves you running (and paying for) unnecessary activities.

Another downside is that it does not work in debug mode because then you don't get a RUN ID and there is no pipeline run to cancel. The workaround for that is to put the Web Activity that cancels your pipeline in an IF construction that only cancels if the RUN ID is not 0.

The usefulness of this construction get better if you have a lot of iterations and hopefully it's not the last iteration that failed. Let us know in the comments what you think of a construction like this and if you have an alternative solutions.