Posted by on December 15, 2017

If you’ve used Azure Stream Analytics, you’ve probably encountered the message “Exactly one temporal window is expected.” Although a query can obviously have only one temporal window, this error occurs any time a query has more than one GROUP BY clause. It seems perfectly normal to me to specify a temporal window and then aggregate the resulting aggregates again. Lots of statistical methods use multiple levels of aggregation.

It’s possible to aggregate more than once by performing all but one of the aggregations inside a JavaScript User Defined Aggregate (UDA).

The UDA examples provided by Microsoft all use simple variables to accumulate totals. It is, however, possible to include arbitrarily complex JavaScript. In particular, a UDA can keep a history of all of the values that have been passed to it, and then group and aggregate, and re-group and re-aggregate, as many times as desired.

Here is a basic example of the code used to keep the history, which returns the oldest event still in the window:

function main() {
    /**
     * Resets the window state.
     */
    this.init = function () {
        // Every value currently in the window, oldest first.
        this.events = [];
    };

    /**
     * Records an event entering the window.
     * @param {*} value - the event value to remember
     * @param {*} timestamp - unused
     */
    this.accumulate = function (value, timestamp) {
        this.events.push(value);
    };

    /**
     * Removes the oldest stored event (the code assumes events leave in the
     * order they arrived).
     * @param {*} value - unused
     * @param {*} timestamp - unused
     */
    this.deaccumulate = function (value, timestamp) {
        this.events.shift();
    };

    /**
     * Removes as many of the oldest stored events as otherState holds.
     * @param {Object} otherState - an object of the same class as "this"
     *   which contains the events which are leaving
     */
    this.deaccumulateState = function (otherState) {
        // One splice instead of a `for (i in ...)` loop: for...in over an
        // array is fragile, and the undeclared `i` was an implicit global
        // (a ReferenceError in strict mode).
        this.events.splice(0, otherState.events.length);
    };

    /**
     * @returns {*} the oldest event still in the window, or null if empty
     */
    this.computeResult = function () {
        return this.events.length > 0 ? this.events[0] : null;
    };
}

For reference, the most recent event can be remembered without the array:

function main() {
    /**
     * Resets the window state: no events seen yet.
     */
    this.init = function () {
        this.count = 0;
    };

    /**
     * Counts an incoming event and remembers it as the newest value.
     * @param {*} value - the event value
     * @param {*} timestamp - unused
     */
    this.accumulate = function (value, timestamp) {
        this.lastValue = value;
        this.count += 1;
    };

    /**
     * An event left the window. Only the count changes: assuming events
     * leave oldest-first, the newest value is still in the window while
     * the count stays positive.
     * @param {*} value - unused
     * @param {*} timestamp - unused
     */
    this.deaccumulate = function (value, timestamp) {
        this.count -= 1;
    };

    /**
     * A batch of events left the window; subtract its size.
     * @param {Object} otherState - same class as "this", holding the leaving events
     */
    this.deaccumulateState = function (otherState) {
        this.count -= otherState.count;
    };

    /**
     * @returns {*} the most recently accumulated value, or null when the
     *   window is empty
     */
    this.computeResult = function () {
        return this.count > 0 ? this.lastValue : null;
    };
}

Once the history is populated, arbitrary logic can be performed on it. This example makes a prediction based on linear regression.

