nopenpilot/tools/lib/robust_logreader.py

61 lines
1.8 KiB
Python
Executable File

#!/usr/bin/env python3
import os
import bz2
import urllib.parse
import subprocess
import tqdm
import glob
from tempfile import TemporaryDirectory
import capnp
from tools.lib.logreader import FileReader, LogReader
from cereal import log as capnp_log
class RobustLogReader(LogReader):
  """A LogReader that salvages as much as it can from damaged logs.

  Compared to a plain read, this reader:
    * falls back to the external ``bzip2recover`` tool when a ``.bz2`` log
      fails to decompress, and concatenates the recovered parts, and
    * drops bytes from the end of the decompressed data until capnp can parse
      the remainder, so a damaged tail does not prevent reading the events
      before it.
  """

  def __init__(self, fn, canonicalize=True, only_union_types=False, sort_by_time=False):  # pylint: disable=super-init-not-called
    """Read, decompress and parse the log at ``fn``.

    Args:
      fn: Path or URL of the log; anything ``FileReader`` accepts. Only the
        extension of the URL *path* is inspected, so query strings are ignored.
        Supported extensions are ``""`` (raw) and ``".bz2"``.
      canonicalize: Unused; accepted only for call-site compatibility.
      only_union_types: Stored on the instance as ``_only_union_types``.
      sort_by_time: If True, events are sorted by ``logMonoTime``.

    Raises:
      Exception: If the extension is neither ``""`` nor ``".bz2"``.
      subprocess.CalledProcessError: If the ``bzip2recover`` fallback fails.
      capnp.lib.capnp.KjException: If no prefix of the data, not even an
        empty one, can be parsed.
    """
    # NOTE(review): LogReader.__init__ is intentionally not called (see the
    # pylint disable); this assumes the attributes assigned at the end are
    # everything LogReader's methods rely on — confirm against LogReader.
    data_version = None  # this reader never determines a data version
    _, ext = os.path.splitext(urllib.parse.urlparse(fn).path)
    with FileReader(fn) as f:
      dat = f.read()

    if ext == ".bz2":
      dat = self._decompress_bz2(dat)
    elif ext != "":
      raise Exception(f"unknown extension {ext}")

    self._ents = self._parse_longest_prefix(dat, sort_by_time)
    self._ts = [x.logMonoTime for x in self._ents]
    self.data_version = data_version
    self._only_union_types = only_union_types

  @staticmethod
  def _decompress_bz2(dat):
    """Decompress bz2 bytes, falling back to ``bzip2recover`` on failure."""
    try:
      return bz2.decompress(dat)
    except (ValueError, OSError):
      # ValueError: the stream ends before its end-of-stream marker (truncation).
      # OSError: "Invalid data stream", i.e. corrupt data such as a bad block CRC.
      print("Failed to decompress, falling back to bzip2recover")

    with TemporaryDirectory() as directory:
      # Run bzip2recover on the log; it writes the blocks it finds as rec*.bz2
      # files in the same directory.
      with open(os.path.join(directory, 'out.bz2'), 'wb') as f:
        f.write(dat)
      subprocess.check_call(["bzip2recover", "out.bz2"], cwd=directory)

      # Decompress and concatenate the parts in file-name order. The directory
      # is escaped so glob metacharacters in the temp path are taken literally.
      pattern = os.path.join(glob.escape(directory), "rec*.bz2")
      parts = []
      for n in sorted(glob.glob(pattern)):
        print(f"Decompressing {n}")
        with open(n, 'rb') as f:
          parts.append(bz2.decompress(f.read()))
      return b"".join(parts)

  @staticmethod
  def _parse_longest_prefix(dat, sort_by_time):
    """Return the events of the longest prefix of ``dat`` that capnp parses.

    Bytes are cut from the end one at a time until parsing succeeds. A tqdm
    progress bar is shown once trimming starts and is always closed.
    """
    progress = None
    try:
      while True:
        try:
          ents = capnp_log.Event.read_multiple_bytes(dat)
          # Materialize inside the try: the reader may be lazy, so parse errors
          # can surface during iteration (NOTE(review): confirm for pycapnp).
          if sort_by_time:
            return sorted(ents, key=lambda x: x.logMonoTime)
          return list(ents)
        except capnp.lib.capnp.KjException:
          if not dat:
            # Nothing left to trim: b""[:-1] is still b"", so retrying would
            # loop forever.
            raise
          if progress is None:
            progress = tqdm.tqdm(total=len(dat))
          # Cut off bytes at the end until capnp is able to read
          dat = dat[:-1]
          progress.update(1)
    finally:
      if progress is not None:
        progress.close()