Peak signal detection in realtime timeseries data


Update: The best performing algorithm so far is this one .


This question explores robust algorithms for detecting sudden peaks in real-time timeseries data.

Consider the following dataset:

p = [1 1 1.1 1 0.9 1 1 1.1 1 0.9 1 1.1 1 1 0.9 1 1 1.1 1 1 1 1 1.1 0.9 1 1.1 1 1 0.9 1, ...
     1.1 1 1 1.1 1 0.8 0.9 1 1.2 0.9 1 1 1.1 1.2 1 1.5 1 3 2 5 3 2 1 1 1 0.9 1 1 3, ... 
     2.6 4 3 3.2 2 1 1 0.8 4 4 2 2.5 1 1 1];

(Matlab format but it's not about the language but about the algorithm)

数据图

You can clearly see that there are three large peaks and some small peaks. This dataset is a specific example of the class of timeseries datasets that the question is about. This class of datasets has two general features:

  • There is basic noise with a general mean
  • There are large 'peaks' or 'higher data points' that significantly deviate from the noise.
  • Let's also assume the following:

  • the width of the peaks cannot be determined beforehand
  • the height of the peaks clearly deviates from the other values
  • the used algorithm must calculate realtime (so change with each new datapoint)
  • For such a situation, a boundary value needs to be constructed which triggers signals. However, the boundary value cannot be static and must be determined realtime based on an algorithm.


    My Question: what is a good algorithm to calculate such thresholds in realtime? Are there specific algorithms for such situations? What are the most well-known algorithms?


    Robust algorithms or useful insights are all highly appreciated. (can answer in any language: it's about the algorithm)


    Smoothed z-score algo (very robust thresholding algorithm)

    I have constructed an algorithm that works very well for these types of datasets. It is based on the principle of dispersion: if a new datapoint is a given x number of standard deviations away from some moving mean, the algorithm signals (also called z-score). The algorithm is very robust because it constructs a separate moving mean and deviation, such that signals do not corrupt the threshold. Future signals are therefore identified with approximately the same accuracy, regardless of the amount of previous signals. The algorithm takes 3 inputs: lag = the lag of the moving window , threshold = the z-score at which the algorithm signals and influence = the influence (between 0 and 1) of new signals on the mean and standard deviation . For example, a lag of 5 will use the last 5 observations to smooth the data. A threshold of 3.5 will signal if a datapoint is 3.5 standard deviations away from the moving mean. And an influence of 0.5 gives signals half of the influence that normal datapoints have. Likewise, an influence of 0 ignores signals completely for recalculating the new threshold: an influence of 0 is therefore the most robust option; 1 is the least.

    It works as follows:

    Pseudocode

    # Let y be a vector of timeseries data of at least length lag+2
    # Let mean() be a function that calculates the mean
    # Let std() be a function that calculates the standard deviaton
    # Let absolute() be the absolute value function
    
    # Settings (the ones below are examples: choose what is best for your data)
    set lag to 5;          # lag 5 for the smoothing functions
    set threshold to 3.5;  # 3.5 standard deviations for signal
    set influence to 0.5;  # between 0 and 1, where 1 is normal influence, 0.5 is half
    
    # Initialise variables
    set signals to vector 0,...,0 of length of y;   # Initialise signal results
    set filteredY to y(1),...,y(lag)                # Initialise filtered series
    set avgFilter to null;                          # Initialise average filter
    set stdFilter to null;                          # Initialise std. filter
    set avgFilter(lag) to mean(y(1),...,y(lag));    # Initialise first value
    set stdFilter(lag) to std(y(1),...,y(lag));     # Initialise first value
    
    for i=lag+1,...,t do
      if absolute(y(i) - avgFilter(i-1)) > threshold*stdFilter(i-1) then
        if y(i) > avgFilter(i-1) then
          set signals(i) to +1;                     # Positive signal
        else
          set signals(i) to -1;                     # Negative signal
        end
        # Make influence lower
        set filteredY(i) to influence*y(i) + (1-influence)*filteredY(i-1);
      else
        set signals(i) to 0;                        # No signal
        set filteredY(i) to y(i);
      end
      # Adjust the filters
      set avgFilter(i) to mean(filteredY(i-lag),...,filteredY(i));
      set stdFilter(i) to std(filteredY(i-lag),...,filteredY(i));
    end
    

    Demo

    演示鲁棒阈值算法

    The Matlab code for this demo can be found at the end of this answer. To use the demo, simply run it and create a time series yourself by clicking on the upper chart. The algorithm starts working after drawing lag number of observations.


    Appendix 1: Matlab and R code for the algorithm

    Matlab code

    function [signals,avgFilter,stdFilter] = ThresholdingAlgo(y,lag,threshold,influence)
    % Initialise signal results
    signals = zeros(length(y),1);
    % Initialise filtered series
    filteredY = y(1:lag+1);
    % Initialise filters
    avgFilter(lag+1,1) = mean(y(1:lag+1));
    stdFilter(lag+1,1) = std(y(1:lag+1));
    % Loop over all datapoints y(lag+2),...,y(t)
    for i=lag+2:length(y)
        % If new value is a specified number of deviations away
        if abs(y(i)-avgFilter(i-1)) > threshold*stdFilter(i-1)
            if y(i) > avgFilter(i-1)
                % Positive signal
                signals(i) = 1;
            else
                % Negative signal
                signals(i) = -1;
            end
            % Make influence lower
            filteredY(i) = influence*y(i)+(1-influence)*filteredY(i-1);
        else
            % No signal
            signals(i) = 0;
            filteredY(i) = y(i);
        end
        % Adjust the filters
        avgFilter(i) = mean(filteredY(i-lag:i));
        stdFilter(i) = std(filteredY(i-lag:i));
    end
    % Done, now return results
    end
    

    Example:

    % Data
    y = [1 1 1.1 1 0.9 1 1 1.1 1 0.9 1 1.1 1 1 0.9 1 1 1.1 1 1,...
        1 1 1.1 0.9 1 1.1 1 1 0.9 1 1.1 1 1 1.1 1 0.8 0.9 1 1.2 0.9 1,...
        1 1.1 1.2 1 1.5 1 3 2 5 3 2 1 1 1 0.9 1,...
        1 3 2.6 4 3 3.2 2 1 1 0.8 4 4 2 2.5 1 1 1];
    
    % Settings
    lag = 30;
    threshold = 5;
    influence = 0;
    
    % Get results
    [signals,avg,dev] = ThresholdingAlgo(y,lag,threshold,influence);
    
    figure; subplot(2,1,1); hold on;
    x = 1:length(y); ix = lag+1:length(y);
    area(x(ix),avg(ix)+threshold*dev(ix),'FaceColor',[0.9 0.9 0.9],'EdgeColor','none');
    area(x(ix),avg(ix)-threshold*dev(ix),'FaceColor',[1 1 1],'EdgeColor','none');
    plot(x(ix),avg(ix),'LineWidth',1,'Color','cyan','LineWidth',1.5);
    plot(x(ix),avg(ix)+threshold*dev(ix),'LineWidth',1,'Color','green','LineWidth',1.5);
    plot(x(ix),avg(ix)-threshold*dev(ix),'LineWidth',1,'Color','green','LineWidth',1.5);
    plot(1:length(y),y,'b');
    subplot(2,1,2);
    stairs(signals,'r','LineWidth',1.5); ylim([-1.5 1.5]);
    

    R code

    ThresholdingAlgo <- function(y,lag,threshold,influence) {
      signals <- rep(0,length(y))
      filteredY <- y[0:lag]
      avgFilter <- NULL
      stdFilter <- NULL
      avgFilter[lag] <- mean(y[0:lag])
      stdFilter[lag] <- sd(y[0:lag])
      for (i in (lag+1):length(y)){
        if (abs(y[i]-avgFilter[i-1]) > threshold*stdFilter[i-1]) {
          if (y[i] > avgFilter[i-1]) {
            signals[i] <- 1;
          } else {
            signals[i] <- -1;
          }
          filteredY[i] <- influence*y[i]+(1-influence)*filteredY[i-1]
        } else {
          signals[i] <- 0
          filteredY[i] <- y[i]
        }
        avgFilter[i] <- mean(filteredY[(i-lag):i])
        stdFilter[i] <- sd(filteredY[(i-lag):i])
      }
      return(list("signals"=signals,"avgFilter"=avgFilter,"stdFilter"=stdFilter))
    }
    

    Example:

    # Data
    y <- c(1,1,1.1,1,0.9,1,1,1.1,1,0.9,1,1.1,1,1,0.9,1,1,1.1,1,1,1,1,1.1,0.9,1,1.1,1,1,0.9,
           1,1.1,1,1,1.1,1,0.8,0.9,1,1.2,0.9,1,1,1.1,1.2,1,1.5,1,3,2,5,3,2,1,1,1,0.9,1,1,3,
           2.6,4,3,3.2,2,1,1,0.8,4,4,2,2.5,1,1,1)
    
    lag       <- 30
    threshold <- 5
    influence <- 0
    
    # Run algo with lag = 30, threshold = 5, influence = 0
    result <- ThresholdingAlgo(y,lag,threshold,influence)
    
    # Plot result
    par(mfrow = c(2,1),oma = c(2,2,0,0) + 0.1,mar = c(0,0,2,1) + 0.2)
    plot(1:length(y),y,type="l",ylab="",xlab="") 
    lines(1:length(y),result$avgFilter,type="l",col="cyan",lwd=2)
    lines(1:length(y),result$avgFilter+threshold*result$stdFilter,type="l",col="green",lwd=2)
    lines(1:length(y),result$avgFilter-threshold*result$stdFilter,type="l",col="green",lwd=2)
    plot(result$signals,type="S",col="red",ylab="",xlab="",ylim=c(-1.5,1.5),lwd=2)
    

    This code (both languages) will yield the following result for the data of the original question:

    来自Matlab代码的阈值示例


    Implementations in other languages:

  • Golang (Xeoncross)

  • Python (R Kiselev)

  • Swift (me)

  • Groovy (JoshuaCWebDeveloper)

  • C++ (brad)

  • C++ (Animesh Pandey)

  • Rust (swizard)

  • Scala (Mike Roberts)

  • Kotlin (leoderprofi)

  • Ruby (Kimmo Lehto)


  • Appendix 2: Matlab demonstration code (click to make data)

    function [] = RobustThresholdingDemo()
    
    %% SPECIFICATIONS
    lag         = 5;       % lag for the smoothing
    threshold   = 3.5;     % number of st.dev. away from the mean to signal
    influence   = 0.3;     % when signal: how much influence for new data? (between 0 and 1)
                           % 1 is normal influence, 0.5 is half      
    %% START DEMO
    DemoScreen(30,lag,threshold,influence);
    
    end
    
    function [signals,avgFilter,stdFilter] = ThresholdingAlgo(y,lag,threshold,influence)
    signals = zeros(length(y),1);
    filteredY = y(1:lag+1);
    avgFilter(lag+1,1) = mean(y(1:lag+1));
    stdFilter(lag+1,1) = std(y(1:lag+1));
    for i=lag+2:length(y)
        if abs(y(i)-avgFilter(i-1)) > threshold*stdFilter(i-1)
            if y(i) > avgFilter(i-1)
                signals(i) = 1;
            else
                signals(i) = -1;
            end
            filteredY(i) = influence*y(i)+(1-influence)*filteredY(i-1);
        else
            signals(i) = 0;
            filteredY(i) = y(i);
        end
        avgFilter(i) = mean(filteredY(i-lag:i));
        stdFilter(i) = std(filteredY(i-lag:i));
    end
    end
    
    % Demo screen function
    function [] = DemoScreen(n,lag,threshold,influence)
    figure('Position',[200 100,1000,500]);
    subplot(2,1,1);
    title(sprintf(['Draw data points (%.0f max)      [settings: lag = %.0f, '...
        'threshold = %.2f, influence = %.2f]'],n,lag,threshold,influence));
    ylim([0 5]); xlim([0 50]);
    H = gca; subplot(2,1,1);
    set(H, 'YLimMode', 'manual'); set(H, 'XLimMode', 'manual');
    set(H, 'YLim', get(H,'YLim')); set(H, 'XLim', get(H,'XLim'));
    xg = []; yg = [];
    for i=1:n
        try
            [xi,yi] = ginput(1);
        catch
            return;
        end
        xg = [xg xi]; yg = [yg yi];
        if i == 1
            subplot(2,1,1); hold on;
            plot(H, xg(i),yg(i),'r.'); 
            text(xg(i),yg(i),num2str(i),'FontSize',7);
        end
        if length(xg) > lag
            [signals,avg,dev] = ...
                ThresholdingAlgo(yg,lag,threshold,influence);
            area(xg(lag+1:end),avg(lag+1:end)+threshold*dev(lag+1:end),...
                'FaceColor',[0.9 0.9 0.9],'EdgeColor','none');
            area(xg(lag+1:end),avg(lag+1:end)-threshold*dev(lag+1:end),...
                'FaceColor',[1 1 1],'EdgeColor','none');
            plot(xg(lag+1:end),avg(lag+1:end),'LineWidth',1,'Color','cyan');
            plot(xg(lag+1:end),avg(lag+1:end)+threshold*dev(lag+1:end),...
                'LineWidth',1,'Color','green');
            plot(xg(lag+1:end),avg(lag+1:end)-threshold*dev(lag+1:end),...
                'LineWidth',1,'Color','green');
            subplot(2,1,2); hold on; title('Signal output');
            stairs(xg(lag+1:end),signals(lag+1:end),'LineWidth',2,'Color','blue');
            ylim([-2 2]); xlim([0 50]); hold off;
        end
        subplot(2,1,1); hold on;
        for j=2:i
            plot(xg([j-1:j]),yg([j-1:j]),'r'); plot(H,xg(j),yg(j),'r.');
            text(xg(j),yg(j),num2str(j),'FontSize',7);
        end
    end
    end
    

    WARNING: The code above always loops over all datapoints everytime it runs. When implementing this code, make sure to split the calculation of the signal into a separate function (without the loop). Then when a new datapoint arrives, update filteredY , avgFilter and stdFilter once. Do not recalculate the signals for all data everytime there is a new datapoint (like in the example above), that would be extremely inefficient and slow!

    Other ways to modify the algorithm (for potential improvements) are:

  • Use median instead of mean
  • Use MAD (median absolute deviation) instead of standard deviation
  • Use a signalling margin, so the signal doesn't switch too often
  • Change the way the influence parameter works
  • Treat up and down signals differently (asymmetric treatment)

  • If you use this function somewhere, please credit me or this answer. If you have any questions regarding this algorithm, post them in the comments below or reach out to me on LinkedIn.



    One approach is to detect peaks based on the following observation:

  • Time t is a peak if (y(t) > y(t-1)) && (y(t) > y(t+1))
  • It avoids false positives by waiting until the uptrend is over. It is not exactly "real-time" in the sense that it will miss the peak by one dt. sensitivity can be controlled by requiring a margin for comparison. There is a trade off between noisy detection and time delay of detection. You can enrich the model by adding more parameters:

  • peak if (y(t) - y(t-dt) > m) && (y(t) - y(t+dt) > m)
  • where dt and m are parameters to control sensitivity vs time-delay

    This is what you get with the mentioned algorithm: 在这里输入图像描述

    here is the code to reproduce the plot in python:

    import numpy as np
    import matplotlib.pyplot as plt
    input = np.array([ 1. ,  1. ,  1. ,  1. ,  1. ,  1. ,  1. ,  1.1,  1. ,  0.8,  0.9,
        1. ,  1.2,  0.9,  1. ,  1. ,  1.1,  1.2,  1. ,  1.5,  1. ,  3. ,
        2. ,  5. ,  3. ,  2. ,  1. ,  1. ,  1. ,  0.9,  1. ,  1. ,  3. ,
        2.6,  4. ,  3. ,  3.2,  2. ,  1. ,  1. ,  1. ,  1. ,  1. ])
    signal = (input > np.roll(input,1)) & (input > np.roll(input,-1))
    plt.plot(input)
    plt.plot(signal.nonzero()[0], input[signal], 'ro')
    plt.show()
    

    By setting m = 0.5 , you can get a cleaner signal with only one false positive: 在这里输入图像描述


    Here is the Python / numpy implementation of the smoothed z-score algorithm (see answer above). You can find the gist here.

    #!/usr/bin/env python
    # Implementation of algorithm from https://stackoverflow.com/a/22640362/6029703
    import numpy as np
    import pylab
    
    def thresholding_algo(y, lag, threshold, influence):
        signals = np.zeros(len(y))
        filteredY = np.array(y)
        avgFilter = [0]*len(y)
        stdFilter = [0]*len(y)
        avgFilter[lag - 1] = np.mean(y[0:lag])
        stdFilter[lag - 1] = np.std(y[0:lag])
        for i in range(lag, len(y)):
            if abs(y[i] - avgFilter[i-1]) > threshold * stdFilter [i-1]:
                if y[i] > avgFilter[i-1]:
                    signals[i] = 1
                else:
                    signals[i] = -1
    
                filteredY[i] = influence * y[i] + (1 - influence) * filteredY[i-1]
                avgFilter[i] = np.mean(filteredY[(i-lag):i])
                stdFilter[i] = np.std(filteredY[(i-lag):i])
            else:
                signals[i] = 0
                filteredY[i] = y[i]
                avgFilter[i] = np.mean(filteredY[(i-lag):i])
                stdFilter[i] = np.std(filteredY[(i-lag):i])
    
        return dict(signals = np.asarray(signals),
                    avgFilter = np.asarray(avgFilter),
                    stdFilter = np.asarray(stdFilter))
    

    Below is the test on the same dataset that yields the same plot as in the original answer for R / Matlab

    # Data
    y = np.array([1,1,1.1,1,0.9,1,1,1.1,1,0.9,1,1.1,1,1,0.9,1,1,1.1,1,1,1,1,1.1,0.9,1,1.1,1,1,0.9,
           1,1.1,1,1,1.1,1,0.8,0.9,1,1.2,0.9,1,1,1.1,1.2,1,1.5,1,3,2,5,3,2,1,1,1,0.9,1,1,3,
           2.6,4,3,3.2,2,1,1,0.8,4,4,2,2.5,1,1,1])
    
    # Settings: lag = 30, threshold = 5, influence = 0
    lag = 30
    threshold = 5
    influence = 0
    
    # Run algo with settings from above
    result = thresholding_algo(y, lag=lag, threshold=threshold, influence=influence)
    
    # Plot result
    pylab.subplot(211)
    pylab.plot(np.arange(1, len(y)+1), y)
    
    pylab.plot(np.arange(1, len(y)+1),
               result["avgFilter"], color="cyan", lw=2)
    
    pylab.plot(np.arange(1, len(y)+1),
               result["avgFilter"] + threshold * result["stdFilter"], color="green", lw=2)
    
    pylab.plot(np.arange(1, len(y)+1),
               result["avgFilter"] - threshold * result["stdFilter"], color="green", lw=2)
    
    pylab.subplot(212)
    pylab.step(np.arange(1, len(y)+1), result["signals"], color="red", lw=2)
    pylab.ylim(-1.5, 1.5)
    
    链接地址: http://www.djcxy.com/p/26790.html

    上一篇: 如何将数组插入数据库?

    下一篇: 实时时间序列数据中的峰值信号检测