Peak signal detection in realtime timeseries data
Update: The best performing algorithm so far is this one .
This question explores robust algorithms for detecting sudden peaks in real-time timeseries data.
Consider the following dataset:
p = [1 1 1.1 1 0.9 1 1 1.1 1 0.9 1 1.1 1 1 0.9 1 1 1.1 1 1 1 1 1.1 0.9 1 1.1 1 1 0.9 1, ...
1.1 1 1 1.1 1 0.8 0.9 1 1.2 0.9 1 1 1.1 1.2 1 1.5 1 3 2 5 3 2 1 1 1 0.9 1 1 3, ...
2.6 4 3 3.2 2 1 1 0.8 4 4 2 2.5 1 1 1];
(Matlab format but it's not about the language but about the algorithm)
You can clearly see that there are three large peaks and some small peaks. This dataset is a specific example of the class of timeseries datasets that the question is about. This class of datasets has two general features:
Let's also assume the following:
For such a situation, a boundary value needs to be constructed which triggers signals. However, the boundary value cannot be static and must be determined realtime based on an algorithm.
My Question: what is a good algorithm to calculate such thresholds in realtime? Are there specific algorithms for such situations? What are the most well-known algorithms?
Robust algorithms or useful insights are all highly appreciated. (can answer in any language: it's about the algorithm)
Smoothed z-score algo (very robust thresholding algorithm)
I have constructed an algorithm that works very well for these types of datasets. It is based on the principle of dispersion: if a new datapoint is a given x number of standard deviations away from some moving mean, the algorithm signals (also called z-score). The algorithm is very robust because it constructs a separate moving mean and deviation, such that signals do not corrupt the threshold. Future signals are therefore identified with approximately the same accuracy, regardless of the amount of previous signals. The algorithm takes 3 inputs: lag = the lag of the moving window
, threshold = the z-score at which the algorithm signals
and influence = the influence (between 0 and 1) of new signals on the mean and standard deviation
. For example, a lag
of 5 will use the last 5 observations to smooth the data. A threshold
of 3.5 will signal if a datapoint is 3.5 standard deviations away from the moving mean. And an influence
of 0.5 gives signals half of the influence that normal datapoints have. Likewise, an influence
of 0 ignores signals completely for recalculating the new threshold: an influence of 0 is therefore the most robust option; 1 is the least.
It works as follows:
Pseudocode
# Let y be a vector of timeseries data of at least length lag+2
# Let mean() be a function that calculates the mean
# Let std() be a function that calculates the standard deviaton
# Let absolute() be the absolute value function
# Settings (the ones below are examples: choose what is best for your data)
set lag to 5; # lag 5 for the smoothing functions
set threshold to 3.5; # 3.5 standard deviations for signal
set influence to 0.5; # between 0 and 1, where 1 is normal influence, 0.5 is half
# Initialise variables
set signals to vector 0,...,0 of length of y; # Initialise signal results
set filteredY to y(1),...,y(lag) # Initialise filtered series
set avgFilter to null; # Initialise average filter
set stdFilter to null; # Initialise std. filter
set avgFilter(lag) to mean(y(1),...,y(lag)); # Initialise first value
set stdFilter(lag) to std(y(1),...,y(lag)); # Initialise first value
for i=lag+1,...,t do
if absolute(y(i) - avgFilter(i-1)) > threshold*stdFilter(i-1) then
if y(i) > avgFilter(i-1) then
set signals(i) to +1; # Positive signal
else
set signals(i) to -1; # Negative signal
end
# Make influence lower
set filteredY(i) to influence*y(i) + (1-influence)*filteredY(i-1);
else
set signals(i) to 0; # No signal
set filteredY(i) to y(i);
end
# Adjust the filters
set avgFilter(i) to mean(filteredY(i-lag),...,filteredY(i));
set stdFilter(i) to std(filteredY(i-lag),...,filteredY(i));
end
Demo
The Matlab code for this demo can be found at the end of this answer. To use the demo, simply run it and create a time series yourself by clicking on the upper chart. The algorithm starts working after drawing lag
number of observations.
Appendix 1: Matlab and R code for the algorithm
Matlab code
function [signals,avgFilter,stdFilter] = ThresholdingAlgo(y,lag,threshold,influence)
% Initialise signal results
signals = zeros(length(y),1);
% Initialise filtered series
filteredY = y(1:lag+1);
% Initialise filters
avgFilter(lag+1,1) = mean(y(1:lag+1));
stdFilter(lag+1,1) = std(y(1:lag+1));
% Loop over all datapoints y(lag+2),...,y(t)
for i=lag+2:length(y)
% If new value is a specified number of deviations away
if abs(y(i)-avgFilter(i-1)) > threshold*stdFilter(i-1)
if y(i) > avgFilter(i-1)
% Positive signal
signals(i) = 1;
else
% Negative signal
signals(i) = -1;
end
% Make influence lower
filteredY(i) = influence*y(i)+(1-influence)*filteredY(i-1);
else
% No signal
signals(i) = 0;
filteredY(i) = y(i);
end
% Adjust the filters
avgFilter(i) = mean(filteredY(i-lag:i));
stdFilter(i) = std(filteredY(i-lag:i));
end
% Done, now return results
end
Example:
% Data
y = [1 1 1.1 1 0.9 1 1 1.1 1 0.9 1 1.1 1 1 0.9 1 1 1.1 1 1,...
1 1 1.1 0.9 1 1.1 1 1 0.9 1 1.1 1 1 1.1 1 0.8 0.9 1 1.2 0.9 1,...
1 1.1 1.2 1 1.5 1 3 2 5 3 2 1 1 1 0.9 1,...
1 3 2.6 4 3 3.2 2 1 1 0.8 4 4 2 2.5 1 1 1];
% Settings
lag = 30;
threshold = 5;
influence = 0;
% Get results
[signals,avg,dev] = ThresholdingAlgo(y,lag,threshold,influence);
figure; subplot(2,1,1); hold on;
x = 1:length(y); ix = lag+1:length(y);
area(x(ix),avg(ix)+threshold*dev(ix),'FaceColor',[0.9 0.9 0.9],'EdgeColor','none');
area(x(ix),avg(ix)-threshold*dev(ix),'FaceColor',[1 1 1],'EdgeColor','none');
plot(x(ix),avg(ix),'LineWidth',1,'Color','cyan','LineWidth',1.5);
plot(x(ix),avg(ix)+threshold*dev(ix),'LineWidth',1,'Color','green','LineWidth',1.5);
plot(x(ix),avg(ix)-threshold*dev(ix),'LineWidth',1,'Color','green','LineWidth',1.5);
plot(1:length(y),y,'b');
subplot(2,1,2);
stairs(signals,'r','LineWidth',1.5); ylim([-1.5 1.5]);
R code
ThresholdingAlgo <- function(y,lag,threshold,influence) {
signals <- rep(0,length(y))
filteredY <- y[0:lag]
avgFilter <- NULL
stdFilter <- NULL
avgFilter[lag] <- mean(y[0:lag])
stdFilter[lag] <- sd(y[0:lag])
for (i in (lag+1):length(y)){
if (abs(y[i]-avgFilter[i-1]) > threshold*stdFilter[i-1]) {
if (y[i] > avgFilter[i-1]) {
signals[i] <- 1;
} else {
signals[i] <- -1;
}
filteredY[i] <- influence*y[i]+(1-influence)*filteredY[i-1]
} else {
signals[i] <- 0
filteredY[i] <- y[i]
}
avgFilter[i] <- mean(filteredY[(i-lag):i])
stdFilter[i] <- sd(filteredY[(i-lag):i])
}
return(list("signals"=signals,"avgFilter"=avgFilter,"stdFilter"=stdFilter))
}
Example:
# Data
y <- c(1,1,1.1,1,0.9,1,1,1.1,1,0.9,1,1.1,1,1,0.9,1,1,1.1,1,1,1,1,1.1,0.9,1,1.1,1,1,0.9,
1,1.1,1,1,1.1,1,0.8,0.9,1,1.2,0.9,1,1,1.1,1.2,1,1.5,1,3,2,5,3,2,1,1,1,0.9,1,1,3,
2.6,4,3,3.2,2,1,1,0.8,4,4,2,2.5,1,1,1)
lag <- 30
threshold <- 5
influence <- 0
# Run algo with lag = 30, threshold = 5, influence = 0
result <- ThresholdingAlgo(y,lag,threshold,influence)
# Plot result
par(mfrow = c(2,1),oma = c(2,2,0,0) + 0.1,mar = c(0,0,2,1) + 0.2)
plot(1:length(y),y,type="l",ylab="",xlab="")
lines(1:length(y),result$avgFilter,type="l",col="cyan",lwd=2)
lines(1:length(y),result$avgFilter+threshold*result$stdFilter,type="l",col="green",lwd=2)
lines(1:length(y),result$avgFilter-threshold*result$stdFilter,type="l",col="green",lwd=2)
plot(result$signals,type="S",col="red",ylab="",xlab="",ylim=c(-1.5,1.5),lwd=2)
This code (both languages) will yield the following result for the data of the original question:
Implementations in other languages:
Golang (Xeoncross)
Python (R Kiselev)
Swift (me)
Groovy (JoshuaCWebDeveloper)
C++ (brad)
C++ (Animesh Pandey)
Rust (swizard)
Scala (Mike Roberts)
Kotlin (leoderprofi)
Ruby (Kimmo Lehto)
Appendix 2: Matlab demonstration code (click to make data)
function [] = RobustThresholdingDemo()
%% SPECIFICATIONS
lag = 5; % lag for the smoothing
threshold = 3.5; % number of st.dev. away from the mean to signal
influence = 0.3; % when signal: how much influence for new data? (between 0 and 1)
% 1 is normal influence, 0.5 is half
%% START DEMO
DemoScreen(30,lag,threshold,influence);
end
function [signals,avgFilter,stdFilter] = ThresholdingAlgo(y,lag,threshold,influence)
signals = zeros(length(y),1);
filteredY = y(1:lag+1);
avgFilter(lag+1,1) = mean(y(1:lag+1));
stdFilter(lag+1,1) = std(y(1:lag+1));
for i=lag+2:length(y)
if abs(y(i)-avgFilter(i-1)) > threshold*stdFilter(i-1)
if y(i) > avgFilter(i-1)
signals(i) = 1;
else
signals(i) = -1;
end
filteredY(i) = influence*y(i)+(1-influence)*filteredY(i-1);
else
signals(i) = 0;
filteredY(i) = y(i);
end
avgFilter(i) = mean(filteredY(i-lag:i));
stdFilter(i) = std(filteredY(i-lag:i));
end
end
% Demo screen function
function [] = DemoScreen(n,lag,threshold,influence)
figure('Position',[200 100,1000,500]);
subplot(2,1,1);
title(sprintf(['Draw data points (%.0f max) [settings: lag = %.0f, '...
'threshold = %.2f, influence = %.2f]'],n,lag,threshold,influence));
ylim([0 5]); xlim([0 50]);
H = gca; subplot(2,1,1);
set(H, 'YLimMode', 'manual'); set(H, 'XLimMode', 'manual');
set(H, 'YLim', get(H,'YLim')); set(H, 'XLim', get(H,'XLim'));
xg = []; yg = [];
for i=1:n
try
[xi,yi] = ginput(1);
catch
return;
end
xg = [xg xi]; yg = [yg yi];
if i == 1
subplot(2,1,1); hold on;
plot(H, xg(i),yg(i),'r.');
text(xg(i),yg(i),num2str(i),'FontSize',7);
end
if length(xg) > lag
[signals,avg,dev] = ...
ThresholdingAlgo(yg,lag,threshold,influence);
area(xg(lag+1:end),avg(lag+1:end)+threshold*dev(lag+1:end),...
'FaceColor',[0.9 0.9 0.9],'EdgeColor','none');
area(xg(lag+1:end),avg(lag+1:end)-threshold*dev(lag+1:end),...
'FaceColor',[1 1 1],'EdgeColor','none');
plot(xg(lag+1:end),avg(lag+1:end),'LineWidth',1,'Color','cyan');
plot(xg(lag+1:end),avg(lag+1:end)+threshold*dev(lag+1:end),...
'LineWidth',1,'Color','green');
plot(xg(lag+1:end),avg(lag+1:end)-threshold*dev(lag+1:end),...
'LineWidth',1,'Color','green');
subplot(2,1,2); hold on; title('Signal output');
stairs(xg(lag+1:end),signals(lag+1:end),'LineWidth',2,'Color','blue');
ylim([-2 2]); xlim([0 50]); hold off;
end
subplot(2,1,1); hold on;
for j=2:i
plot(xg([j-1:j]),yg([j-1:j]),'r'); plot(H,xg(j),yg(j),'r.');
text(xg(j),yg(j),num2str(j),'FontSize',7);
end
end
end
WARNING: The code above always loops over all datapoints everytime it runs. When implementing this code, make sure to split the calculation of the signal into a separate function (without the loop). Then when a new datapoint arrives, update filteredY
, avgFilter
and stdFilter
once. Do not recalculate the signals for all data everytime there is a new datapoint (like in the example above), that would be extremely inefficient and slow!
Other ways to modify the algorithm (for potential improvements) are:
If you use this function somewhere, please credit me or this answer. If you have any questions regarding this algorithm, post them in the comments below or reach out to me on LinkedIn.
One approach is to detect peaks based on the following observation:
It avoids false positives by waiting until the uptrend is over. It is not exactly "real-time" in the sense that it will miss the peak by one dt. sensitivity can be controlled by requiring a margin for comparison. There is a trade off between noisy detection and time delay of detection. You can enrich the model by adding more parameters:
where dt and m are parameters to control sensitivity vs time-delay
This is what you get with the mentioned algorithm:
here is the code to reproduce the plot in python:
import numpy as np
import matplotlib.pyplot as plt
input = np.array([ 1. , 1. , 1. , 1. , 1. , 1. , 1. , 1.1, 1. , 0.8, 0.9,
1. , 1.2, 0.9, 1. , 1. , 1.1, 1.2, 1. , 1.5, 1. , 3. ,
2. , 5. , 3. , 2. , 1. , 1. , 1. , 0.9, 1. , 1. , 3. ,
2.6, 4. , 3. , 3.2, 2. , 1. , 1. , 1. , 1. , 1. ])
signal = (input > np.roll(input,1)) & (input > np.roll(input,-1))
plt.plot(input)
plt.plot(signal.nonzero()[0], input[signal], 'ro')
plt.show()
By setting m = 0.5
, you can get a cleaner signal with only one false positive:
Here is the Python
/ numpy
implementation of the smoothed z-score algorithm (see answer above). You can find the gist here.
#!/usr/bin/env python
# Implementation of algorithm from https://stackoverflow.com/a/22640362/6029703
import numpy as np
import pylab
def thresholding_algo(y, lag, threshold, influence):
signals = np.zeros(len(y))
filteredY = np.array(y)
avgFilter = [0]*len(y)
stdFilter = [0]*len(y)
avgFilter[lag - 1] = np.mean(y[0:lag])
stdFilter[lag - 1] = np.std(y[0:lag])
for i in range(lag, len(y)):
if abs(y[i] - avgFilter[i-1]) > threshold * stdFilter [i-1]:
if y[i] > avgFilter[i-1]:
signals[i] = 1
else:
signals[i] = -1
filteredY[i] = influence * y[i] + (1 - influence) * filteredY[i-1]
avgFilter[i] = np.mean(filteredY[(i-lag):i])
stdFilter[i] = np.std(filteredY[(i-lag):i])
else:
signals[i] = 0
filteredY[i] = y[i]
avgFilter[i] = np.mean(filteredY[(i-lag):i])
stdFilter[i] = np.std(filteredY[(i-lag):i])
return dict(signals = np.asarray(signals),
avgFilter = np.asarray(avgFilter),
stdFilter = np.asarray(stdFilter))
Below is the test on the same dataset that yields the same plot as in the original answer for R
/ Matlab
# Data
y = np.array([1,1,1.1,1,0.9,1,1,1.1,1,0.9,1,1.1,1,1,0.9,1,1,1.1,1,1,1,1,1.1,0.9,1,1.1,1,1,0.9,
1,1.1,1,1,1.1,1,0.8,0.9,1,1.2,0.9,1,1,1.1,1.2,1,1.5,1,3,2,5,3,2,1,1,1,0.9,1,1,3,
2.6,4,3,3.2,2,1,1,0.8,4,4,2,2.5,1,1,1])
# Settings: lag = 30, threshold = 5, influence = 0
lag = 30
threshold = 5
influence = 0
# Run algo with settings from above
result = thresholding_algo(y, lag=lag, threshold=threshold, influence=influence)
# Plot result
pylab.subplot(211)
pylab.plot(np.arange(1, len(y)+1), y)
pylab.plot(np.arange(1, len(y)+1),
result["avgFilter"], color="cyan", lw=2)
pylab.plot(np.arange(1, len(y)+1),
result["avgFilter"] + threshold * result["stdFilter"], color="green", lw=2)
pylab.plot(np.arange(1, len(y)+1),
result["avgFilter"] - threshold * result["stdFilter"], color="green", lw=2)
pylab.subplot(212)
pylab.step(np.arange(1, len(y)+1), result["signals"], color="red", lw=2)
pylab.ylim(-1.5, 1.5)
链接地址: http://www.djcxy.com/p/26790.html
上一篇: 如何将数组插入数据库?
下一篇: 实时时间序列数据中的峰值信号检测