Tools: Storage API (#1161)

* filereader

* support URLs in filereader, logreader

* unused

* use route files api; add auth file

* Implement browser auth

* Update readme, fix up cache paths

* Add tests, clear token on 401

* Factor out URLFile

* space
albatross
Andy 2020-02-24 21:24:54 -05:00 committed by GitHub
parent 7f390b3875
commit c4af05868b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 336 additions and 203 deletions

View File

@ -81,6 +81,7 @@ jobs:
$RUN "$UNIT_TEST selfdrive/car"
$RUN "$UNIT_TEST selfdrive/locationd"
$RUN "$UNIT_TEST selfdrive/athena"
$RUN "$UNIT_TEST tools/lib/tests"
process_replay:
name: process replay

106
common/url_file.py 100644
View File

@ -0,0 +1,106 @@
import os
import sys
import time
import tempfile
import threading
import urllib.parse
import pycurl
import hashlib
from io import BytesIO
from tenacity import retry, wait_random_exponential, stop_after_attempt
from common.file_helpers import mkdirs_exists_ok, atomic_write_in_dir
class URLFile(object):
    """Read-only, file-like view of a remote resource fetched over HTTP with pycurl.

    Data is fetched with HTTP ``Range`` requests starting at the current
    position. A single pycurl handle is kept per thread and shared by every
    URLFile created on that thread.

    Use as a context manager so the temporary local copy created by ``name``
    (if any) is removed on exit.
    """

    # Thread-local storage for the per-thread pycurl handle.
    _tlocal = threading.local()

    def __init__(self, url, debug=False):
        """Create a reader for ``url``; ``debug`` enables verbose curl output and timing prints."""
        self._url = url
        self._pos = 0
        self._local_file = None
        self._debug = debug

        # Reuse this thread's curl handle when one already exists.
        try:
            self._curl = self._tlocal.curl
        except AttributeError:
            self._curl = self._tlocal.curl = pycurl.Curl()

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        """Delete and close the temporary local copy, if ``name`` created one."""
        if self._local_file is not None:
            os.remove(self._local_file.name)
            self._local_file.close()
            self._local_file = None

    @retry(wait=wait_random_exponential(multiplier=1, max=5), stop=stop_after_attempt(3), reraise=True)
    def read(self, ll=None):
        """Read from the current position and advance it by the number of bytes received.

        Args:
            ll: number of bytes to request; ``None`` requests everything from
                the current position to the end of the resource.

        Returns:
            The received bytes, or ``b""`` when the server answers
            416 (Requested Range Not Satisfiable).

        Raises:
            Exception: for any response code other than 200, 206 or 416.
            The call is attempted up to 3 times (see the ``retry`` decorator)
            before the last error is re-raised.
        """
        if ll is None:
            trange = 'bytes=%d-' % self._pos
        else:
            # HTTP ranges are inclusive on both ends.
            trange = 'bytes=%d-%d' % (self._pos, self._pos + ll - 1)

        dats = BytesIO()
        c = self._curl
        c.setopt(pycurl.URL, self._url)
        c.setopt(pycurl.WRITEDATA, dats)
        c.setopt(pycurl.NOSIGNAL, 1)
        c.setopt(pycurl.TIMEOUT_MS, 500000)
        c.setopt(pycurl.HTTPHEADER, ["Range: " + trange, "Connection: keep-alive"])
        c.setopt(pycurl.FOLLOWLOCATION, True)

        if self._debug:
            print("downloading", self._url)

            def header(x):
                # Only header lines containing b'MISS' are printed.
                if b'MISS' in x:
                    print(x.strip())
            c.setopt(pycurl.HEADERFUNCTION, header)

            def test(debug_type, debug_msg):
                print(" debug(%d): %s" % (debug_type, debug_msg.strip()))
            c.setopt(pycurl.VERBOSE, 1)
            c.setopt(pycurl.DEBUGFUNCTION, test)
            t1 = time.time()

        c.perform()

        if self._debug:
            t2 = time.time()
            if t2 - t1 > 0.1:
                print("get %s %r %.f slow" % (self._url, trange, t2 - t1))

        response_code = c.getinfo(pycurl.RESPONSE_CODE)
        if response_code == 416:  # Requested Range Not Satisfiable
            # Return bytes like every other path so callers (e.g. os.write in
            # ``name``) do not receive a str.
            return b""
        if response_code != 206 and response_code != 200:
            raise Exception("Error {}: {}".format(response_code, repr(dats.getvalue())[:500]))

        ret = dats.getvalue()
        self._pos += len(ret)
        return ret

    def seek(self, pos):
        """Set the absolute byte offset used by the next ``read``."""
        self._pos = pos

    @property
    def name(self):
        """Returns a local path to file with the URLFile's contents.

        This can be used to interface with modules that require local files.
        The remaining contents are downloaded into a temporary file on first
        access; afterwards ``read`` and ``seek`` are served by that local file.
        """
        if self._local_file is None:
            _, ext = os.path.splitext(urllib.parse.urlparse(self._url).path)
            local_fd, local_path = tempfile.mkstemp(suffix=ext)
            try:
                os.write(local_fd, self.read())
                local_file = open(local_path, "rb")
            except BaseException:
                # Do not leave a partial temp file behind on any failure.
                os.remove(local_path)
                raise
            finally:
                os.close(local_fd)

            self._local_file = local_file
            # From now on reads and seeks go to the local copy.
            self.read = self._local_file.read
            self.seek = self._local_file.seek

        return self._local_file.name

View File

@ -191,9 +191,21 @@ Replay driving data
`unlogger.py` replays data collected with [chffrplus](https://github.com/commaai/chffrplus) or [openpilot](https://github.com/commaai/openpilot).
You'll need to download log and camera files into a local directory. Download these from the footer of the comma [explorer](https://my.comma.ai) or SCP from your device.
Usage (Remote data):
```
# Log in via browser
python lib/auth.py
Usage:
# Start unlogger
python replay/unlogger.py <route-name>
#Example:
#python replay/unlogger.py '99c94dc769b5d96e|2018-11-14--13-31-42'
# In another terminal you can run a debug visualizer:
python replay/ui.py # Define the environment variable HORIZONTAL if the UI layout is too tall
```
Usage (Local data downloaded from device or https://my.comma.ai):
```
python replay/unlogger.py <route-name> <path-to-data-directory>
@ -206,9 +218,6 @@ python replay/unlogger.py <route-name> <path-to-data-directory>
# 99c94dc769b5d96e|2018-11-14--13-31-42--0--fcamera.hevc
# 99c94dc769b5d96e|2018-11-14--13-31-42--0--rlog.bz2
# ...
# In another terminal you can run a debug visualizer:
python replay/ui.py # Define the environment variable HORIZONTAL if the UI layout is too tall
```
![Imgur](https://i.imgur.com/Yppe0h2.png)

37
tools/lib/api.py 100644
View File

@ -0,0 +1,37 @@
import sys
import os
import requests
from tools.lib.auth_config import clear_token
# Base URL of the API; can be overridden with the API_HOST environment variable.
API_HOST = os.getenv('API_HOST', 'https://api.commadotai.com')
class CommaApi:
    """Small client for the JSON API served at ``API_HOST``, backed by a requests session."""

    def __init__(self, token=None):
        """Create the session; when ``token`` is truthy it is sent as a ``JWT`` Authorization header."""
        session = requests.Session()
        session.headers['User-agent'] = 'OpenpilotTools'
        if token:
            session.headers['Authorization'] = 'JWT ' + token
        self.session = session

    def request(self, method, endpoint, **kwargs):
        """Send a request to ``API_HOST/endpoint`` and return the decoded JSON body.

        Raises:
            UnauthorizedError: the body reports an error and the status is 401
                (the stored token is cleared first).
            APIError: the body reports any other error; ``status_code`` is set
                on the exception.
        """
        url = API_HOST + '/' + endpoint
        resp = self.session.request(method, url, **kwargs)
        resp_json = resp.json()

        reports_error = isinstance(resp_json, dict) and resp_json.get('error')
        if not reports_error:
            return resp_json

        if resp.status_code == 401:
            clear_token()
            raise UnauthorizedError('Unauthorized. Authenticate with tools/lib/auth.py')

        detail = resp_json.get('description', str(resp_json['error']))
        e = APIError(str(resp.status_code) + ":" + detail)
        e.status_code = resp.status_code
        raise e

    def get(self, endpoint, **kwargs):
        """Shorthand for ``request('GET', endpoint, **kwargs)``."""
        return self.request('GET', endpoint, **kwargs)

    def post(self, endpoint, **kwargs):
        """Shorthand for ``request('POST', endpoint, **kwargs)``."""
        return self.request('POST', endpoint, **kwargs)
class APIError(Exception):
    """Raised by CommaApi.request when the response body reports an error."""
class UnauthorizedError(Exception):
    """Raised by CommaApi.request when an error response has status 401."""

73
tools/lib/auth.py 100755
View File

@ -0,0 +1,73 @@
#!/usr/bin/env python3
import json
import os
import sys
import webbrowser
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import urlencode, parse_qs
from common.file_helpers import mkdirs_exists_ok
from tools.lib.api import CommaApi, APIError
from tools.lib.auth_config import set_token
class ClientRedirectServer(HTTPServer):
    """HTTPServer carrying the query parameters stored by ClientRedirectHandler."""

    # Empty until a handler assigns the parsed redirect query to the server.
    query_params = dict()
class ClientRedirectHandler(BaseHTTPRequestHandler):
    """Request handler that captures the query parameters of ``/auth_redirect``."""

    def do_GET(self):
        """Store the redirect's query parameters on the server and answer the browser.

        Requests for any other path get an empty 204 response.
        """
        if not self.path.startswith('/auth_redirect'):
            self.send_response(204)
            # send_response only buffers the headers; end_headers flushes them,
            # otherwise the client never receives a response.
            self.end_headers()
            return

        # partition yields an empty query when the path has no '?', instead of
        # treating the path itself as a parameter name.
        _, _, query = self.path.partition('?')
        self.server.query_params = parse_qs(query, keep_blank_values=True)

        self.send_response(200)
        self.send_header('Content-type', 'text/plain')
        self.end_headers()
        self.wfile.write(b'Return to the CLI to continue')

    def log_message(self, format, *args):
        pass  # this prevents the http server from dumping messages to stdout
def auth_redirect_link(port):
    """Return ``(redirect_uri, oauth_uri)`` for a local redirect listener on ``port``.

    ``redirect_uri`` points at ``/auth_redirect`` on localhost; ``oauth_uri``
    is the Google OAuth URL that carries it as a query parameter.
    """
    redirect_uri = 'http://localhost:{}/auth_redirect'.format(port)
    # Pairs are kept in the order they must appear in the query string.
    query_pairs = (
        ('type', 'web_server'),
        ('client_id', '45471411055-ornt4svd2miog6dnopve7qtmh5mnu6id.apps.googleusercontent.com'),
        ('redirect_uri', redirect_uri),
        ('response_type', 'code'),
        ('scope', 'https://www.googleapis.com/auth/userinfo.email'),
        ('prompt', 'select_account'),
    )
    oauth_uri = 'https://accounts.google.com/o/oauth2/auth?' + urlencode(query_pairs)
    return (redirect_uri, oauth_uri)
def login():
    """Run the browser-based login flow and store the resulting access token.

    Opens the OAuth page in a browser, waits on localhost:9090 for the
    redirect, exchanges the returned code through the API and saves the
    token with ``set_token``. Errors are printed to stderr.
    """
    port = 9090
    redirect_uri, oauth_uri = auth_redirect_link(port)

    web_server = ClientRedirectServer(('localhost', port), ClientRedirectHandler)
    try:
        webbrowser.open(oauth_uri, new=2)

        while True:
            web_server.handle_request()
            if 'code' in web_server.query_params:
                code = web_server.query_params['code']
                break
            elif 'error' in web_server.query_params:
                print('Authentication Error: "%s". Description: "%s" ' % (
                    web_server.query_params['error'],
                    web_server.query_params.get('error_description')), file=sys.stderr)
                # No code was received, so there is nothing to exchange.
                return
    finally:
        # Release the listening socket whether or not the redirect succeeded.
        web_server.server_close()

    try:
        auth_resp = CommaApi().post('v2/auth/', data={'code': code, 'redirect_uri': redirect_uri})
        set_token(auth_resp['access_token'])
        print('Authenticated')
    except APIError as e:
        print(f'Authentication Error: {e}', file=sys.stderr)


if __name__ == '__main__':
    login()

View File

@ -0,0 +1,24 @@
import json
import os
from common.file_helpers import mkdirs_exists_ok
class MissingAuthConfigError(Exception):
    """Raised by get_token when no usable access token can be loaded."""
# Directory that holds auth.json.
CONFIG_DIR = os.path.expanduser('~/.comma')
# Runs at import time; presumably creates the directory when it is missing - confirm in common.file_helpers.
mkdirs_exists_ok(CONFIG_DIR)
def get_token():
    """Return the access token stored in ``CONFIG_DIR/auth.json``.

    Raises:
        MissingAuthConfigError: the file is missing, unreadable, not valid
            JSON, or has no ``access_token`` entry. The underlying error is
            chained as the cause.
    """
    try:
        with open(os.path.join(CONFIG_DIR, 'auth.json')) as f:
            auth = json.load(f)
        return auth['access_token']
    except Exception as e:
        # `Exception` rather than a bare except, so KeyboardInterrupt and
        # SystemExit are not turned into a "missing config" error.
        raise MissingAuthConfigError('Authenticate with tools/lib/auth.py') from e
def set_token(token):
    """Write ``token`` to ``CONFIG_DIR/auth.json`` under the ``access_token`` key."""
    auth_path = os.path.join(CONFIG_DIR, 'auth.json')
    payload = {'access_token': token}
    with open(auth_path, 'w') as auth_file:
        json.dump(payload, auth_file)
def clear_token():
    """Delete ``CONFIG_DIR/auth.json``; do nothing when it does not exist."""
    try:
        os.unlink(os.path.join(CONFIG_DIR, 'auth.json'))
    except FileNotFoundError:
        # Already cleared or never written. The API client calls this on every
        # 401, so a missing file must not mask the authorization error.
        pass

View File

@ -1,4 +1,5 @@
import os
import urllib.parse
from tools.lib.file_helpers import mkdirs_exists_ok
DEFAULT_CACHE_DIR = os.path.expanduser("~/.commacache")
@ -6,4 +7,9 @@ DEFAULT_CACHE_DIR = os.path.expanduser("~/.commacache")
def cache_path_for_file_path(fn, cache_prefix=None):
dir_ = os.path.join(DEFAULT_CACHE_DIR, "local")
mkdirs_exists_ok(dir_)
return os.path.join(dir_, os.path.abspath(fn).replace("/", "_"))
fn_parsed = urllib.parse.urlparse(fn)
if fn_parsed.scheme == '':
cache_fn = os.path.abspath(fn).replace("/", "_")
else:
cache_fn = f'{fn_parsed.hostname}_{fn_parsed.path.replace("/", "_")}'
return os.path.join(dir_, cache_fn)

View File

@ -1,3 +1,7 @@
def FileReader(fn):
return open(fn, 'rb')
from common.url_file import URLFile
def FileReader(fn, debug=False):
    """Open ``fn`` for binary reading.

    ``http://`` and ``https://`` names are wrapped in a URLFile (``debug`` is
    forwarded to it); anything else is opened as a local file.
    """
    is_remote = fn.startswith(("http://", "https://"))
    if not is_remote:
        return open(fn, "rb")
    return URLFile(fn, debug=debug)

View File

@ -1,111 +0,0 @@
from cereal import log as capnp_log
# Fill the 'can' list of `msg` from `data`.
# `data` is either one record or a sequence of records; a single record
# (first element is not a Sequence) is wrapped into a one-element list.
# Each record is indexed as d[0] = address, d[2] = payload passed to hex_to_str,
# and, for 4-element records, d[1] = busTime and d[3] = src; otherwise the
# `src` argument is used.
# NOTE(review): Sequence and hex_to_str are not defined in the visible part of this file.
def write_can_to_msg(data, src, msg):
if not isinstance(data[0], Sequence):
data = [data]
can_msgs = msg.init('can', len(data))
for i, d in enumerate(data):
# Records with a negative address are skipped; their slot in can_msgs is left as initialised.
if d[0] < 0: continue # ios bug
cc = can_msgs[i]
cc.address = d[0]
cc.busTime = 0
cc.dat = hex_to_str(d[2])
if len(d) == 4:
cc.src = d[3]
cc.busTime = d[1]
else:
cc.src = src
# Convert an old-format packet, a pair (m, d), into a capnp Event and return it as a reader.
# `m` is the packet header: either (_, pid, t), where t is used directly as
# logMonoTime, or (t, pid), where t is multiplied by 1e9 first.
# `d` is the payload; its layout depends on `pid` (see the branches below).
# A pid that matches no branch yields an Event with only logMonoTime set.
# NOTE(review): the PID_* constants are not defined in the visible part of this file.
def convert_old_pkt_to_new(old_pkt):
m, d = old_pkt
msg = capnp_log.Event.new_message()
if len(m) == 3:
_, pid, t = m
msg.logMonoTime = t
else:
t, pid = m
msg.logMonoTime = int(t * 1e9)
# NOTE(review): assigned but not read anywhere in this function.
last_velodyne_time = None
if pid == PID_OBD:
write_can_to_msg(d, 0, msg)
elif pid == PID_CAM:
frame = msg.init('frame')
frame.frameId = d[0]
frame.timestampEof = msg.logMonoTime
# iOS
elif pid == PID_IGPS:
loc = msg.init('gpsLocation')
loc.latitude = d[0]
loc.longitude = d[1]
loc.speed = d[2]
loc.timestamp = int(m[0]*1000.0) # on iOS, first number is wall time in seconds
loc.flags = 1 | 4 # has latitude, longitude, and speed.
elif pid == PID_IMOTION:
user_acceleration = d[:3]
gravity = d[3:6]
# iOS separates gravity from linear acceleration, so we recombine them.
# Apple appears to use this constant for the conversion.
g = -9.8
acceleration = [g*(a + b) for a, b in zip(user_acceleration, gravity)]
accel_event = msg.init('sensorEvents', 1)[0]
accel_event.acceleration.v = acceleration
# android
elif pid == PID_GPS:
# Only payloads with at most 6 fields, or ones tagged "gps" in the last field, are converted.
if len(d) <= 6 or d[-1] == "gps":
loc = msg.init('gpsLocation')
loc.latitude = d[0]
loc.longitude = d[1]
loc.speed = d[2]
if len(d) > 6:
loc.timestamp = d[6]
loc.flags = 1 | 4 # has latitude, longitude, and speed.
elif pid == PID_ACCEL:
# Use d[2] unless it is a float, in which case the whole payload is the vector.
val = d[2] if type(d[2]) != type(0.0) else d
accel_event = msg.init('sensorEvents', 1)[0]
accel_event.acceleration.v = val
elif pid == PID_GYRO:
# Same payload selection as the accelerometer branch.
val = d[2] if type(d[2]) != type(0.0) else d
gyro_event = msg.init('sensorEvents', 1)[0]
gyro_event.init('gyro').v = val
elif pid == PID_LIDAR:
lid = msg.init('lidarPts')
lid.idx = d[3]
elif pid == PID_APPLANIX:
# Payload is read positionally: indices 0-17 are the fields below, index 18 is the status.
loc = msg.init('liveLocation')
loc.status = d[18]
loc.lat, loc.lon, loc.alt = d[0:3]
loc.vNED = d[3:6]
loc.roll = d[6]
loc.pitch = d[7]
loc.heading = d[8]
loc.wanderAngle = d[9]
loc.trackAngle = d[10]
loc.speed = d[11]
loc.gyro = d[12:15]
loc.accel = d[15:18]
elif pid == PID_IBAROMETER:
pressure_event = msg.init('sensorEvents', 1)[0]
_, pressure = d[0:2]
pressure_event.init('pressure').v = [pressure] # Kilopascals
elif pid == PID_IINIT and len(d) == 4:
init_event = msg.init('initData')
init_event.deviceType = capnp_log.InitData.DeviceType.chffrIos
build_info = init_event.init('iosBuildInfo')
build_info.appVersion = d[0]
build_info.appBuild = int(d[1])
build_info.osVersion = d[2]
build_info.deviceModel = d[3]
return msg.as_reader()

74
tools/lib/logreader.py 100644 → 100755
View File

@ -1,24 +1,20 @@
import os
import sys
import gzip
import zlib
import json
import bz2
import tempfile
import requests
import subprocess
import urllib.parse
from aenum import Enum
import capnp
import numpy as np
import platform
from tools.lib.exceptions import DataUnreadableError
try:
from xx.chffr.lib.filereader import FileReader
except ImportError:
from tools.lib.filereader import FileReader
from tools.lib.log_util import convert_old_pkt_to_new
from cereal import log as capnp_log
OP_PATH = os.path.dirname(os.path.dirname(capnp_log.__file__))
@ -90,8 +86,6 @@ class MultiLogIterator(object):
while 1:
lr = self._log_reader(self._current_log)
ret = lr._ents[self._idx]
if lr._do_conversion:
ret = convert_old_pkt_to_new(ret, lr.data_version)
self._inc()
return ret
@ -116,74 +110,28 @@ class MultiLogIterator(object):
class LogReader(object):
def __init__(self, fn, canonicalize=True, only_union_types=False):
_, ext = os.path.splitext(fn)
data_version = None
_, ext = os.path.splitext(urllib.parse.urlparse(fn).path)
with FileReader(fn) as f:
dat = f.read()
# decompress file
if ext == ".gz" and ("log_" in fn or "log2" in fn):
dat = zlib.decompress(dat, zlib.MAX_WBITS|32)
if ext == "":
# old rlogs weren't bz2 compressed
ents = event_read_multiple_bytes(dat)
elif ext == ".bz2":
dat = bz2.decompress(dat)
elif ext == ".7z":
if platform.system() == "Darwin":
os.environ["LA_LIBRARY_FILEPATH"] = "/usr/local/opt/libarchive/lib/libarchive.dylib"
import libarchive.public
with libarchive.public.memory_reader(dat) as aa:
mdat = []
for it in aa:
for bb in it.get_blocks():
mdat.append(bb)
dat = ''.join(mdat)
# TODO: extension shouln't be a proxy for DeviceType
if ext == "":
if dat[0] == "[":
needs_conversion = True
ents = [json.loads(x) for x in dat.strip().split("\n")[:-1]]
if "_" in fn:
data_version = fn.split("_")[1]
else:
# old rlogs weren't bz2 compressed
needs_conversion = False
ents = event_read_multiple_bytes(dat)
elif ext == ".gz":
if "log_" in fn:
# Zero data file.
ents = [json.loads(x) for x in dat.strip().split("\n")[:-1]]
needs_conversion = True
elif "log2" in fn:
needs_conversion = False
ents = event_read_multiple_bytes(dat)
else:
raise Exception("unknown extension")
elif ext == ".bz2":
needs_conversion = False
ents = event_read_multiple_bytes(dat)
elif ext == ".7z":
needs_conversion = True
ents = [json.loads(x) for x in dat.strip().split("\n")]
else:
raise Exception("unknown extension")
if needs_conversion:
# TODO: should we call convert_old_pkt_to_new to generate this?
self._ts = [x[0][0]*1e9 for x in ents]
else:
self._ts = [x.logMonoTime for x in ents]
raise Exception(f"unknown extension {ext}")
self._ts = [x.logMonoTime for x in ents]
self.data_version = data_version
self._do_conversion = needs_conversion and canonicalize
self._only_union_types = only_union_types
self._ents = ents
def __iter__(self):
for ent in self._ents:
if self._do_conversion:
yield convert_old_pkt_to_new(ent, self.data_version)
elif self._only_union_types:
if self._only_union_types:
try:
ent.which()
yield ent
@ -192,12 +140,6 @@ class LogReader(object):
else:
yield ent
def load_many_logs_canonical(log_paths):
"""Load all logs for a sequence of log paths."""
for log_path in log_paths:
for msg in LogReader(log_path):
yield msg
if __name__ == "__main__":
log_path = sys.argv[1]
lr = LogReader(log_path)

View File

@ -1,18 +1,26 @@
import os
import re
from urllib.parse import urlparse
from collections import defaultdict
from itertools import chain
from tools.lib.auth_config import get_token
from tools.lib.api import CommaApi
SEGMENT_NAME_RE = r'[a-z0-9]{16}[|_][0-9]{4}-[0-9]{2}-[0-9]{2}--[0-9]{2}-[0-9]{2}-[0-9]{2}--[0-9]+'
EXPLORER_FILE_RE = r'^({})--([a-z]+\.[a-z0-9]+)$'.format(SEGMENT_NAME_RE)
OP_SEGMENT_DIR_RE = r'^({})$'.format(SEGMENT_NAME_RE)
LOG_FILENAMES = ['rlog.bz2', 'raw_log.bz2', 'log2.gz', 'ilog.7z']
CAMERA_FILENAMES = ['fcamera.hevc', 'video.hevc', 'acamera', 'icamera']
LOG_FILENAMES = ['rlog.bz2', 'raw_log.bz2']
CAMERA_FILENAMES = ['fcamera.hevc', 'video.hevc']
class Route(object):
def __init__(self, route_name, data_dir):
def __init__(self, route_name, data_dir=None):
self.route_name = route_name.replace('_', '|')
self._segments = self._get_segments(data_dir)
if data_dir is not None:
self._segments = self._get_segments_local(data_dir)
else:
self._segments = self._get_segments_remote()
@property
def segments(self):
@ -28,7 +36,30 @@ class Route(object):
camera_path_by_seg_num = {s.canonical_name.segment_num: s.camera_path for s in self._segments}
return [camera_path_by_seg_num.get(i, None) for i in range(max_seg_number+1)]
def _get_segments(self, data_dir):
def _get_segments_remote(self):
    """Build the route's segments from the file URLs returned by the API.

    Every URL's path is split into dongle id, time string, segment number
    and file name; URLs of the same segment are merged into one
    RouteSegment holding its log and camera URL. Returns the segments
    sorted by segment number.
    """
    api = CommaApi(get_token())
    route_files = api.get('v1/route/' + self.route_name + '/files')

    segments = {}
    for url in chain.from_iterable(route_files.values()):
        _, _, dongle_id, time_str, segment_num, fn = urlparse(url).path.split('/')
        segment_name = f'{dongle_id}|{time_str}--{segment_num}'

        # Keep whatever an earlier URL of the same segment already provided.
        known = segments.get(segment_name)
        log_path = url if fn in LOG_FILENAMES else (known.log_path if known else None)
        camera_path = url if fn in CAMERA_FILENAMES else (known.camera_path if known else None)
        segments[segment_name] = RouteSegment(segment_name, log_path, camera_path)

    return sorted(segments.values(), key=lambda seg: seg.canonical_name.segment_num)
def _get_segments_local(self, data_dir):
files = os.listdir(data_dir)
segment_files = defaultdict(list)

View File

@ -10,12 +10,7 @@ from tools.lib.logreader import LogReader
class TestReaders(unittest.TestCase):
def test_logreader(self):
with tempfile.NamedTemporaryFile(suffix=".bz2") as fp:
r = requests.get("https://github.com/commaai/comma2k19/blob/master/Example_1/b0c9d2329ad1606b%7C2018-08-02--08-34-47/40/raw_log.bz2?raw=true")
fp.write(r.content)
fp.flush()
lr = LogReader(fp.name)
def _check_data(lr):
hist = defaultdict(int)
for l in lr:
hist[l.which()] += 1
@ -23,14 +18,20 @@ class TestReaders(unittest.TestCase):
self.assertEqual(hist['carControl'], 6000)
self.assertEqual(hist['logMessage'], 6857)
def test_framereader(self):
with tempfile.NamedTemporaryFile(suffix=".hevc") as fp:
r = requests.get("https://github.com/commaai/comma2k19/blob/master/Example_1/b0c9d2329ad1606b%7C2018-08-02--08-34-47/40/video.hevc?raw=true")
with tempfile.NamedTemporaryFile(suffix=".bz2") as fp:
r = requests.get("https://github.com/commaai/comma2k19/blob/master/Example_1/b0c9d2329ad1606b%7C2018-08-02--08-34-47/40/raw_log.bz2?raw=true")
fp.write(r.content)
fp.flush()
f = FrameReader(fp.name)
lr_file = LogReader(fp.name)
_check_data(lr_file)
lr_url = LogReader("https://github.com/commaai/comma2k19/blob/master/Example_1/b0c9d2329ad1606b%7C2018-08-02--08-34-47/40/raw_log.bz2?raw=true")
_check_data(lr_url)
def test_framereader(self):
def _check_data(f):
self.assertEqual(f.frame_count, 1200)
self.assertEqual(f.w, 1164)
self.assertEqual(f.h, 874)
@ -48,9 +49,19 @@ class TestReaders(unittest.TestCase):
print(frame_15[0])
assert np.all(frame_first_30[0] == frame_0[0])
assert np.all(frame_first_30[15] == frame_15[0])
assert np.all(frame_first_30[0] == frame_0[0])
assert np.all(frame_first_30[15] == frame_15[0])
with tempfile.NamedTemporaryFile(suffix=".hevc") as fp:
r = requests.get("https://github.com/commaai/comma2k19/blob/master/Example_1/b0c9d2329ad1606b%7C2018-08-02--08-34-47/40/video.hevc?raw=true")
fp.write(r.content)
fp.flush()
fr_file = FrameReader(fp.name)
_check_data(fr_file)
fr_url = FrameReader("https://github.com/commaai/comma2k19/blob/master/Example_1/b0c9d2329ad1606b%7C2018-08-02--08-34-47/40/video.hevc?raw=true")
_check_data(fr_url)
if __name__ == "__main__":
unittest.main()