2

I'm implementing a peak detection algorithm in Python that detects only those peaks that are above a threshold magnitude. I don't want to use the inbuilt function as I have to extend this simulation to Hardware implementation also.

from math import sin,isnan
from pylab import *

def peakdet(v, delta,thresh,x):
    delta=abs(delta)
    maxtab = []
    mintab = []

    v = asarray(v)

    mn, mx = v[0], v[0]
    mnpos, mxpos = NaN, NaN

    lookformax = True

    for i in arange(len(v)):
        this = v[i]
        if abs(this)>thresh:
            if this > mx:
                mx = this
                mxpos = x[i]
            if this < mn:
                mn = this
                mnpos = x[i]
            if lookformax:
                if (this < mx-delta):
                    if (mx>abs(thresh)) and not isnan(mxpos):
                        maxtab.append((mxpos, mx))
                    mn = this
                    mnpos = x[i]
                    lookformax = False
            else:
                if (this > mn+delta):
                    if (mn<-abs(thresh)) and not isnan(mnpos):
                        mintab.append((mnpos, mn))
                    mx = this
                    mxpos = x[i]
                    lookformax = True
    return array(maxtab), array(mintab)

#Input Signal
t=array(range(100))
series=0.3*sin(t)+0.7*cos(2*t)-0.5*sin(1.2*t)

thresh=0.95 #Threshold value
delta=0.0 #

a=zeros(len(t)) #
a[:]=thresh #

maxtab, mintab = peakdet(series,delta,thresh,t)

#Plotting output
scatter(array(maxtab)[:,0], array(maxtab)[:,1], color='red')
scatter(array(mintab)[:,0], array(mintab)[:,1], color='blue')
xlim([0,t[-1]])
title('Peak Detector')
grid(True)
plot(t,a,color='green',linestyle='--',dashes=(5,3))
plot(t,-a,color='green',linestyle='--',dashes=(5,3))
annotate('Threshold',xy=(t[-1],thresh),fontsize=9)
plot(t,series,'k')
show()

The problem with this program is that it is unable to detect some peaks even though they are above the threshold. This is the output I got:

Peak Detection Output

I saw other posts with peak detection problems but couldn't find any solution. Please help and suggest corrections.

4
  • What is the meaning of the delta parameter? Commented Jun 8, 2018 at 9:24
  • 1
    you are using lookformax variable that is set true once you found min So if you found max than your code look for min and ignore max ..... Commented Jun 8, 2018 at 9:27
  • @mkrieger1 Delta defines the sensitivity of detecting peaks. Like if you set a higher delta, it will not detect the small peaks. This is particularly useful if you have noise in the signal. Commented Jun 8, 2018 at 11:24
  • Isn't this the purpose of threshold? Commented Jun 8, 2018 at 12:38

4 Answers 4

9

Solution with find_peaks from scipy.signal

from scipy.signal import find_peaks
import numpy as np
import matplotlib.pyplot as plt

# Input signal
t = np.arange(100)
series = 0.3*np.sin(t)+0.7*np.cos(2*t)-0.5*np.sin(1.2*t)

# Threshold value (for height of peaks and valleys)
thresh = 0.95

# Find indices of peaks
peak_idx, _ = find_peaks(series, height=thresh)

# Find indices of valleys (from inverting the signal)
valley_idx, _ = find_peaks(-series, height=thresh)

# Plot signal
plt.plot(t, series)

# Plot threshold
plt.plot([min(t), max(t)], [thresh, thresh], '--')
plt.plot([min(t), max(t)], [-thresh, -thresh], '--')

# Plot peaks (red) and valleys (blue)
plt.plot(t[peak_idx], series[peak_idx], 'r.')
plt.plot(t[valley_idx], series[valley_idx], 'b.')

plt.show()

The resulting plot is shown below.

enter image description here

Note that find_peaks has a parameter height which is what we here called thresh. It also has a parameter called threshold, which is doing something else.

Documentation for find_peaks

Sign up to request clarification or add additional context in comments.

2 Comments

What is the underline in peak_idx, _ = there for?
@Unknow0059: Good question. It's a standard Python convention used by the community for indicating a value that will never be used. The 'find_peaks' function returns (1) an array with the peaks, and (2) a dict with properties from the solved problem. We don't care about the properties in this example, but we can't avoid it getting returned. Thus, we assign it to '_' to indicate to readers of the code that it will not be used.
1

