From 728c9f3c5fea9c8acf117ab0b25f1783b61731b8 Mon Sep 17 00:00:00 2001 From: ml server Date: Mon, 14 Jun 2021 11:25:12 -0600 Subject: [PATCH] rm ppg --- wasp/boards/pinetime/manifest.py | 1 - wasp/ppg.py | 188 ------------------------------- 2 files changed, 189 deletions(-) delete mode 100644 wasp/ppg.py diff --git a/wasp/boards/pinetime/manifest.py b/wasp/boards/pinetime/manifest.py index 5ad43a2..7a8280d 100644 --- a/wasp/boards/pinetime/manifest.py +++ b/wasp/boards/pinetime/manifest.py @@ -19,7 +19,6 @@ freeze('../..', manifest_240x240.manifest + 'drivers/signal.py', 'drivers/st7789.py', 'drivers/vibrator.py', - 'ppg.py', 'shell.py', 'wasp.py', ), diff --git a/wasp/ppg.py b/wasp/ppg.py deleted file mode 100644 index 4fe0ab2..0000000 --- a/wasp/ppg.py +++ /dev/null @@ -1,188 +0,0 @@ -# SPDX-License-Identifier: LGPL-3.0-or-later -# Copyright (C) 2020 Daniel Thompson - -"""Photoplethysmogram (PPG) Signal Processing -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -Algorithms and signal processing primatives that can be used to convert -raw PPG signals into something useful. -""" - -import array -import micropython -import watch - -@micropython.viper -def _compare(d1, d2, count: int, shift: int) -> int: - """Compare two sequences of (signed) bytes and quantify how dissimilar - they are. - """ - p1 = ptr8(d1) - p2 = ptr8(d2) - - e = 0 - for i in range(count): - s1 = int(p1[i]) - if s1 > 127: - s1 -= 256 - - s2 = int(p2[i]) - if s2 > 127: - s2 -= 256 - - d = s1 - s2 - e += d*d - return e - -class Biquad(): - """Direct Form II Biquad Filter""" - - def __init__(self, b0, b1, b2, a1, a2): - self._coeff = (b0, b1, b2, a1, a2) - self._v1 = 0 - self._v2 = 0 - - def step(self, x): - c = self._coeff - v1 = self._v1 - v2 = self._v2 - - v = x - (c[3] * v1) - (c[4] * v2) - y = (c[0] * v) + (c[1] * v1) + (c[2] * v2) - - self._v2 = v1 - self._v1 = v - return y - -class PTAGC(): - """Peak Tracking Automatic Gain Control - - In order for the correlation checks to work correctly we must - aggressively reject spikes caused by fast DC steps. Setting a - threshold based on the median is very effective at killing - spikes but needs an extra 1k for sample storage which isn't - really plausible for a microcontroller. - """ - def __init__(self, start, decay, threshold): - self._peak = start - self._decay = decay - self._boost = 1 / decay - self._threshold = threshold - - def step(self, spl): - # peak tracking - peak = self._peak - if abs(spl) > peak: - peak *= self._boost - else: - peak *= self._decay - self._peak = peak - - # rejection filter (clipper) - threshold = self._threshold - if spl > (peak * threshold) or spl < (peak * -threshold): - return 0 - - # booster - spl = 100 * spl / (2 * peak) - - return spl - -class PPG(): - """ - """ - - def __init__(self, spl): - self._offset = spl - self.data = array.array('b') - self.debug = None - - self._hpf = Biquad(0.87033078, -1.74066156, 0.87033078, - -1.72377617, 0.75754694) - self._agc = PTAGC(20, 0.971, 2) - self._lpf = Biquad(0.11595249, 0.23190498, 0.11595249, - -0.72168143, 0.18549138) - - def preprocess(self, spl): - """Preprocess a PPG sample. - - Must be called at 24Hz for accurate heart rate calculations. - """ - if self.debug != None: - self.debug.append(spl) - spl -= self._offset - spl = self._hpf.step(spl) - spl = self._agc.step(spl) - spl = self._lpf.step(spl) - spl = int(spl) - - self.data.append(spl) - return spl - - def _get_heart_rate(self): - def compare(d, shift): - return _compare(d[shift:], d[:-shift], len(d)-shift, shift) - - def trough(d, mn, mx): - z2 = compare(d, mn-2) - z1 = compare(d, mn-1) - for i in range(mn, mx+1): - z = compare(d, i) - if z2 > z1 and z1 < z: - return i - z2 = z1 - z1 = z - - return -1 - - data = memoryview(self.data) - - # Search initially from ~210 to 30 bpm - t0 = trough(data, 7, 48) - if t0 < 0: - return None - - # Check the second cycle ... - t1 = t0 * 2 - t1 = trough(data, t1 - 5, t1 + 5) - if t1 < 0: - return None - - # ... and the third - t2 = (t1 * 3) // 2 - t2 = trough(data, t2 - 5, t2 + 4) - if t2 < 0: - return None - - # If we can find a fourth cycle then use that for the extra - # precision otherwise report whatever we've found - t3 = (t2 * 4) // 3 - t3 = trough(data, t3 - 4, t3 + 4) - if t3 < 0: - return (60 * 24 * 3) // t2 - return (60 * 24 * 4) // t3 - - def get_heart_rate(self): - if len(self.data) < 200: - return None - - hr = self._get_heart_rate() - - # Clear out the accumulated data - self.data = array.array('b') - - # Dump the debug data - if self.debug: - with open('hrs.data', 'ab') as f: - # Re-sync marker - f.write(b'\xff\xff') - now = watch.rtc.get_localtime() - f.write(array.array('H', now[:6])) - f.write(self.debug) - self.debug = array.array('H') - - return hr - - def enable_debug(self): - if self.debug == None: - self.debug = array.array('H')