function main() {
    // Numeric helpers shared by the methods below.
    var arr = {
        sum: function (array) {
            var num = 0;
            for (var i = 0, l = array.length; i < l; i++) {
                num += array[i];
            }
            return num;
        },

        mean: function (array) {
            return arr.sum(array) / array.length;
        }
    };

    // History needed before switching from the window mean to a regression
    // fit. This technique doesn't work on really small data sets; 10 is arbitrary.
    var minRegressionPoints = 10;

    /**
     * Resets the window state.
     */
    this.init = function () {
        // Values currently in the window, oldest first.
        this.events = [];
        // predictions[i] is the forecast made just before events[i] arrived
        // (null when there was no history to forecast from).
        this.predictions = [];
    };

    /**
     * Ordinary least-squares fit of y against x.
     * @param {number[]} y - dependent values
     * @param {number[]} x - independent values, same length as y
     * @returns {{slope: number, intercept: number, r2: number}}
     */
    this.linearRegression = function (y, x) {
        var n = y.length;
        var sumX = 0;
        var sumY = 0;
        var sumXY = 0;
        var sumXX = 0;
        var sumYY = 0;

        for (var i = 0; i < n; i++) {
            sumX += x[i];
            sumY += y[i];
            sumXY += x[i] * y[i];
            sumXX += x[i] * x[i];
            sumYY += y[i] * y[i];
        }

        var slope = (n * sumXY - sumX * sumY) / (n * sumXX - sumX * sumX);
        var intercept = (sumY - slope * sumX) / n;
        var r2 = Math.pow((n * sumXY - sumX * sumY) /
            Math.sqrt((n * sumXX - sumX * sumX) * (n * sumYY - sumY * sumY)), 2);

        return { slope: slope, intercept: intercept, r2: r2 };
    };

    /**
     * Forecasts the next value from the events currently in the window.
     * @param {*} value - unused; kept for interface compatibility
     * @returns {?number} a regression forecast when at least
     *   minRegressionPoints events are stored, otherwise the window mean,
     *   or null when the window is empty
     */
    this.linearPrediction = function (value) {
        var n = this.events.length;
        if (n === 0) {
            // The mean of an empty window is NaN; report "no prediction".
            return null;
        }
        if (n < minRegressionPoints) {
            return arr.mean(this.events);
        }
        var xAxis = [];
        for (var i = 0; i < n; i++) {
            xAxis.push(i);
        }
        var model = this.linearRegression(this.events, xAxis);
        // Stored events occupy x = 0..n-1, so the next event lands at x = n.
        return model.intercept + model.slope * n;
    };

    /**
     * Forecasts the incoming value, then records the value and its forecast.
     * @param {number} value - the event value
     * @param {*} timestamp - unused
     */
    this.accumulate = function (value, timestamp) {
        // Predict before adding the current element, or it would influence
        // its own prediction.
        var prediction = this.linearPrediction();
        this.events.push(value);
        this.predictions.push(prediction);
    };

    /**
     * Removes the oldest event and its forecast, keeping both arrays aligned.
     * @param {*} value - unused
     * @param {*} timestamp - unused
     */
    this.deaccumulate = function (value, timestamp) {
        this.events.shift();
        this.predictions.shift();
    };

    /**
     * Removes as many of the oldest entries as otherState holds.
     * @param {Object} otherState - same class as "this", holding the leaving events
     */
    this.deaccumulateState = function (otherState) {
        var leaving = otherState.events.length;
        this.events.splice(0, leaving);
        this.predictions.splice(0, leaving);
    };

    /**
     * @returns {?number} the forecast made for the most recent event, or null
     *   when the window is empty
     */
    this.computeResult = function () {
        var count = this.predictions.length;
        return count > 0 ? this.predictions[count - 1] : null;
    };
}

Of course, this technique can also be used for simple anomaly detection:

