nopenpilot/selfdrive/test/test_valgrind_replay.py

118 lines
3.7 KiB
Python
Executable File

#!/usr/bin/env python3
import os
import threading
import time
import unittest
import subprocess
import signal
if "CI" in os.environ:
def tqdm(x):
return x
else:
from tqdm import tqdm # type: ignore
import cereal.messaging as messaging
from collections import namedtuple
from tools.lib.logreader import LogReader
from selfdrive.test.openpilotci import get_url
from common.basedir import BASEDIR
ProcessConfig = namedtuple('ProcessConfig', ['proc_name', 'pub_sub', 'ignore', 'command', 'path', 'segment', 'wait_for_response'])
CONFIGS = [
ProcessConfig(
proc_name="ubloxd",
pub_sub={
"ubloxRaw": ["ubloxGnss", "gpsLocationExternal"],
},
ignore=[],
command="./ubloxd",
path="selfdrive/locationd/",
segment="0375fdf7b1ce594d|2019-06-13--08-32-25--3",
wait_for_response=True
),
]
class TestValgrind(unittest.TestCase):
def extract_leak_sizes(self, log):
if "All heap blocks were freed -- no leaks are possible" in log:
return (0,0,0)
log = log.replace(",","") # fixes casting to int issue with large leaks
err_lost1 = log.split("definitely lost: ")[1]
err_lost2 = log.split("indirectly lost: ")[1]
err_lost3 = log.split("possibly lost: ")[1]
definitely_lost = int(err_lost1.split(" ")[0])
indirectly_lost = int(err_lost2.split(" ")[0])
possibly_lost = int(err_lost3.split(" ")[0])
return (definitely_lost, indirectly_lost, possibly_lost)
def valgrindlauncher(self, arg, cwd):
os.chdir(os.path.join(BASEDIR, cwd))
# Run valgrind on a process
command = "valgrind --leak-check=full " + arg
p = subprocess.Popen(command, stderr=subprocess.PIPE, shell=True, preexec_fn=os.setsid) # pylint: disable=W1509
while not self.replay_done:
time.sleep(0.1)
# Kill valgrind and extract leak output
os.killpg(os.getpgid(p.pid), signal.SIGINT)
_, err = p.communicate()
error_msg = str(err, encoding='utf-8')
with open(os.path.join(BASEDIR, "selfdrive/test/valgrind_logs.txt"), "a") as f:
f.write(error_msg)
f.write(5 * "\n")
definitely_lost, indirectly_lost, possibly_lost = self.extract_leak_sizes(error_msg)
if max(definitely_lost, indirectly_lost, possibly_lost) > 0:
self.leak = True
print("LEAKS from", arg, "\nDefinitely lost:", definitely_lost, "\nIndirectly lost", indirectly_lost, "\nPossibly lost", possibly_lost)
else:
self.leak = False
def replay_process(self, config, logreader):
pub_sockets = [s for s in config.pub_sub.keys()] # We dump data from logs here
sub_sockets = [s for _, sub in config.pub_sub.items() for s in sub] # We get responses here
pm = messaging.PubMaster(pub_sockets)
sm = messaging.SubMaster(sub_sockets)
print("Sorting logs")
all_msgs = sorted(logreader, key=lambda msg: msg.logMonoTime)
pub_msgs = [msg for msg in all_msgs if msg.which() in list(config.pub_sub.keys())]
thread = threading.Thread(target=self.valgrindlauncher, args=(config.command, config.path))
thread.daemon = True
thread.start()
while not all(pm.all_readers_updated(s) for s in config.pub_sub.keys()):
time.sleep(0)
for msg in tqdm(pub_msgs):
pm.send(msg.which(), msg.as_builder())
if config.wait_for_response:
sm.update(100)
self.replay_done = True
def test_config(self):
open(os.path.join(BASEDIR, "selfdrive/test/valgrind_logs.txt"), "w").close()
for cfg in CONFIGS:
self.leak = None
self.replay_done = False
r, n = cfg.segment.rsplit("--", 1)
lr = LogReader(get_url(r, n))
self.replay_process(cfg, lr)
while self.leak is None:
time.sleep(0.1) # Wait for the valgrind to finish
self.assertFalse(self.leak)
if __name__ == "__main__":
unittest.main()