Your function uses quite a lot of parameters. You can decompose the problem to a few steps:

  1. First detect all points above the threshold. Add those points to a maxthresh and minthresh list.
  2. Iterate through the maxthresh list and if the y value prior to the point is less than the point, and the y value after the point is less than the point, then the point is a peak.
  3. Iterate through the minthresh list and if the y value prior to the point is greater than the point, and the y value after the point is greather than the point, then the point is a peak.

Code implementation:

from math import sin
from matplotlib import pylab
from pylab import *

def peakdet(v, thresh):
    maxthresh = []
    minthresh = []
    peaks = []
    valleys = []

    for x, y in v:
        if y > thresh:
            maxthresh.append((x, y))
        elif y < -thresh:
            minthresh.append((x, y))

    for x, y in maxthresh:
        try:
            if (v[x - 1][1] < y) & (v[x + 1][1] < y):
                peaks.append((x, y))
        except Exception:
            pass

    for x, y in minthresh:
        try:
            if (v[x - 1][1] > y) & (v[x + 1][1] > y):
                valleys.append((x, y))
        except Exception:
            pass

    return peaks, valleys

Testing the code:

# input signal
t = array(range(100))
series = 0.3 * sin(t) + 0.7 * cos(2 * t) - 0.5 * sin(1.2 * t)

arr = [*zip(t, series)]  # create a list of tuples where the tuples represent the (x, y) values of the function
thresh = 0.95

peaks, valleys = peakdet(arr, thresh)

scatter([x for x, y in peaks], [y for x, y in peaks], color = 'red')
scatter([x for x, y in valleys], [y for x, y in valleys], color = 'blue')
plot(t, 100 * [thresh], color='green', linestyle='--', dashes=(5, 3))
plot(t, 100 * [-thresh], color='green', linestyle='--', dashes=(5, 3))
plot(t, series, 'k')
show()

enter image description here

Additional test to make sure peak is detected when multiple points above threshold:

# input signal
t = array(range(100))
series = 6.3 * sin(t) + 4.7 * cos(2 * t) - 3.5 * sin(1.2 * t)

arr = [*zip(t, series)]
thresh = 0.95

peaks, valleys = peakdet(arr, thresh)

scatter([x for x, y in peaks], [y for x, y in peaks], color = 'red')
scatter([x for x, y in valleys], [y for x, y in valleys], color = 'blue')
plot(t, 100 * [thresh], color='green', linestyle='--', dashes=(5, 3))
plot(t, 100 * [-thresh], color='green', linestyle='--', dashes=(5, 3))
plot(t, series, 'k')
show()

enter image description here

5 Comments

plt.axhline would add flexibility to your plot. And I am not sure that the OP can guess the imports.
@Mr.T Thanks for the feedback. I'm using the same imports that the OP used in his question but I'll edit to add clarity.
@JugalMChoksi If an answer solves your problem, please consider accepting it
@Mr.T I appreciate your approach to be very effective, but as I mentioned in the question I am looking to implement this in hardware too because of which I need real time peak detection. Anyways, thanks a lot for your help.
1

So, here you have a numpythonic solution (which is much better than doing a loop explicitly).

I use the roll function to shift the numbers +1 or -1 in the position. Also a "peak" is defined as a local maximum, where the previous and posterior number are smaller than the central value.

The full code is:

import numpy as np
import matplotlib.pyplot as plt

# input signal
x = np.arange(1,100,1)
y = 0.3 * np.sin(x) + 0.7 * np.cos(2 * x) - 0.5 * np.sin(1.2 * x)
threshold = 0.95

# max
maxi = np.where(np.where([(y - np.roll(y,1) > 0) & (y - np.roll(y,-1) > 0)],y, 0)> threshold, y,np.nan)
# min
mini = np.where(np.where([(y - np.roll(y,1) < 0) & (y - np.roll(y,-1) < 0)],y, 0)< -threshold, y,np.nan)

if you plot it, you get:

enter image description here

Comments

-3

these code

        if lookformax:
            if (this < mx-delta):
                if (mx>abs(thresh)) and not isnan(mxpos):
                    maxtab.append((mxpos, mx))
                mn = this
                mnpos = x[i]
                lookformax = False
        else:
            if (this > mn+delta):
                if (mn<-abs(thresh)) and not isnan(mnpos):
                    mintab.append((mnpos, mn))
                mx = this
                mxpos = x[i]
                lookformax = True

only run under the condition

    if abs(this)>thresh:

so your can only find a peak when the next point above the thresh is smaller than it.

put it out the condition

2 Comments

Thanks for your help! Can you elaborate what you mean by put it out the condition?
put the ( if lookformax: ... else: ...) outside the ( if abs(this)>thresh:) condition

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.