function main() {
    // Statistical helpers used by the methods below.
    var arr = {
        max: function (array) {
            return Math.max.apply(null, array);
        },

        min: function (array) {
            return Math.min.apply(null, array);
        },

        range: function (array) {
            return arr.max(array) - arr.min(array);
        },

        // Midpoint between the extremes: (max + min) / 2.
        midrange: function (array) {
            return (arr.max(array) + arr.min(array)) / 2;
        },

        sum: function (array) {
            var num = 0;
            for (var i = 0, l = array.length; i < l; i++) {
                num += array[i];
            }
            return num;
        },

        mean: function (array) {
            return arr.sum(array) / array.length;
        },

        // Sorts a copy so the caller's array (e.g. the event history, whose
        // order matters) is not reordered.
        median: function (array) {
            var sorted = array.slice().sort(function (a, b) {
                return a - b;
            });
            var mid = sorted.length / 2;
            return mid % 1 ? sorted[mid - 0.5] : (sorted[mid - 1] + sorted[mid]) / 2;
        },

        modes: function (array) {
            if (!array.length) return [];
            // Prototype-free map so values like "constructor" count correctly.
            var modeMap = Object.create(null);
            var maxCount = 0;
            var modes = [];

            array.forEach(function (val) {
                modeMap[val] = (modeMap[val] || 0) + 1;
                if (modeMap[val] > maxCount) {
                    modes = [val];
                    maxCount = modeMap[val];
                } else if (modeMap[val] === maxCount) {
                    modes.push(val);
                }
            });
            return modes;
        },

        variance: function (array) {
            var mean = arr.mean(array);
            return arr.mean(array.map(function (num) {
                return Math.pow(num - mean, 2);
            }));
        },

        standardDeviation: function (array) {
            return Math.sqrt(arr.variance(array));
        },

        meanAbsoluteDeviation: function (array) {
            var mean = arr.mean(array);
            return arr.mean(array.map(function (num) {
                return Math.abs(num - mean);
            }));
        },

        zScores: function (array) {
            var mean = arr.mean(array);
            var standardDeviation = arr.standardDeviation(array);
            return array.map(function (num) {
                return (num - mean) / standardDeviation;
            });
        }
    };

    // History needed before switching from the window mean to a regression
    // fit. This technique doesn't work on really small data sets; 10 is arbitrary.
    var minRegressionPoints = 10;
    // More than this many errors are required before a score is produced.
    var minScoredErrors = 10;

    /**
     * Resets the window state. The three arrays stay index-aligned: entry i
     * of each describes the i-th oldest event in the window.
     */
    this.init = function () {
        this.events = [];
        // Forecast made just before each event arrived (null if no history).
        this.predictions = [];
        // value - forecast for each event (null when there was no forecast).
        this.errors = [];
    };

    /**
     * Ordinary least-squares fit of y against x.
     * @param {number[]} y - dependent values
     * @param {number[]} x - independent values, same length as y
     * @returns {{slope: number, intercept: number, r2: number}}
     */
    this.linearRegression = function (y, x) {
        var n = y.length;
        var sumX = 0;
        var sumY = 0;
        var sumXY = 0;
        var sumXX = 0;
        var sumYY = 0;

        for (var i = 0; i < n; i++) {
            sumX += x[i];
            sumY += y[i];
            sumXY += x[i] * y[i];
            sumXX += x[i] * x[i];
            sumYY += y[i] * y[i];
        }

        var slope = (n * sumXY - sumX * sumY) / (n * sumXX - sumX * sumX);
        var intercept = (sumY - slope * sumX) / n;
        var r2 = Math.pow((n * sumXY - sumX * sumY) /
            Math.sqrt((n * sumXX - sumX * sumX) * (n * sumYY - sumY * sumY)), 2);

        return { slope: slope, intercept: intercept, r2: r2 };
    };

    /**
     * Forecasts the next value from the events currently in the window.
     * @param {*} value - unused; kept for interface compatibility
     * @returns {?number} a regression forecast when at least
     *   minRegressionPoints events are stored, otherwise the window mean,
     *   or null when the window is empty
     */
    this.linearPrediction = function (value) {
        var n = this.events.length;
        if (n === 0) {
            // The mean of an empty window is NaN; report "no prediction".
            return null;
        }
        if (n < minRegressionPoints) {
            return arr.mean(this.events);
        }
        var xAxis = [];
        for (var i = 0; i < n; i++) {
            xAxis.push(i);
        }
        var model = this.linearRegression(this.events, xAxis);
        // Stored events occupy x = 0..n-1, so the next event lands at x = n.
        return model.intercept + model.slope * n;
    };

    /**
     * Forecasts the incoming value, then records the value, the forecast and
     * the forecast error.
     * @param {number} value - the event value
     * @param {*} timestamp - unused
     */
    this.accumulate = function (value, timestamp) {
        // Predict before adding the current element, or it would influence
        // its own prediction.
        var prediction = this.linearPrediction();
        this.events.push(value);
        this.predictions.push(prediction);
        // With no forecast there is no error (value - null would silently
        // record the raw value, and a NaN would poison the std deviation).
        this.errors.push(prediction === null ? null : value - prediction);
    };

    /**
     * Removes the oldest entry from every array, keeping them aligned.
     * @param {*} value - unused
     * @param {*} timestamp - unused
     */
    this.deaccumulate = function (value, timestamp) {
        this.events.shift();
        this.predictions.shift();
        this.errors.shift();
    };

    /**
     * Removes as many of the oldest entries as otherState holds.
     * @param {Object} otherState - same class as "this", holding the leaving events
     */
    this.deaccumulateState = function (otherState) {
        var leaving = otherState.events.length;
        this.events.splice(0, leaving);
        this.predictions.splice(0, leaving);
        this.errors.splice(0, leaving);
    };

    /**
     * Scores the newest event as |error| / standard deviation of the errors
     * in the window. The error is not centred on the errors' mean, i.e. the
     * forecast errors are treated as if they average zero.
     * @returns {?number} the score, or null until more than minScoredErrors
     *   errors with a forecast behind them are in the window
     */
    this.computeResult = function () {
        var usable = this.errors.filter(function (e) {
            return e !== null;
        });
        if (usable.length <= minScoredErrors) {
            return null;
        }
        var stdDev = arr.standardDeviation(usable);
        var latest = this.errors[this.errors.length - 1];
        return Math.abs(latest / stdDev);
    };
}

The one caveat I feel obliged to mention is that this technique is not going to scale. If your temporal window has a few thousand events, this works fine; with a few million, you’ll want a different approach. Still, short windows are preferable for most use cases, so this approach should be broadly applicable.

Of course, these examples are proof of concept rather than production ready. Do let me know if you’re interested in more detail.

Comments

Be the first to comment.

Leave a Reply