RobustLogReader that can recover corrupted bz2 files (#22835)

* LogReader with bzip2 recovery

* only rlogs

* add comment

* plotjuggler should also use robust logreader
pull/22784/head
Willem Melching 2021-11-10 16:41:00 +01:00 committed by GitHub
parent 71e22a6009
commit 6d6f989b7f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 64 additions and 4 deletions

View File

@ -3,7 +3,7 @@ import argparse
import json
import cereal.messaging as messaging
from tools.lib.logreader import LogReader
from tools.lib.robust_logreader import RobustLogReader as LogReader
from tools.lib.route import Route
LEVELS = {
@ -56,7 +56,7 @@ if __name__ == "__main__":
logs = None
if len(args.route):
r = Route(args.route[0])
logs = [q if r is None else r for (q, r) in zip(r.qlog_paths(), r.log_paths())]
logs = r.log_paths()
if len(args.route) == 2 and logs:
n = int(args.route[1])

View File

@ -0,0 +1,60 @@
#!/usr/bin/env python3
import os
import bz2
import urllib.parse
import subprocess
import tqdm
import glob
from tempfile import TemporaryDirectory
import capnp
from tools.lib.logreader import FileReader, LogReader
from cereal import log as capnp_log
class RobustLogReader(LogReader):
  """LogReader that tries to salvage logs which cannot be read normally.

  Two recovery steps are applied while loading:

  1. If a ``.bz2`` file fails to decompress with ``ValueError``, the external
     ``bzip2recover`` tool is run on it and every recovered part is
     decompressed and concatenated.
  2. If capnp cannot parse the resulting bytes, trailing bytes are removed
     one at a time until parsing succeeds.
  """

  def __init__(self, fn, canonicalize=True, only_union_types=False):  # pylint: disable=super-init-not-called
    """Load and parse the log at ``fn``.

    Args:
      fn: path or URL of the log. Only the path component of the URL is used
        to determine the extension, which must be empty or ``.bz2``.
      canonicalize: accepted for signature compatibility; not used here.
      only_union_types: stored unchanged on ``self._only_union_types``.

    Raises:
      Exception: if the extension is neither empty nor ``.bz2``.
      subprocess.CalledProcessError: if ``bzip2recover`` exits non-zero.
    """
    _, ext = os.path.splitext(urllib.parse.urlparse(fn).path)
    with FileReader(fn) as f:
      dat = f.read()

    if ext == "":
      pass
    elif ext == ".bz2":
      try:
        dat = bz2.decompress(dat)
      except ValueError:
        print("Failed to decompress, falling back to bzip2recover")
        with TemporaryDirectory() as directory:
          # bzip2recover works on files, so dump the raw bytes to disk first
          with open(os.path.join(directory, 'out.bz2'), 'wb') as out_f:
            out_f.write(dat)

          subprocess.check_call(["bzip2recover", "out.bz2"], cwd=directory)

          # Decompress the recovered parts in sorted filename order and join
          # them once at the end instead of growing a bytes object per part
          parts = []
          for n in sorted(glob.glob(f"{directory}/rec*.bz2")):
            print(f"Decompressing {n}")
            with open(n, 'rb') as part_f:
              parts.append(bz2.decompress(part_f.read()))
          dat = b"".join(parts)
    else:
      raise Exception(f"unknown extension {ext}")

    # The progress bar is only created once the first parse attempt fails
    progress = None
    try:
      while True:
        try:
          ents = capnp_log.Event.read_multiple_bytes(dat)
          self._ents = list(ents)
          break
        except capnp.lib.capnp.KjException:
          if progress is None:
            progress = tqdm.tqdm(total=len(dat))

          # Cut off bytes at the end until capnp is able to read
          dat = dat[:-1]
          progress.update(1)
    finally:
      # Always release the progress bar, even if an unexpected error escapes
      if progress is not None:
        progress.close()

    self._ts = [x.logMonoTime for x in self._ents]
    # No data version is determined by this reader
    self.data_version = None
    self._only_union_types = only_union_types

View File

@ -10,7 +10,7 @@ from common.basedir import BASEDIR
from selfdrive.test.process_replay.compare_logs import save_log
from tools.lib.api import CommaApi
from tools.lib.auth_config import get_token
from tools.lib.logreader import LogReader
from tools.lib.robust_logreader import RobustLogReader
from tools.lib.route import Route
from urllib.parse import urlparse, parse_qs
@ -22,7 +22,7 @@ def load_segment(segment_name):
return []
try:
return list(LogReader(segment_name))
return list(RobustLogReader(segment_name))
except ValueError as e:
print(f"Error parsing {segment_name}: {e}")
return []