diff --git a/wasp/gadgetbridge.py b/wasp/gadgetbridge.py deleted file mode 100644 index dc5479a..0000000 --- a/wasp/gadgetbridge.py +++ /dev/null @@ -1,68 +0,0 @@ -# SPDX-License-Identifier: LGPL-3.0-or-later -# Copyright (C) 2020 Daniel Thompson -"""Gadgetbridge/Bangle.js protocol - -Currently implemented messages are: - - * t:"notify", id:int, src,title,subject,body,sender,tel:string - new - notification - * t:"notify-", id:int - delete notification - * t:"alarm", d:[{h,m},...] - set alarms - * t:"find", n:bool - findDevice - * t:"vibrate", n:int - vibrate - * t:"weather", temp,hum,txt,wind,loc - weather report - * t:"musicstate", state:"play/pause",position,shuffle,repeat - music - play/pause/etc - * t:"musicinfo", artist,album,track,dur,c(track count),n(track num) - - currently playing music track - * t:"call", cmd:"accept/incoming/outgoing/reject/start/end", name: "name", number: "+491234" - call -""" - -import io -import json -import sys -import wasp - -# JSON compatibility -null = None -true = True -false = False - -def _info(msg): - json.dump({'t': 'info', 'msg': msg}, sys.stdout) - sys.stdout.write('\r\n') - - -def _error(msg): - json.dump({'t': 'error', 'msg': msg}, sys.stdout) - sys.stdout.write('\r\n') - - -def GB(cmd): - task = cmd['t'] - del cmd['t'] - - try: - if task == 'find': - wasp.watch.vibrator.pin(not cmd['n']) - elif task == 'notify': - id = cmd['id'] - del cmd['id'] - wasp.system.notify(id, cmd) - wasp.watch.vibrator.pulse(ms=wasp.system.notify_duration) - elif task == 'notify-': - wasp.system.unnotify(cmd['id']) - elif task == 'musicstate': - wasp.system.toggle_music(cmd) - elif task == 'musicinfo': - wasp.system.set_music_info(cmd) - elif task == 'weather': - wasp.system.set_weather_info(cmd) - else: - pass - #_info('Command "{}" is not implemented'.format(cmd)) - except Exception as e: - msg = io.StringIO() - sys.print_exception(e, msg) - _error(msg.getvalue()) - msg.close() diff --git a/wasp/steplogger.py b/wasp/steplogger.py deleted file mode 100644 index c008d8e..0000000 --- a/wasp/steplogger.py +++ /dev/null @@ -1,164 +0,0 @@ -# SPDX-License-Identifier: LGPL-3.0-or-later -# Copyright (C) 2020 Daniel Thompson - -"""Step logger -~~~~~~~~~~~~~~ - -Capture and record data from the step counter -""" - -import array -import os -import time -import wasp - -from micropython import const - -TICK_PERIOD = const(6 * 60) -DUMP_LENGTH = const(30) -DUMP_PERIOD = const(DUMP_LENGTH * TICK_PERIOD) - -class StepIterator: - def __init__(self, fname, data=None): - self._fname = fname - self._f = None - self._d = data - - def __del__(self): - self.close() - - def __iter__(self): - self.close() - self._f = open(self._fname, 'rb') - self._c = 0 - return self - - def __next__(self): - self._c += 1 - if self._c > (24*60*60) // TICK_PERIOD: - raise StopIteration - - if self._f: - spl = self._f.read(2) - if spl: - return spl[0] + (spl[1] << 8) - self.close() - self._i = 0 - - if self._d and self._i < len(self._d): - i = self._i - self._i += 1 - return self._d[i] - - return 0 - - def close(self): - if self._f: - self._f.close() - self._f = None - -class StepLogger: - def __init__(self, manager): - self._data = array.array('H', (0,) * DUMP_LENGTH) - self._steps = wasp.watch.accel.steps - - try: - os.mkdir('logs') - except: - pass - - # Queue a tick - self._t = int(wasp.watch.rtc.time()) // TICK_PERIOD * TICK_PERIOD - manager.set_alarm(self._t + TICK_PERIOD, self._tick) - - def _tick(self): - """Capture the current step count in N minute intervals. - - The samples are queued in a small RAM buffer in order to reduce - the number of flash access. The data is written out every few hours - in a binary format ready to be reloaded and graphed when it is - needed. - """ - t = self._t - - # Work out where we are in the dump period - i = t % DUMP_PERIOD // TICK_PERIOD - - # Get the current step count and record it - steps = wasp.watch.accel.steps - self._data[i] = steps - self._steps - self._steps = steps - - # Queue the next tick - wasp.system.set_alarm(t + TICK_PERIOD, self._tick) - self._t += TICK_PERIOD - - if i < (DUMP_LENGTH-1): - return - - # Record the data in the flash - walltime = time.localtime(t) - yyyy = walltime[0] - mm = walltime[1] - dd = walltime[2] - - # Find when (in seconds) "today" started - then = int(time.mktime((yyyy, mm, dd, 0, 0, 0, 0, 0, 0))) - elapsed = t - then - - # Work out how dumps we expect to find in today's dumpfile - dump_num = elapsed // DUMP_PERIOD - - # Update the log data - try: - os.mkdir('logs/' + str(yyyy)) - except: - pass - fname = 'logs/{}/{:02d}-{:02d}.steps'.format(yyyy, mm, dd) - offset = dump_num * DUMP_LENGTH * 2 - try: - sz = os.stat(fname)[6] - except: - sz = 0 - f = open(fname, 'ab') - # This is a portable (between real Python and MicroPython) way to - # grow the file to the right size. - f.seek(min(sz, offset)) - for _ in range(sz, offset, 2): - f.write(b'\x00\x00') - f.write(self._data) - f.close() - - # Wipe the data - data = self._data - for i in range(DUMP_LENGTH): - data[i] = 0 - - def data(self, t): - try: - yyyy = t[0] - except: - t = time.localtime(t) - yyyy = t[0] - mm = t[1] - dd = t[2] - - fname = 'logs/{}/{:02d}-{:02d}.steps'.format(yyyy, mm, dd) - try: - os.stat(fname) - except: - return None - - # Record the data in the flash - now = time.localtime(self._t) - if now[:3] == t[:3]: - latest = self._data - - # Work out where we are in the dump period and update - # with the latest counts - i = self._t % DUMP_PERIOD // TICK_PERIOD - latest[i] = wasp.watch.accel.steps - self._steps - else: - latest = None - - return StepIterator(fname, latest)