I want to aggregate data in a stream. How does that work in Stream Analytics?
Example data: count people in front of booth
Solution
Because it's a stream, you aggregate data within a certain time window instead of over the whole dataset. The Stream Analytics query language is very similar to T-SQL, and it has some extensions, like the windowing functions, to aggregate data in time windows. At the moment there are three windowing extensions (Hopping, Sliding and Tumbling), but it is not inconceivable that more windowing functions will be added in the near future.
General rules:
- The length of each window is fixed
- Windowing only works in combination with the GROUP BY clause
- The time units can be day, hour, minute, second, millisecond or microsecond, but the maximum size of the window in all cases is 7 days.
1) Tumbling Window
The Tumbling Window is the easiest to explain. It aggregates data within an X second/minute/etc. time window and does that every X seconds/minutes/etc., so the windows do not overlap.
For example: Tell me the average number of visitors per booth over the last 10 seconds every 10 seconds:
SELECT Booth, avg(HeadCount) as AvgHeadCount FROM HeadCountStream TIMESTAMP BY MeasurementTime GROUP BY Booth, TumblingWindow(second, 10)
Tumbling Window
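To make the bucketing concrete, here is a minimal Python sketch of what the tumbling window computes. It is not how Stream Analytics is implemented. It assumes a single booth, windows aligned to multiples of the window size, and that an event exactly on a boundary belongs to the window that ends there. The `events` list and the `tumbling_avg` helper are made up for illustration; the values mirror the test messages used later in this post, with times in milliseconds after 18:25:00.

```python
import math
from collections import defaultdict

# (time in ms after 18:25:00, head count); illustrative data for one booth
events = [(43511, 1), (47511, 3), (53511, 2), (57511, 3),
          (63511, 4), (67511, 2), (73511, 2), (77511, 1)]

def tumbling_avg(events, size_ms):
    """Return {window end in ms: average head count} for back-to-back windows."""
    buckets = defaultdict(list)
    for t, head_count in events:
        # Assumption: a window covers (end - size, end], so round up to the end.
        end = math.ceil(t / size_ms) * size_ms
        buckets[end].append(head_count)
    return {end: sum(v) / len(v) for end, v in sorted(buckets.items())}

result = tumbling_avg(events, 10_000)
print(result)  # {50000: 2.0, 60000: 2.5, 70000: 3.0, 80000: 1.5}
```

Every measurement lands in exactly one window, which is why eight measurements collapse into four result rows.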
2) Hopping Window
The Hopping Window is very similar to the Tumbling Window, but here the windows overlap. It aggregates data within an X second/minute/etc. time window and does that every Y seconds/minutes/etc.
For example: Tell me the average number of visitors per booth over the last 10 seconds every 5 seconds:
SELECT Booth, avg(HeadCount) as AvgHeadCount FROM HeadCountStream TIMESTAMP BY MeasurementTime GROUP BY Booth, HoppingWindow(second, 10, 5)
Hopping Window
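The overlap is easier to see in a small simulation. The Python sketch below is only an approximation of the behaviour, under the assumptions that windows end on multiples of the hop size, that a window covers (end - size, end], and that empty windows produce no row. The `events` list and the `hopping_avg` helper are hypothetical names; the values mirror the test messages used later in this post, with times in milliseconds after 18:25:00.

```python
import math

# (time in ms after 18:25:00, head count); illustrative data for one booth
events = [(43511, 1), (47511, 3), (53511, 2), (57511, 3),
          (63511, 4), (67511, 2), (73511, 2), (77511, 1)]

def hopping_avg(events, size_ms, hop_ms):
    """Return {window end in ms: average head count} for overlapping windows."""
    first_t = min(t for t, _ in events)
    last_t = max(t for t, _ in events)
    results = {}
    # First window end at or after the first event, aligned to the hop size.
    end = math.ceil(first_t / hop_ms) * hop_ms
    # Keep hopping while the window can still contain the last event.
    while end < last_t + size_ms:
        in_window = [v for t, v in events if end - size_ms < t <= end]
        if in_window:
            results[end] = sum(in_window) / len(in_window)
        end += hop_ms
    return results

result = hopping_avg(events, 10_000, 5_000)
for end, avg in result.items():
    print(end, avg)
```

With a 10 second window and a 5 second hop, each measurement is counted in two consecutive windows, so the eight measurements produce nine result rows here.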
3) Sliding Window
The Sliding Window is the most difficult to explain. It aggregates the values in the time window every time a new event/measurement occurs or an existing event/measurement falls out of the time window.
So when using the Sliding Window you are interested in aggregating values whenever an event occurs. This is in contrast to the Hopping and Tumbling windows, which produce results at a fixed interval.
For example: Tell me the average number of visitors per booth in the last 10 seconds:
SELECT Booth, avg(HeadCount) as AvgHeadCount FROM HeadCountStream TIMESTAMP BY MeasurementTime GROUP BY Booth, SlidingWindow(second, 10)
Sliding Window
- The first aggregation occurs when the first measurement value (1) is streamed.
- The second aggregation occurs when a new measurement value (3) is streamed.
- The third, fourth, fifth, etc. aggregations occur for the same reason as the second one: each time a new measurement value is streamed.
- The last aggregation occurs when no more new measurement values are streamed and the second-to-last measurement value (2) falls out of the time window.
Sliding Window
Compared to the Hopping Window (with the same data) you only get an extra result row at the start (1) and one at the end (1), because in this example the events happen at a fixed interval. It gets more interesting when the events arrive at irregular moments, like tweets about a certain subject.
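For irregular events, a rough Python simulation shows when the sliding window produces a row. This is a sketch under assumptions, not the real engine: it assumes a window covers (now - size, now], that a row is produced whenever an event enters or leaves the window, and that an empty window produces nothing. The event times (in seconds) and the `sliding_avg` helper are invented for this illustration.

```python
# Hypothetical irregular events for one booth: (time in seconds, head count)
events = [(0, 1), (2, 3), (3, 2), (15, 4)]

def sliding_avg(events, size):
    """Return (time, average) for every instant the window content changes."""
    # The content changes when an event enters (t) or leaves (t + size).
    change_points = sorted({t for t, _ in events} | {t + size for t, _ in events})
    rows = []
    for now in change_points:
        in_window = [v for t, v in events if now - size < t <= now]
        if in_window:  # an empty window gives no result row
            rows.append((now, sum(in_window) / len(in_window)))
    return rows

rows = sliding_avg(events, 10)
for row in rows:
    print(row)
# (0, 1.0), (2, 2.0), (3, 2.0), (10, 2.5), (12, 2.0), (15, 4.0)
```

The rows at 10 and 12 exist only because an older event dropped out of the window, and nothing is reported at 13 because the window is empty at that moment.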
Timestamp by
You probably noticed the TIMESTAMP BY MeasurementTime clause after the FROM. This clause lets you use the exact time at which an event occurred as its timestamp, rather than its arrival time in the IoT Hub. The windowing functions use this timestamp.
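Why this matters can be shown with a tiny Python sketch. It assumes 10 second tumbling windows aligned to whole multiples of 10 seconds, and a message that reaches the hub 5 seconds after it was measured. The delay and the `window_end` helper are made up for the example.

```python
import math
from datetime import datetime, timedelta, timezone

def window_end(ts, size_s=10):
    """End of the tumbling window (aligned to the epoch) that contains ts."""
    end = math.ceil(ts.timestamp() / size_s) * size_s
    return datetime.fromtimestamp(end, tz=timezone.utc).isoformat()

# measurementTime from the payload; "Z" is rewritten for older Python versions
measured = datetime.fromisoformat("2016-09-17T18:25:47.511Z".replace("Z", "+00:00"))
# Hypothetical arrival time: the message reaches the hub 5 seconds late.
arrived = measured + timedelta(seconds=5)

by_event_time = window_end(measured)
by_arrival = window_end(arrived)
print(by_event_time)  # 2016-09-17T18:25:50+00:00
print(by_arrival)     # 2016-09-17T18:26:00+00:00
```

The same measurement would be counted in a different window depending on which timestamp is used, so the averages only reflect what happened at the booth when the event time is used.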
Testing query with Windowing functions
In the old portal you can test the query and study the result (at the time of writing, testing is not yet supported in the new portal). For this you need a JSON file with some messages in it. These messages should look identical to the messages you send via the IoT Hub. For this example I created a text file with the following text in it:
{"headCount":1,"measurementTime":"2016-09-17T18:25:43.511Z","sensorName":"A"}
{"headCount":3,"measurementTime":"2016-09-17T18:25:47.511Z","sensorName":"A"}
{"headCount":2,"measurementTime":"2016-09-17T18:25:53.511Z","sensorName":"A"}
{"headCount":3,"measurementTime":"2016-09-17T18:25:57.511Z","sensorName":"A"}
{"headCount":4,"measurementTime":"2016-09-17T18:26:03.511Z","sensorName":"A"}
{"headCount":2,"measurementTime":"2016-09-17T18:26:07.511Z","sensorName":"A"}
{"headCount":2,"measurementTime":"2016-09-17T18:26:13.511Z","sensorName":"A"}
{"headCount":1,"measurementTime":"2016-09-17T18:26:17.511Z","sensorName":"A"}
When you hit the test button in the query editor you need to upload a JSON file for testing. Stream Analytics then uses that data to test your query and shows the result. In the pictures below you will see the test data in the upper right corner and the query result at the bottom. The red numbers show how the average was calculated.
Sliding Window
When you leave out one measurement, two rows will change in the result.
Sliding Window, leaving out 1 measurement
When you leave out two successive measurements, two rows will change in the result and one row will disappear.
Sliding Window, leaving out 2 successive measurements
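You can reproduce this effect outside the portal with a small Python simulation. It is a sketch under assumptions rather than the real engine: a window covers (now - 10 s, now], a row is produced whenever an event enters or leaves the window, and an empty window produces no row. Which measurements are dropped is my own choice (the 4th, and then the 4th and 5th), and the helper names are hypothetical.

```python
# (time in ms after 18:25:00, head count), taken from the test file above
events = [(43511, 1), (47511, 3), (53511, 2), (57511, 3),
          (63511, 4), (67511, 2), (73511, 2), (77511, 1)]

def sliding_avg(events, size_ms=10_000):
    """Return {time: average} for every instant the window content changes."""
    # Content changes when an event enters (t) or leaves (t + size).
    points = sorted({t for t, _ in events} | {t + size_ms for t, _ in events})
    rows = {}
    for now in points:
        window = [v for t, v in events if now - size_ms < t <= now]
        if window:  # an empty window gives no result row
            rows[now] = sum(window) / len(window)
    return rows

def compare(base, other):
    """List the result times whose value changed and those that vanished."""
    changed = [t for t in base if t in other and other[t] != base[t]]
    gone = [t for t in base if t not in other]
    return changed, gone

full = sliding_avg(events)
# Which measurements to drop is my own choice here, not read from the pictures.
minus_one = sliding_avg(events[:3] + events[4:])   # without the 4th message
minus_two = sliding_avg(events[:3] + events[5:])   # without the 4th and 5th

print(compare(full, minus_one))  # ([57511, 63511], [])
print(compare(full, minus_two))  # ([57511, 67511], [63511])
```

In this simulation, dropping one measurement changes two rows, and dropping two successive measurements changes two rows and removes one, because the window is empty at the moment the earlier measurement falls out.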
Conclusion
Tumbling and hopping windows are easy to understand. The sliding window is a little harder to understand, but writing the query is very easy. Using the windowing functions is something you would probably want to do in the hot path to stream to Power BI. This way you don't get too much data in the stream.