From 5a3b51130649356b14b24e3c82e014b3a0be95a8 Mon Sep 17 00:00:00 2001 From: Willem Melching Date: Fri, 5 Mar 2021 11:03:23 +0100 Subject: [PATCH] Manager cleanup (#20231) * move manager in folder * inital refactor * call start * small cleanup * add comments * use self.signal() * order shouldnt matter * newlines * add helpers * newlines * add process config * split out build part of manager * this should fix most tests * no sensord on pc * dont start athena * remove comment * fix old athena test * fix inject model * fix test car models * should be not none * fix helpers exitcode * ignore manage_athenad * Use time.monotonic() Co-authored-by: Adeeb Shihadeh * combine init, remove spinner * move manager test Co-authored-by: Adeeb Shihadeh --- Jenkinsfile | 2 +- launch_chffrplus.sh | 4 +- release/build_release2.sh | 2 +- release/build_release3.sh | 2 +- release/files_common | 12 +- selfdrive/athena/manage_athenad.py | 4 +- selfdrive/athena/tests/test_athenad_old.py | 10 +- selfdrive/controls/controlsd.py | 2 +- selfdrive/debug/cpu_usage_stat.py | 4 +- selfdrive/debug/uiview.py | 36 +- selfdrive/launcher.py | 27 - selfdrive/loggerd/tests/test_loggerd.py | 22 +- selfdrive/manager.py | 596 ------------------ selfdrive/manager/__init__.py | 0 selfdrive/manager/build.py | 89 +++ selfdrive/manager/helpers.py | 38 ++ selfdrive/manager/manager.py | 222 +++++++ selfdrive/manager/process.py | 225 +++++++ selfdrive/manager/process_config.py | 48 ++ selfdrive/manager/test/__init__.py | 0 selfdrive/{ => manager}/test/test_manager.py | 25 +- selfdrive/modeld/test/timing/benchmark.py | 7 +- selfdrive/test/helpers.py | 19 +- .../test_longitudinal.py | 14 +- .../test/process_replay/camera_replay.py | 30 +- selfdrive/test/process_replay/inject_model.py | 14 +- .../test/process_replay/process_replay.py | 61 +- selfdrive/test/test_car_models.py | 14 +- selfdrive/test/test_onroad.py | 8 +- tools/sim/launch_openpilot.sh | 2 +- tools/webcam/README.md | 2 +- 31 files changed, 786 insertions(+), 755 deletions(-) delete mode 100644 selfdrive/launcher.py delete mode 100755 selfdrive/manager.py create mode 100644 selfdrive/manager/__init__.py create mode 100755 selfdrive/manager/build.py create mode 100644 selfdrive/manager/helpers.py create mode 100755 selfdrive/manager/manager.py create mode 100644 selfdrive/manager/process.py create mode 100644 selfdrive/manager/process_config.py create mode 100644 selfdrive/manager/test/__init__.py rename selfdrive/{ => manager}/test/test_manager.py (72%) diff --git a/Jenkinsfile b/Jenkinsfile index 32bdc496..44c732cd 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -126,7 +126,7 @@ pipeline { phone_steps("eon-build", [ ["build", "SCONS_CACHE=1 scons -j4"], ["test athena", "nosetests -s selfdrive/athena/tests/test_athenad_old.py"], - ["test manager", "python selfdrive/test/test_manager.py"], + ["test manager", "python selfdrive/manager/test/test_manager.py"], ["onroad tests", "cd selfdrive/test/ && ./test_onroad.py"], ["build devel", "cd release && CI_PUSH=${env.CI_PUSH} ./build_devel.sh"], ["test car interfaces", "cd selfdrive/car/tests/ && ./test_car_interfaces.py"], diff --git a/launch_chffrplus.sh b/launch_chffrplus.sh index 2e524b6f..8a91bb4e 100755 --- a/launch_chffrplus.sh +++ b/launch_chffrplus.sh @@ -219,8 +219,8 @@ function launch { tmux capture-pane -pq -S-1000 > /tmp/launch_log # start manager - cd selfdrive - ./manager.py + cd selfdrive/manager + ./build.py && ./manager.py # if broken, keep on screen error while true; do sleep 1; done diff --git a/release/build_release2.sh b/release/build_release2.sh index 656eb20f..add9de6e 100755 --- a/release/build_release2.sh +++ b/release/build_release2.sh @@ -56,7 +56,7 @@ export PYTHONPATH="/data/openpilot:/data/openpilot/pyextra" SCONS_CACHE=1 scons -j3 # Run tests -python selfdrive/test/test_manager.py +python selfdrive/manager/test/test_manager.py selfdrive/car/tests/test_car_interfaces.py # Cleanup diff --git a/release/build_release3.sh b/release/build_release3.sh index f02069d6..3df9f8f2 100755 --- a/release/build_release3.sh +++ b/release/build_release3.sh @@ -70,7 +70,7 @@ export PYTHONPATH="$BUILD_DIR" SCONS_CACHE=1 scons -j$(nproc) # Run tests -#python selfdrive/test/test_manager.py +#python selfdrive/manager/test/test_manager.py selfdrive/car/tests/test_car_interfaces.py # Cleanup diff --git a/release/files_common b/release/files_common index 2d8c9005..dbe11018 100644 --- a/release/files_common +++ b/release/files_common @@ -69,8 +69,6 @@ selfdrive/__init__.py selfdrive/registration.py selfdrive/config.py selfdrive/crash.py -selfdrive/launcher.py -selfdrive/manager.py selfdrive/swaglog.py selfdrive/logmessaged.py selfdrive/tombstoned.py @@ -333,7 +331,6 @@ selfdrive/test/__init__.py selfdrive/test/helpers.py selfdrive/test/setup_device_ci.sh selfdrive/test/test_fingerprints.py -selfdrive/test/test_manager.py selfdrive/ui/SConscript selfdrive/ui/*.cc @@ -386,6 +383,15 @@ selfdrive/camerad/imgproc/pool.cl selfdrive/camerad/imgproc/utils.cc selfdrive/camerad/imgproc/utils.h +selfdrive/manager/__init__.py +selfdrive/manager/build.py +selfdrive/manager/helpers.py +selfdrive/manager/manager.py +selfdrive/manager/process_config.py +selfdrive/manager/process.py +selfdrive/manager/test/__init__.py +selfdrive/manager/test/test_manager.py + selfdrive/modeld/SConscript selfdrive/modeld/modeld.cc selfdrive/modeld/dmonitoringmodeld.cc diff --git a/selfdrive/athena/manage_athenad.py b/selfdrive/athena/manage_athenad.py index cd8bc582..c16bef07 100755 --- a/selfdrive/athena/manage_athenad.py +++ b/selfdrive/athena/manage_athenad.py @@ -5,12 +5,13 @@ from multiprocessing import Process import selfdrive.crash as crash from common.params import Params -from selfdrive.launcher import launcher +from selfdrive.manager.process import launcher from selfdrive.swaglog import cloudlog from selfdrive.version import version, dirty ATHENA_MGR_PID_PARAM = "AthenadPid" + def main(): params = Params() dongle_id = params.get("DongleId").decode('utf-8') @@ -32,5 +33,6 @@ def main(): finally: params.delete(ATHENA_MGR_PID_PARAM) + if __name__ == '__main__': main() diff --git a/selfdrive/athena/tests/test_athenad_old.py b/selfdrive/athena/tests/test_athenad_old.py index 739dc7ef..b00a97a5 100644 --- a/selfdrive/athena/tests/test_athenad_old.py +++ b/selfdrive/athena/tests/test_athenad_old.py @@ -1,23 +1,25 @@ #!/usr/bin/env python3 import json import os -import requests import signal import subprocess import time -os.environ['FAKEUPLOAD'] = "1" +import requests +from selfdrive.manager.process_config import managed_processes from common.params import Params from common.realtime import sec_since_boot -import selfdrive.manager as manager from selfdrive.test.helpers import with_processes +os.environ['FAKEUPLOAD'] = "1" + def test_athena(): print("ATHENA") start = sec_since_boot() - manager.start_daemon_process("manage_athenad") + managed_processes['manage_athenad'].start() + params = Params() manage_athenad_pid = params.get("AthenadPid") assert manage_athenad_pid is not None diff --git a/selfdrive/controls/controlsd.py b/selfdrive/controls/controlsd.py index 58a9ac63..f7377f8f 100755 --- a/selfdrive/controls/controlsd.py +++ b/selfdrive/controls/controlsd.py @@ -30,7 +30,7 @@ STEER_ANGLE_SATURATION_THRESHOLD = 2.5 # Degrees SIMULATION = "SIMULATION" in os.environ NOSENSOR = "NOSENSOR" in os.environ -IGNORE_PROCESSES = set(["rtshield", "uploader", "deleter", "loggerd", "logmessaged", "tombstoned", "logcatd", "proclogd", "clocksd", "updated", "timezoned"]) +IGNORE_PROCESSES = set(["rtshield", "uploader", "deleter", "loggerd", "logmessaged", "tombstoned", "logcatd", "proclogd", "clocksd", "updated", "timezoned", "manage_athenad"]) ThermalStatus = log.DeviceState.ThermalStatus State = log.ControlsState.OpenpilotState diff --git a/selfdrive/debug/cpu_usage_stat.py b/selfdrive/debug/cpu_usage_stat.py index 0bc3efde..fa7c266c 100755 --- a/selfdrive/debug/cpu_usage_stat.py +++ b/selfdrive/debug/cpu_usage_stat.py @@ -24,7 +24,7 @@ import argparse import re from collections import defaultdict -import selfdrive.manager as manager +from selfdrive.manager.process_config import managed_processes # Do statistics every 5 seconds PRINT_INTERVAL = 5 @@ -36,7 +36,7 @@ monitored_proc_names = [ # android procs 'SurfaceFlinger', 'sensors.qcom' -] + manager.car_started_processes + manager.persistent_processes +] + list(managed_processes.keys()) cpu_time_names = ['user', 'system', 'children_user', 'children_system'] diff --git a/selfdrive/debug/uiview.py b/selfdrive/debug/uiview.py index 04496295..e194957e 100755 --- a/selfdrive/debug/uiview.py +++ b/selfdrive/debug/uiview.py @@ -1,21 +1,27 @@ #!/usr/bin/env python3 import time import cereal.messaging as messaging -from selfdrive.manager import start_managed_process, kill_managed_process +from selfdrive.manager.process_config import managed_processes -services = ['controlsState', 'deviceState', 'radarState'] # the services needed to be spoofed to start ui offroad -procs = ['camerad', 'ui', 'modeld', 'calibrationd'] -[start_managed_process(p) for p in procs] # start needed processes -pm = messaging.PubMaster(services) -dat_controlsState, dat_deviceState, dat_radar = [messaging.new_message(s) for s in services] -dat_deviceState.deviceState.started = True +if __name__ == "__main__": + services = ['controlsState', 'deviceState', 'radarState'] # the services needed to be spoofed to start ui offroad + procs = ['camerad', 'ui', 'modeld', 'calibrationd'] -try: - while True: - pm.send('controlsState', dat_controlsState) - pm.send('deviceState', dat_deviceState) - pm.send('radarState', dat_radar) - time.sleep(1 / 100) # continually send, rate doesn't matter for deviceState -except KeyboardInterrupt: - [kill_managed_process(p) for p in procs] + for p in procs: + managed_processes[p].start() + + pm = messaging.PubMaster(services) + + dat_controlsState, dat_deviceState, dat_radar = [messaging.new_message(s) for s in services] + dat_deviceState.deviceState.started = True + + try: + while True: + pm.send('controlsState', dat_controlsState) + pm.send('deviceState', dat_deviceState) + pm.send('radarState', dat_radar) + time.sleep(1 / 100) # continually send, rate doesn't matter for deviceState + except KeyboardInterrupt: + for p in procs: + managed_processes[p].stop() diff --git a/selfdrive/launcher.py b/selfdrive/launcher.py deleted file mode 100644 index dcedf071..00000000 --- a/selfdrive/launcher.py +++ /dev/null @@ -1,27 +0,0 @@ -import importlib -from setproctitle import setproctitle # pylint: disable=no-name-in-module - -import cereal.messaging as messaging -import selfdrive.crash as crash -from selfdrive.swaglog import cloudlog - -def launcher(proc): - try: - # import the process - mod = importlib.import_module(proc) - - # rename the process - setproctitle(proc) - - # create new context since we forked - messaging.context = messaging.Context() - - # exec the process - mod.main() - except KeyboardInterrupt: - cloudlog.warning("child %s got SIGINT" % proc) - except Exception: - # can't install the crash handler becuase sys.excepthook doesn't play nice - # with threads, so catch it here. - crash.capture_exception() - raise diff --git a/selfdrive/loggerd/tests/test_loggerd.py b/selfdrive/loggerd/tests/test_loggerd.py index 823957fc..36d6a937 100755 --- a/selfdrive/loggerd/tests/test_loggerd.py +++ b/selfdrive/loggerd/tests/test_loggerd.py @@ -8,15 +8,15 @@ import unittest from collections import defaultdict from pathlib import Path -from cereal import log import cereal.messaging as messaging +from cereal import log from cereal.services import service_list from common.basedir import BASEDIR -from common.timeout import Timeout from common.params import Params -import selfdrive.manager as manager -from selfdrive.hardware import TICI, PC +from common.timeout import Timeout +from selfdrive.hardware import PC, TICI from selfdrive.loggerd.config import ROOT +from selfdrive.manager.process_config import managed_processes from selfdrive.test.helpers import with_processes from selfdrive.version import version as VERSION from tools.lib.logreader import LogReader @@ -26,8 +26,8 @@ SentinelType = log.Sentinel.SentinelType CEREAL_SERVICES = [f for f in log.Event.schema.union_fields if f in service_list and service_list[f].should_log and "encode" not in f.lower()] -class TestLoggerd(unittest.TestCase): +class TestLoggerd(unittest.TestCase): # TODO: all tests should work on PC @classmethod def setUpClass(cls): @@ -118,9 +118,9 @@ class TestLoggerd(unittest.TestCase): length = random.randint(2, 5) os.environ["LOGGERD_SEGMENT_LENGTH"] = str(length) - manager.start_managed_process("loggerd") + managed_processes["loggerd"].start() time.sleep((num_segs + 1) * length) - manager.kill_managed_process("loggerd") + managed_processes["loggerd"].stop() route_path = str(self._get_latest_log_dir()).rsplit("--", 1)[0] for n in range(num_segs): @@ -170,7 +170,7 @@ class TestLoggerd(unittest.TestCase): # sleep enough for the first poll to time out # TOOD: fix loggerd bug dropping the msgs from the first poll - manager.start_managed_process("loggerd") + managed_processes["loggerd"].start() time.sleep(2) sent_msgs = defaultdict(list) @@ -185,7 +185,7 @@ class TestLoggerd(unittest.TestCase): time.sleep(0.01) time.sleep(1) - manager.kill_managed_process("loggerd") + managed_processes["loggerd"].stop() qlog_path = os.path.join(self._get_latest_log_dir(), "qlog.bz2") lr = list(LogReader(qlog_path)) @@ -215,7 +215,7 @@ class TestLoggerd(unittest.TestCase): # sleep enough for the first poll to time out # TOOD: fix loggerd bug dropping the msgs from the first poll - manager.start_managed_process("loggerd") + managed_processes["loggerd"].start() time.sleep(2) sent_msgs = defaultdict(list) @@ -230,7 +230,7 @@ class TestLoggerd(unittest.TestCase): time.sleep(0.01) time.sleep(1) - manager.kill_managed_process("loggerd") + managed_processes["loggerd"].stop() lr = list(LogReader(os.path.join(self._get_latest_log_dir(), "rlog.bz2"))) diff --git a/selfdrive/manager.py b/selfdrive/manager.py deleted file mode 100755 index e1aa7256..00000000 --- a/selfdrive/manager.py +++ /dev/null @@ -1,596 +0,0 @@ -#!/usr/bin/env python3 -import datetime -import importlib -import os -import sys -import fcntl -import errno -import signal -import shutil -import subprocess -import textwrap -import time -import traceback - -from multiprocessing import Process -from typing import Dict - -from common.basedir import BASEDIR -from common.spinner import Spinner -from common.text_window import TextWindow -import selfdrive.crash as crash -from selfdrive.hardware import HARDWARE, EON, PC, TICI -from selfdrive.hardware.eon.apk import update_apks, pm_apply_packages, start_offroad -from selfdrive.swaglog import cloudlog, add_logentries_handler -from selfdrive.version import version, dirty - -os.environ['BASEDIR'] = BASEDIR -sys.path.append(os.path.join(BASEDIR, "pyextra")) - -TOTAL_SCONS_NODES = 1225 -MAX_BUILD_PROGRESS = 70 -WEBCAM = os.getenv("WEBCAM") is not None -PREBUILT = os.path.exists(os.path.join(BASEDIR, 'prebuilt')) - - -def unblock_stdout(): - # get a non-blocking stdout - child_pid, child_pty = os.forkpty() - if child_pid != 0: # parent - - # child is in its own process group, manually pass kill signals - signal.signal(signal.SIGINT, lambda signum, frame: os.kill(child_pid, signal.SIGINT)) - signal.signal(signal.SIGTERM, lambda signum, frame: os.kill(child_pid, signal.SIGTERM)) - - fcntl.fcntl(sys.stdout, fcntl.F_SETFL, fcntl.fcntl(sys.stdout, fcntl.F_GETFL) | os.O_NONBLOCK) - - while True: - try: - dat = os.read(child_pty, 4096) - except OSError as e: - if e.errno == errno.EIO: - break - continue - - if not dat: - break - - try: - sys.stdout.write(dat.decode('utf8')) - except (OSError, IOError, UnicodeDecodeError): - pass - - # os.wait() returns a tuple with the pid and a 16 bit value - # whose low byte is the signal number and whose high byte is the exit satus - exit_status = os.wait()[1] >> 8 - os._exit(exit_status) - -if __name__ == "__main__": - unblock_stdout() - - -# Start spinner -spinner = Spinner() -spinner.update_progress(0, 100) -if __name__ != "__main__": - spinner.close() - -def build(): - env = os.environ.copy() - env['SCONS_PROGRESS'] = "1" - env['SCONS_CACHE'] = "1" - nproc = os.cpu_count() - j_flag = "" if nproc is None else f"-j{nproc - 1}" - - for retry in [True, False]: - scons = subprocess.Popen(["scons", j_flag], cwd=BASEDIR, env=env, stderr=subprocess.PIPE) - - compile_output = [] - - # Read progress from stderr and update spinner - while scons.poll() is None: - try: - line = scons.stderr.readline() - if line is None: - continue - line = line.rstrip() - - prefix = b'progress: ' - if line.startswith(prefix): - i = int(line[len(prefix):]) - spinner.update_progress(MAX_BUILD_PROGRESS * min(1., i / TOTAL_SCONS_NODES), 100.) - elif len(line): - compile_output.append(line) - print(line.decode('utf8', 'replace')) - except Exception: - pass - - if scons.returncode != 0: - # Read remaining output - r = scons.stderr.read().split(b'\n') - compile_output += r - - if retry and (not dirty): - if not os.getenv("CI"): - print("scons build failed, cleaning in") - for i in range(3, -1, -1): - print("....%d" % i) - time.sleep(1) - subprocess.check_call(["scons", "-c"], cwd=BASEDIR, env=env) - shutil.rmtree("/tmp/scons_cache", ignore_errors=True) - shutil.rmtree("/data/scons_cache", ignore_errors=True) - else: - print("scons build failed after retry") - sys.exit(1) - else: - # Build failed log errors - errors = [line.decode('utf8', 'replace') for line in compile_output - if any([err in line for err in [b'error: ', b'not found, needed by target']])] - error_s = "\n".join(errors) - add_logentries_handler(cloudlog) - cloudlog.error("scons build failed\n" + error_s) - - # Show TextWindow - spinner.close() - error_s = "\n \n".join(["\n".join(textwrap.wrap(e, 65)) for e in errors]) - with TextWindow("openpilot failed to build\n \n" + error_s) as t: - t.wait_for_exit() - exit(1) - else: - break - -if __name__ == "__main__" and not PREBUILT: - build() - -import cereal.messaging as messaging -from cereal import log - -from common.params import Params -from selfdrive.registration import register -from selfdrive.launcher import launcher - - -# comment out anything you don't want to run -managed_processes = { - "thermald": "selfdrive.thermald.thermald", - "uploader": "selfdrive.loggerd.uploader", - "deleter": "selfdrive.loggerd.deleter", - "controlsd": "selfdrive.controls.controlsd", - "plannerd": "selfdrive.controls.plannerd", - "radard": "selfdrive.controls.radard", - "dmonitoringd": "selfdrive.monitoring.dmonitoringd", - "ubloxd": ("selfdrive/locationd", ["./ubloxd"]), - "loggerd": ("selfdrive/loggerd", ["./loggerd"]), - "logmessaged": "selfdrive.logmessaged", - "locationd": "selfdrive.locationd.locationd", - "tombstoned": "selfdrive.tombstoned", - "logcatd": ("selfdrive/logcatd", ["./logcatd"]), - "proclogd": ("selfdrive/proclogd", ["./proclogd"]), - "pandad": "selfdrive.pandad", - "ui": ("selfdrive/ui", ["./ui"]), - "calibrationd": "selfdrive.locationd.calibrationd", - "paramsd": "selfdrive.locationd.paramsd", - "camerad": ("selfdrive/camerad", ["./camerad"]), - "sensord": ("selfdrive/sensord", ["./sensord"]), - "clocksd": ("selfdrive/clocksd", ["./clocksd"]), - "updated": "selfdrive.updated", - "dmonitoringmodeld": ("selfdrive/modeld", ["./dmonitoringmodeld"]), - "modeld": ("selfdrive/modeld", ["./modeld"]), - "rtshield": "selfdrive.rtshield", -} - -daemon_processes = { - "manage_athenad": ("selfdrive.athena.manage_athenad", "AthenadPid"), -} - -running: Dict[str, Process] = {} -def get_running(): - return running - -# due to qualcomm kernel bugs SIGKILLing camerad sometimes causes page table corruption -unkillable_processes = ['camerad'] - -# processes to end with SIGKILL instead of SIGTERM -kill_processes = [] -if EON: - kill_processes += [ - 'sensord', - ] - -persistent_processes = [ - 'pandad', - 'thermald', - 'logmessaged', - 'ui', - 'uploader', - 'deleter', -] - -if not PC: - persistent_processes += [ - 'updated', - 'tombstoned', - ] - -if EON: - persistent_processes += [ - 'sensord', - ] - -if TICI: - managed_processes["timezoned"] = "selfdrive.timezoned" - persistent_processes += ['timezoned'] - -car_started_processes = [ - 'controlsd', - 'plannerd', - 'loggerd', - 'radard', - 'calibrationd', - 'paramsd', - 'camerad', - 'modeld', - 'proclogd', - 'locationd', - 'clocksd', - 'logcatd', -] - -driver_view_processes = [ - 'camerad', - 'dmonitoringd', - 'dmonitoringmodeld' -] - -if not PC or WEBCAM: - car_started_processes += [ - 'ubloxd', - 'dmonitoringd', - 'dmonitoringmodeld', - ] - -if EON: - car_started_processes += [ - 'rtshield', - ] -else: - car_started_processes += [ - 'sensord', - ] - -def register_managed_process(name, desc, car_started=False): - global managed_processes, car_started_processes, persistent_processes - managed_processes[name] = desc - if car_started: - car_started_processes.append(name) - else: - persistent_processes.append(name) - -# ****************** process management functions ****************** -def nativelauncher(pargs, cwd): - # exec the process - os.chdir(cwd) - os.execvp(pargs[0], pargs) - -def start_managed_process(name): - if name in running or name not in managed_processes: - return - proc = managed_processes[name] - if isinstance(proc, str): - cloudlog.info("starting python %s" % proc) - running[name] = Process(name=name, target=launcher, args=(proc,)) - else: - pdir, pargs = proc - cwd = os.path.join(BASEDIR, pdir) - cloudlog.info("starting process %s" % name) - running[name] = Process(name=name, target=nativelauncher, args=(pargs, cwd)) - running[name].start() - -def start_daemon_process(name): - params = Params() - proc, pid_param = daemon_processes[name] - pid = params.get(pid_param, encoding='utf-8') - - if pid is not None: - try: - os.kill(int(pid), 0) - with open(f'/proc/{pid}/cmdline') as f: - if proc in f.read(): - # daemon is running - return - except (OSError, FileNotFoundError): - # process is dead - pass - - cloudlog.info("starting daemon %s" % name) - proc = subprocess.Popen(['python', '-m', proc], # pylint: disable=subprocess-popen-preexec-fn - stdin=open('/dev/null', 'r'), - stdout=open('/dev/null', 'w'), - stderr=open('/dev/null', 'w'), - preexec_fn=os.setpgrp) - - params.put(pid_param, str(proc.pid)) - -def prepare_managed_process(p): - proc = managed_processes[p] - if isinstance(proc, str): - # import this python - cloudlog.info("preimporting %s" % proc) - importlib.import_module(proc) - -def join_process(process, timeout): - # Process().join(timeout) will hang due to a python 3 bug: https://bugs.python.org/issue28382 - # We have to poll the exitcode instead - t = time.time() - while time.time() - t < timeout and process.exitcode is None: - time.sleep(0.001) - - -def kill_managed_process(name, retry=True): - if name not in running or name not in managed_processes: - return - cloudlog.info(f"killing {name}") - - if running[name].exitcode is None: - sig = signal.SIGKILL if name in kill_processes else signal.SIGINT - os.kill(running[name].pid, sig) - - join_process(running[name], 5) - - if running[name].exitcode is None: - if not retry: - raise Exception(f"{name} failed to die") - - if name in unkillable_processes: - cloudlog.critical("unkillable process %s failed to exit! rebooting in 15 if it doesn't die" % name) - join_process(running[name], 15) - if running[name].exitcode is None: - cloudlog.critical("unkillable process %s failed to die!" % name) - os.system("date >> /data/unkillable_reboot") - os.sync() - HARDWARE.reboot() - raise RuntimeError - else: - cloudlog.info("killing %s with SIGKILL" % name) - os.kill(running[name].pid, signal.SIGKILL) - running[name].join() - - ret = running[name].exitcode - cloudlog.info(f"{name} is dead with {ret}") - del running[name] - return ret - - -def cleanup_all_processes(signal, frame): - cloudlog.info("caught ctrl-c %s %s" % (signal, frame)) - - if EON: - pm_apply_packages('disable') - - for name in list(running.keys()): - kill_managed_process(name) - cloudlog.info("everything is dead") - - -def send_managed_process_signal(name, sig): - if name not in running or name not in managed_processes or \ - running[name].exitcode is not None: - return - - cloudlog.info(f"sending signal {sig} to {name}") - os.kill(running[name].pid, sig) - - -# ****************** run loop ****************** - -def manager_init(): - os.umask(0) # Make sure we can create files with 777 permissions - - # Create folders needed for msgq - try: - os.mkdir("/dev/shm") - except FileExistsError: - pass - except PermissionError: - print("WARNING: failed to make /dev/shm") - - # set dongle id - reg_res = register(spinner) - if reg_res: - dongle_id = reg_res - else: - raise Exception("server registration failed") - os.environ['DONGLE_ID'] = dongle_id - - if not dirty: - os.environ['CLEAN'] = '1' - - cloudlog.bind_global(dongle_id=dongle_id, version=version, dirty=dirty, - device=HARDWARE.get_device_type()) - crash.bind_user(id=dongle_id) - crash.bind_extra(version=version, dirty=dirty, device=HARDWARE.get_device_type()) - - # ensure shared libraries are readable by apks - if EON: - os.chmod(BASEDIR, 0o755) - os.chmod("/dev/shm", 0o777) - os.chmod(os.path.join(BASEDIR, "cereal"), 0o755) - os.chmod(os.path.join(BASEDIR, "cereal", "libmessaging_shared.so"), 0o755) - -def manager_thread(): - - cloudlog.info("manager start") - cloudlog.info({"environ": os.environ}) - - # save boot log - subprocess.call("./bootlog", cwd=os.path.join(BASEDIR, "selfdrive/loggerd")) - - # start daemon processes - for p in daemon_processes: - start_daemon_process(p) - - # start persistent processes - for p in persistent_processes: - start_managed_process(p) - - # start offroad - if EON and "QT" not in os.environ: - pm_apply_packages('enable') - start_offroad() - - if os.getenv("NOBOARD") is not None: - del managed_processes["pandad"] - - if os.getenv("BLOCK") is not None: - for k in os.getenv("BLOCK").split(","): - del managed_processes[k] - - started_prev = False - logger_dead = False - params = Params() - device_state_sock = messaging.sub_sock('deviceState') - pm = messaging.PubMaster(['managerState']) - - while 1: - msg = messaging.recv_sock(device_state_sock, wait=True) - - if msg.deviceState.freeSpacePercent < 5: - logger_dead = True - - if msg.deviceState.started: - for p in car_started_processes: - if p == "loggerd" and logger_dead: - kill_managed_process(p) - else: - start_managed_process(p) - else: - logger_dead = False - driver_view = params.get("IsDriverViewEnabled") == b"1" - - # TODO: refactor how manager manages processes - for p in reversed(car_started_processes): - if p not in driver_view_processes or not driver_view: - kill_managed_process(p) - - for p in driver_view_processes: - if driver_view: - start_managed_process(p) - else: - kill_managed_process(p) - - # trigger an update after going offroad - if started_prev: - os.sync() - send_managed_process_signal("updated", signal.SIGHUP) - - started_prev = msg.deviceState.started - - # check the status of all processes, did any of them die? - running_list = ["%s%s\u001b[0m" % ("\u001b[32m" if running[p].is_alive() else "\u001b[31m", p) for p in running] - cloudlog.debug(' '.join(running_list)) - - # send managerState - states = [] - for p in managed_processes: - state = log.ManagerState.ProcessState.new_message() - state.name = p - if p in running: - state.running = running[p].is_alive() - state.pid = running[p].pid - state.exitCode = running[p].exitcode or 0 - states.append(state) - msg = messaging.new_message('managerState') - msg.managerState.processes = states - pm.send('managerState', msg) - - # Exit main loop when uninstall is needed - if params.get("DoUninstall", encoding='utf8') == "1": - break - -def manager_prepare(): - # build all processes - os.chdir(os.path.dirname(os.path.abspath(__file__))) - - total = 100.0 - (0 if PREBUILT else MAX_BUILD_PROGRESS) - - for i, p in enumerate(managed_processes): - perc = (100.0 - total) + total * (i + 1) / len(managed_processes) - spinner.update_progress(perc, 100.) - prepare_managed_process(p) - -def main(): - params = Params() - params.manager_start() - - default_params = [ - ("CommunityFeaturesToggle", "0"), - ("CompletedTrainingVersion", "0"), - ("IsRHD", "0"), - ("IsMetric", "0"), - ("RecordFront", "0"), - ("HasAcceptedTerms", "0"), - ("HasCompletedSetup", "0"), - ("IsUploadRawEnabled", "1"), - ("IsLdwEnabled", "1"), - ("LastUpdateTime", datetime.datetime.utcnow().isoformat().encode('utf8')), - ("OpenpilotEnabledToggle", "1"), - ("VisionRadarToggle", "0"), - ("LaneChangeEnabled", "1"), - ("IsDriverViewEnabled", "0"), - ] - - # set unset params - for k, v in default_params: - if params.get(k) is None: - params.put(k, v) - - # is this dashcam? - if os.getenv("PASSIVE") is not None: - params.put("Passive", str(int(os.getenv("PASSIVE")))) - - if params.get("Passive") is None: - raise Exception("Passive must be set to continue") - - if EON: - update_apks() - manager_init() - manager_prepare() - spinner.close() - - if os.getenv("PREPAREONLY") is not None: - return - - # SystemExit on sigterm - signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit(1)) - - try: - manager_thread() - except Exception: - traceback.print_exc() - crash.capture_exception() - finally: - cleanup_all_processes(None, None) - - if params.get("DoUninstall", encoding='utf8') == "1": - cloudlog.warning("uninstalling") - HARDWARE.uninstall() - - -if __name__ == "__main__": - try: - main() - except Exception: - add_logentries_handler(cloudlog) - cloudlog.exception("Manager failed to start") - - # Show last 3 lines of traceback - error = traceback.format_exc(-3) - error = "Manager failed to start\n\n" + error - spinner.close() - with TextWindow(error) as t: - t.wait_for_exit() - - raise - - # manual exit because we are forked - sys.exit(0) diff --git a/selfdrive/manager/__init__.py b/selfdrive/manager/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/selfdrive/manager/build.py b/selfdrive/manager/build.py new file mode 100755 index 00000000..d3b3ddd7 --- /dev/null +++ b/selfdrive/manager/build.py @@ -0,0 +1,89 @@ +#!/usr/bin/env python3 +import os +import shutil +import subprocess +import sys +import time +import textwrap + +# NOTE: Do NOT import anything here that needs be built (e.g. params) +from common.basedir import BASEDIR +from common.spinner import Spinner +from common.text_window import TextWindow +from selfdrive.swaglog import add_logentries_handler, cloudlog +from selfdrive.version import dirty + +TOTAL_SCONS_NODES = 1225 +MAX_BUILD_PROGRESS = 70 +PREBUILT = os.path.exists(os.path.join(BASEDIR, 'prebuilt')) + + +def build(spinner, dirty=False): + env = os.environ.copy() + env['SCONS_PROGRESS'] = "1" + env['SCONS_CACHE'] = "1" + nproc = os.cpu_count() + j_flag = "" if nproc is None else f"-j{nproc - 1}" + + for retry in [True, False]: + scons = subprocess.Popen(["scons", j_flag], cwd=BASEDIR, env=env, stderr=subprocess.PIPE) + + compile_output = [] + + # Read progress from stderr and update spinner + while scons.poll() is None: + try: + line = scons.stderr.readline() + if line is None: + continue + line = line.rstrip() + + prefix = b'progress: ' + if line.startswith(prefix): + i = int(line[len(prefix):]) + spinner.update_progress(MAX_BUILD_PROGRESS * min(1., i / TOTAL_SCONS_NODES), 100.) + elif len(line): + compile_output.append(line) + print(line.decode('utf8', 'replace')) + except Exception: + pass + + if scons.returncode != 0: + # Read remaining output + r = scons.stderr.read().split(b'\n') + compile_output += r + + if retry and (not dirty): + if not os.getenv("CI"): + print("scons build failed, cleaning in") + for i in range(3, -1, -1): + print("....%d" % i) + time.sleep(1) + subprocess.check_call(["scons", "-c"], cwd=BASEDIR, env=env) + shutil.rmtree("/tmp/scons_cache", ignore_errors=True) + shutil.rmtree("/data/scons_cache", ignore_errors=True) + else: + print("scons build failed after retry") + sys.exit(1) + else: + # Build failed log errors + errors = [line.decode('utf8', 'replace') for line in compile_output + if any([err in line for err in [b'error: ', b'not found, needed by target']])] + error_s = "\n".join(errors) + add_logentries_handler(cloudlog) + cloudlog.error("scons build failed\n" + error_s) + + # Show TextWindow + spinner.close() + error_s = "\n \n".join(["\n".join(textwrap.wrap(e, 65)) for e in errors]) + with TextWindow("openpilot failed to build\n \n" + error_s) as t: + t.wait_for_exit() + exit(1) + else: + break + + +if __name__ == "__main__" and not PREBUILT: + spinner = Spinner() + spinner.update_progress(0, 100) + build(spinner, dirty) diff --git a/selfdrive/manager/helpers.py b/selfdrive/manager/helpers.py new file mode 100644 index 00000000..fdda0deb --- /dev/null +++ b/selfdrive/manager/helpers.py @@ -0,0 +1,38 @@ +import os +import sys +import fcntl +import errno +import signal + + +def unblock_stdout(): + # get a non-blocking stdout + child_pid, child_pty = os.forkpty() + if child_pid != 0: # parent + + # child is in its own process group, manually pass kill signals + signal.signal(signal.SIGINT, lambda signum, frame: os.kill(child_pid, signal.SIGINT)) + signal.signal(signal.SIGTERM, lambda signum, frame: os.kill(child_pid, signal.SIGTERM)) + + fcntl.fcntl(sys.stdout, fcntl.F_SETFL, fcntl.fcntl(sys.stdout, fcntl.F_GETFL) | os.O_NONBLOCK) + + while True: + try: + dat = os.read(child_pty, 4096) + except OSError as e: + if e.errno == errno.EIO: + break + continue + + if not dat: + break + + try: + sys.stdout.write(dat.decode('utf8')) + except (OSError, IOError, UnicodeDecodeError): + pass + + # os.wait() returns a tuple with the pid and a 16 bit value + # whose low byte is the signal number and whose high byte is the exit satus + exit_status = os.wait()[1] >> 8 + os._exit(exit_status) diff --git a/selfdrive/manager/manager.py b/selfdrive/manager/manager.py new file mode 100755 index 00000000..688f79f9 --- /dev/null +++ b/selfdrive/manager/manager.py @@ -0,0 +1,222 @@ +#!/usr/bin/env python3 +import datetime +import os +import signal +import subprocess +import sys +import traceback + +import cereal.messaging as messaging +import selfdrive.crash as crash +from common.basedir import BASEDIR +from common.params import Params +from common.spinner import Spinner +from common.text_window import TextWindow +from selfdrive.hardware import EON, HARDWARE +from selfdrive.hardware.eon.apk import (pm_apply_packages, start_offroad, + update_apks) +from selfdrive.manager.build import MAX_BUILD_PROGRESS, PREBUILT +from selfdrive.manager.helpers import unblock_stdout +from selfdrive.manager.process import ensure_running +from selfdrive.manager.process_config import managed_processes +from selfdrive.registration import register +from selfdrive.swaglog import add_logentries_handler, cloudlog +from selfdrive.version import dirty, version + + +def manager_init(spinner=None): + params = Params() + params.manager_start() + + default_params = [ + ("CommunityFeaturesToggle", "0"), + ("CompletedTrainingVersion", "0"), + ("IsRHD", "0"), + ("IsMetric", "0"), + ("RecordFront", "0"), + ("HasAcceptedTerms", "0"), + ("HasCompletedSetup", "0"), + ("IsUploadRawEnabled", "1"), + ("IsLdwEnabled", "1"), + ("LastUpdateTime", datetime.datetime.utcnow().isoformat().encode('utf8')), + ("OpenpilotEnabledToggle", "1"), + ("VisionRadarToggle", "0"), + ("LaneChangeEnabled", "1"), + ("IsDriverViewEnabled", "0"), + ] + + # set unset params + for k, v in default_params: + if params.get(k) is None: + params.put(k, v) + + # is this dashcam? + if os.getenv("PASSIVE") is not None: + params.put("Passive", str(int(os.getenv("PASSIVE")))) + + if params.get("Passive") is None: + raise Exception("Passive must be set to continue") + + if EON: + update_apks() + + os.umask(0) # Make sure we can create files with 777 permissions + + # Create folders needed for msgq + try: + os.mkdir("/dev/shm") + except FileExistsError: + pass + except PermissionError: + print("WARNING: failed to make /dev/shm") + + # set dongle id + reg_res = register(spinner) + if reg_res: + dongle_id = reg_res + else: + raise Exception("server registration failed") + os.environ['DONGLE_ID'] = dongle_id # Needed for swaglog and loggerd + + if not dirty: + os.environ['CLEAN'] = '1' + + cloudlog.bind_global(dongle_id=dongle_id, version=version, dirty=dirty, + device=HARDWARE.get_device_type()) + crash.bind_user(id=dongle_id) + crash.bind_extra(version=version, dirty=dirty, device=HARDWARE.get_device_type()) + + # ensure shared libraries are readable by apks + if EON: + os.chmod(BASEDIR, 0o755) + os.chmod("/dev/shm", 0o777) + os.chmod(os.path.join(BASEDIR, "cereal"), 0o755) + os.chmod(os.path.join(BASEDIR, "cereal", "libmessaging_shared.so"), 0o755) + + +def manager_prepare(spinner=None): + # build all processes + os.chdir(os.path.dirname(os.path.abspath(__file__))) + + total = 100.0 - (0 if PREBUILT else MAX_BUILD_PROGRESS) + + for i, p in enumerate(managed_processes.values()): + perc = (100.0 - total) + total * (i + 1) / len(managed_processes) + + if spinner: + spinner.update_progress(perc, 100.) + p.prepare() + + +def manager_cleanup(): + if EON: + pm_apply_packages('disable') + + for p in managed_processes.values(): + p.stop() + + cloudlog.info("everything is dead") + + +def manager_thread(): + cloudlog.info("manager start") + cloudlog.info({"environ": os.environ}) + + # save boot log + subprocess.call("./bootlog", cwd=os.path.join(BASEDIR, "selfdrive/loggerd")) + + ignore = [] + if os.getenv("NOBOARD") is not None: + ignore.append("pandad") + if os.getenv("BLOCK") is not None: + ignore += os.getenv("BLOCK").split(",") + + # start offroad + if EON and "QT" not in os.environ: + pm_apply_packages('enable') + start_offroad() + + started_prev = False + params = Params() + sm = messaging.SubMaster(['deviceState']) + pm = messaging.PubMaster(['managerState']) + + while True: + sm.update() + not_run = ignore[:] + + if sm['deviceState'].freeSpacePercent < 5: + not_run.append("loggerd") + + started = sm['deviceState'].started + driverview = params.get("IsDriverViewEnabled") == b"1" + ensure_running(managed_processes.values(), started, driverview, not_run) + + # trigger an update after going offroad + if started_prev and not started: + os.sync() + managed_processes['updated'].signal(signal.SIGHUP) + + started_prev = started + + running_list = ["%s%s\u001b[0m" % ("\u001b[32m" if p.proc.is_alive() else "\u001b[31m", p.name) + for p in managed_processes.values() if p.proc] + cloudlog.debug(' '.join(running_list)) + + # send managerState + msg = messaging.new_message('managerState') + msg.managerState.processes = [p.get_process_state_msg() for p in managed_processes.values()] + pm.send('managerState', msg) + + # Exit main loop when uninstall is needed + if params.get("DoUninstall", encoding='utf8') == "1": + break + + +def main(spinner=None): + manager_init(spinner) + manager_prepare(spinner) + + if spinner: + spinner.close() + + if os.getenv("PREPAREONLY") is not None: + return + + # SystemExit on sigterm + signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit(1)) + + try: + manager_thread() + except Exception: + traceback.print_exc() + crash.capture_exception() + finally: + manager_cleanup() + + if Params().params.get("DoUninstall", encoding='utf8') == "1": + cloudlog.warning("uninstalling") + HARDWARE.uninstall() + + +if __name__ == "__main__": + unblock_stdout() + spinner = Spinner() + + try: + main(spinner) + except Exception: + add_logentries_handler(cloudlog) + cloudlog.exception("Manager failed to start") + + # Show last 3 lines of traceback + error = traceback.format_exc(-3) + error = "Manager failed to start\n\n" + error + spinner.close() + with TextWindow(error) as t: + t.wait_for_exit() + + raise + + # manual exit because we are forked + sys.exit(0) diff --git a/selfdrive/manager/process.py b/selfdrive/manager/process.py new file mode 100644 index 00000000..5b3fcf7d --- /dev/null +++ b/selfdrive/manager/process.py @@ -0,0 +1,225 @@ +import importlib +import os +import signal +import time +import subprocess +from abc import ABC, abstractmethod +from multiprocessing import Process + +from setproctitle import setproctitle # pylint: disable=no-name-in-module + +import cereal.messaging as messaging +import selfdrive.crash as crash +from common.basedir import BASEDIR +from common.params import Params +from selfdrive.swaglog import cloudlog +from selfdrive.hardware import HARDWARE +from cereal import log + + +def launcher(proc): + try: + # import the process + mod = importlib.import_module(proc) + + # rename the process + setproctitle(proc) + + # create new context since we forked + messaging.context = messaging.Context() + + # exec the process + mod.main() + except KeyboardInterrupt: + cloudlog.warning("child %s got SIGINT" % proc) + except Exception: + # can't install the crash handler becuase sys.excepthook doesn't play nice + # with threads, so catch it here. + crash.capture_exception() + raise + + +def nativelauncher(pargs, cwd): + # exec the process + os.chdir(cwd) + os.execvp(pargs[0], pargs) + + +def join_process(process, timeout): + # Process().join(timeout) will hang due to a python 3 bug: https://bugs.python.org/issue28382 + # We have to poll the exitcode instead + t = time.monotonic() + while time.monotonic() - t < timeout and process.exitcode is None: + time.sleep(0.001) + + +class ManagerProcess(ABC): + unkillable = False + daemon = False + sigkill = False + proc = None + name = "" + + @abstractmethod + def prepare(self): + pass + + @abstractmethod + def start(self): + pass + + def stop(self, retry=True): + if self.proc is None: + return + + cloudlog.info(f"killing {self.name}") + + if self.proc.exitcode is None: + sig = signal.SIGKILL if self.sigkill else signal.SIGINT + self.signal(sig) + + join_process(self.proc, 5) + + # If process failed to die send SIGKILL or reboot + if self.proc.exitcode is None and retry: + if self.unkillable: + cloudlog.critical(f"unkillable process {self.name} failed to exit! rebooting in 15 if it doesn't die") + join_process(self.proc, 15) + + if self.proc.exitcode is None: + cloudlog.critical(f"unkillable process {self.name} failed to die!") + os.system("date >> /data/unkillable_reboot") + os.sync() + HARDWARE.reboot() + raise RuntimeError + else: + cloudlog.info(f"killing {self.name} with SIGKILL") + self.signal(signal.SIGKILL) + self.proc.join() + + ret = self.proc.exitcode + cloudlog.info(f"{self.name} is dead with {ret}") + + if self.proc.exitcode is not None: + self.proc = None + + return ret + + def signal(self, sig): + if self.proc.exitcode is not None and self.proc.pid is not None: + return + + cloudlog.info(f"sending signal {sig} to {self.name}") + os.kill(self.proc.pid, sig) + + def get_process_state_msg(self): + state = log.ManagerState.ProcessState.new_message() + state.name = self.name + if self.proc: + state.running = self.proc.is_alive() + state.pid = self.proc.pid or 0 + state.exitCode = self.proc.exitcode or 0 + return state + + +class NativeProcess(ManagerProcess): + def __init__(self, name, cwd, cmdline, persistent=False, driverview=False, unkillable=False, sigkill=False): + self.name = name + self.cwd = cwd + self.cmdline = cmdline + self.persistent = persistent + self.driverview = driverview + self.unkillable = unkillable + self.sigkill = sigkill + + def prepare(self): + pass + + def start(self): + if self.proc is not None: + return + + cwd = os.path.join(BASEDIR, self.cwd) + cloudlog.info("starting process %s" % self.name) + self.proc = Process(name=self.name, target=nativelauncher, args=(self.cmdline, cwd)) + self.proc.start() + + +class PythonProcess(ManagerProcess): + def __init__(self, name, module, persistent=False, driverview=False, unkillable=False, sigkill=False): + self.name = name + self.module = module + self.persistent = persistent + self.driverview = driverview + self.unkillable = unkillable + self.sigkill = sigkill + + def prepare(self): + cloudlog.info("preimporting %s" % self.module) + importlib.import_module(self.module) + + def start(self): + if self.proc is not None: + return + + cloudlog.info("starting python %s" % self.module) + self.proc = Process(name=self.name, target=launcher, args=(self.module,)) + self.proc.start() + + +class DaemonProcess(ManagerProcess): + """Python process that has to stay running accross manager restart. + This is used for athena so you don't lose SSH access when restarting manager.""" + def __init__(self, name, module, param_name): + self.name = name + self.module = module + self.param_name = param_name + self.persistent = True + + def prepare(self): + pass + + def start(self): + params = Params() + pid = params.get(self.param_name, encoding='utf-8') + + if pid is not None: + try: + os.kill(int(pid), 0) + with open(f'/proc/{pid}/cmdline') as f: + if self.module in f.read(): + # daemon is running + return + except (OSError, FileNotFoundError): + # process is dead + pass + + cloudlog.info("starting daemon %s" % self.name) + proc = subprocess.Popen(['python', '-m', self.module], # pylint: disable=subprocess-popen-preexec-fn + stdin=open('/dev/null', 'r'), + stdout=open('/dev/null', 'w'), + stderr=open('/dev/null', 'w'), + preexec_fn=os.setpgrp) + + params.put(self.param_name, str(proc.pid)) + + def stop(self, retry=True): + pass + + +def ensure_running(procs, started, driverview=False, not_run=None): + if not_run is None: + not_run = [] + + # TODO: can we do this in parallel? + for p in procs: + if p.name in not_run: + p.stop() + elif p.persistent: + p.start() + elif p.driverview and driverview: + p.start() + elif started: + p.start() + else: + p.stop() diff --git a/selfdrive/manager/process_config.py b/selfdrive/manager/process_config.py new file mode 100644 index 00000000..be0807dd --- /dev/null +++ b/selfdrive/manager/process_config.py @@ -0,0 +1,48 @@ +from selfdrive.manager.process import PythonProcess, NativeProcess, DaemonProcess +from selfdrive.hardware import EON, TICI, PC + +procs = [ + DaemonProcess("manage_athenad", "selfdrive.athena.manage_athenad", "AthenadPid"), + # due to qualcomm kernel bugs SIGKILLing camerad sometimes causes page table corruption + NativeProcess("camerad", "selfdrive/camerad", ["./camerad"], unkillable=True, driverview=True), + NativeProcess("clocksd", "selfdrive/clocksd", ["./clocksd"]), + NativeProcess("dmonitoringmodeld", "selfdrive/modeld", ["./dmonitoringmodeld"], driverview=True), + NativeProcess("logcatd", "selfdrive/logcatd", ["./logcatd"]), + NativeProcess("loggerd", "selfdrive/loggerd", ["./loggerd"]), + NativeProcess("modeld", "selfdrive/modeld", ["./modeld"]), + NativeProcess("proclogd", "selfdrive/proclogd", ["./proclogd"]), + NativeProcess("ubloxd", "selfdrive/locationd", ["./ubloxd"]), + NativeProcess("ui", "selfdrive/ui", ["./ui"], persistent=True), + PythonProcess("calibrationd", "selfdrive.locationd.calibrationd"), + PythonProcess("controlsd", "selfdrive.controls.controlsd"), + PythonProcess("deleter", "selfdrive.loggerd.deleter", persistent=True), + PythonProcess("dmonitoringd", "selfdrive.monitoring.dmonitoringd", driverview=True), + PythonProcess("locationd", "selfdrive.locationd.locationd"), + PythonProcess("logmessaged", "selfdrive.logmessaged", persistent=True), + PythonProcess("pandad", "selfdrive.pandad", persistent=True), + PythonProcess("paramsd", "selfdrive.locationd.paramsd"), + PythonProcess("plannerd", "selfdrive.controls.plannerd"), + PythonProcess("radard", "selfdrive.controls.radard"), + PythonProcess("thermald", "selfdrive.thermald.thermald", persistent=True), + PythonProcess("uploader", "selfdrive.loggerd.uploader", persistent=True), +] + +if not PC: + procs += [ + NativeProcess("sensord", "selfdrive/sensord", ["./sensord"], persistent=EON, sigkill=EON), + PythonProcess("tombstoned", "selfdrive.tombstoned", persistent=True), + PythonProcess("updated", "selfdrive.updated", persistent=True), + ] + +if TICI: + procs += [ + PythonProcess("timezoned", "selfdrive.timezoned", persistent=True), + ] + +if EON: + procs += [ + PythonProcess("rtshield", "selfdrive.rtshield"), + ] + + +managed_processes = {p.name: p for p in procs} diff --git a/selfdrive/manager/test/__init__.py b/selfdrive/manager/test/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/selfdrive/test/test_manager.py b/selfdrive/manager/test/test_manager.py similarity index 72% rename from selfdrive/test/test_manager.py rename to selfdrive/manager/test/test_manager.py index 9a666e96..71154277 100644 --- a/selfdrive/test/test_manager.py +++ b/selfdrive/manager/test/test_manager.py @@ -4,22 +4,24 @@ import signal import time import unittest -os.environ['FAKEUPLOAD'] = "1" - -import selfdrive.manager as manager +import selfdrive.manager.manager as manager from selfdrive.hardware import EON +from selfdrive.manager.process import DaemonProcess +from selfdrive.manager.process_config import managed_processes + +os.environ['FAKEUPLOAD'] = "1" # TODO: make eon fast MAX_STARTUP_TIME = 30 if EON else 15 -ALL_PROCESSES = manager.persistent_processes + manager.car_started_processes +ALL_PROCESSES = [p.name for p in managed_processes.values() if type(p) is not DaemonProcess] + class TestManager(unittest.TestCase): - def setUp(self): os.environ['PASSIVE'] = '0' def tearDown(self): - manager.cleanup_all_processes(None, None) + manager.manager_cleanup() def test_manager_prepare(self): os.environ['PREPAREONLY'] = '1' @@ -36,24 +38,27 @@ class TestManager(unittest.TestCase): # ensure all processes exit cleanly def test_clean_exit(self): manager.manager_prepare() + for p in ALL_PROCESSES: - manager.start_managed_process(p) + managed_processes[p].start() time.sleep(10) for p in reversed(ALL_PROCESSES): - exit_code = manager.kill_managed_process(p, retry=False) + exit_code = managed_processes[p].stop(retry=False) if (not EON and p == 'ui') or (EON and p == 'logcatd'): # TODO: make Qt UI exit gracefully and fix OMX encoder exiting continue + # Make sure the process is actually dead + managed_processes[p].stop() + # TODO: interrupted blocking read exits with 1 in cereal. use a more unique return code exit_codes = [0, 1] - if p in manager.kill_processes: + if managed_processes[p].sigkill: exit_codes = [-signal.SIGKILL] assert exit_code in exit_codes, f"{p} died with {exit_code}" - if __name__ == "__main__": unittest.main() diff --git a/selfdrive/modeld/test/timing/benchmark.py b/selfdrive/modeld/test/timing/benchmark.py index 88ab0e76..2ea56d97 100755 --- a/selfdrive/modeld/test/timing/benchmark.py +++ b/selfdrive/modeld/test/timing/benchmark.py @@ -2,8 +2,9 @@ import os import time import numpy as np + import cereal.messaging as messaging -import selfdrive.manager as manager +from selfdrive.manager.process_config import managed_processes N = int(os.getenv("N", "5")) @@ -16,7 +17,7 @@ if __name__ == "__main__": for _ in range(N): os.environ['LOGPRINT'] = 'debug' - manager.start_managed_process('modeld') + managed_processes['modeld'].start() time.sleep(5) t = [] @@ -27,7 +28,7 @@ if __name__ == "__main__": t.append(m.modelV2.modelExecutionTime) execution_times.append(np.array(t[10:]) * 1000) - manager.kill_managed_process('modeld') + managed_processes['modeld'].stop() print("\n\n") print(f"ran modeld {N} times for {TIME}s each") diff --git a/selfdrive/test/helpers.py b/selfdrive/test/helpers.py index de8b15b3..025907c4 100644 --- a/selfdrive/test/helpers.py +++ b/selfdrive/test/helpers.py @@ -6,7 +6,8 @@ from nose.tools import nottest from selfdrive.hardware.eon.apk import update_apks, start_offroad, pm_apply_packages, android_packages from selfdrive.hardware import PC from selfdrive.version import training_version, terms_version -from selfdrive.manager import start_managed_process, kill_managed_process, get_running +from selfdrive.manager.process_config import managed_processes + def set_params_enabled(): from common.params import Params @@ -18,36 +19,38 @@ def set_params_enabled(): params.put("Passive", "0") params.put("CompletedTrainingVersion", training_version) + def phone_only(x): if PC: return nottest(x) else: return x + def with_processes(processes, init_time=0): def wrapper(func): @wraps(func) def wrap(*args, **kwargs): # start and assert started for n, p in enumerate(processes): - start_managed_process(p) - if n < len(processes)-1: + managed_processes[p].start() + if n < len(processes) - 1: time.sleep(init_time) - assert all(get_running()[name].exitcode is None for name in processes) + assert all(managed_processes[name].proc.exitcode is None for name in processes) # call the function try: func(*args, **kwargs) # assert processes are still started - assert all(get_running()[name].exitcode is None for name in processes) + assert all(managed_processes[name].proc.exitcode is None for name in processes) finally: - # kill and assert all stopped for p in processes: - kill_managed_process(p) - assert len(get_running()) == 0 + managed_processes[p].stop() + return wrap return wrapper + def with_apks(): def wrapper(func): @wraps(func) diff --git a/selfdrive/test/longitudinal_maneuvers/test_longitudinal.py b/selfdrive/test/longitudinal_maneuvers/test_longitudinal.py index c687ff7a..010aa820 100755 --- a/selfdrive/test/longitudinal_maneuvers/test_longitudinal.py +++ b/selfdrive/test/longitudinal_maneuvers/test_longitudinal.py @@ -9,7 +9,7 @@ matplotlib.use('svg') from selfdrive.config import Conversions as CV from selfdrive.car.honda.values import CruiseButtons as CB from selfdrive.test.longitudinal_maneuvers.maneuver import Maneuver -import selfdrive.manager as manager +from selfdrive.manager.process_config import managed_processes from common.file_helpers import mkdirs_exists_ok from common.params import Params @@ -344,16 +344,16 @@ def run_maneuver_worker(k): valid = False for _ in range(3): - manager.start_managed_process('radard') - manager.start_managed_process('controlsd') - manager.start_managed_process('plannerd') + managed_processes['radard'].start() + managed_processes['controlsd'].start() + managed_processes['plannerd'].start() plot, valid = man.evaluate() plot.write_plot(output_dir, "maneuver" + str(k + 1).zfill(2)) - manager.kill_managed_process('radard') - manager.kill_managed_process('controlsd') - manager.kill_managed_process('plannerd') + managed_processes['radard'].stop() + managed_processes['controlsd'].stop() + managed_processes['plannerd'].stop() if valid: break diff --git a/selfdrive/test/process_replay/camera_replay.py b/selfdrive/test/process_replay/camera_replay.py index 4871ae30..05f79cc9 100755 --- a/selfdrive/test/process_replay/camera_replay.py +++ b/selfdrive/test/process_replay/camera_replay.py @@ -3,32 +3,32 @@ import os import sys import time from typing import Any + from tqdm import tqdm -os.environ['QCOM_REPLAY'] = '1' - +import cereal.messaging as messaging +from cereal import log from common.spinner import Spinner from common.timeout import Timeout -import selfdrive.manager as manager - -from cereal import log -import cereal.messaging as messaging -from tools.lib.framereader import FrameReader -from tools.lib.logreader import LogReader +from common.transformations.camera import get_view_frame_from_road_frame +from selfdrive.manager.process_config import managed_processes from selfdrive.test.openpilotci import BASE_URL, get_url from selfdrive.test.process_replay.compare_logs import compare_logs, save_log from selfdrive.test.process_replay.test_processes import format_diff from selfdrive.version import get_git_commit -from common.transformations.camera import get_view_frame_from_road_frame +from tools.lib.framereader import FrameReader +from tools.lib.logreader import LogReader TEST_ROUTE = "99c94dc769b5d96e|2019-08-03--14-19-59" + def replace_calib(msg, calib): msg = msg.as_builder() if calib is not None: msg.liveCalibration.extrinsicMatrix = get_view_frame_from_road_frame(*calib, 1.22).flatten().tolist() return msg + def camera_replay(lr, fr, desire=None, calib=None): spinner = Spinner() @@ -39,12 +39,12 @@ def camera_replay(lr, fr, desire=None, calib=None): # TODO: add dmonitoringmodeld print("preparing procs") - manager.prepare_managed_process("camerad") - manager.prepare_managed_process("modeld") + managed_processes['camerad'].prepare() + managed_processes['modeld'].prepare() try: print("starting procs") - manager.start_managed_process("camerad") - manager.start_managed_process("modeld") + managed_processes['camerad'].start() + managed_processes['modeld'].start() time.sleep(5) sm.update(1000) print("procs started") @@ -85,9 +85,9 @@ def camera_replay(lr, fr, desire=None, calib=None): print("replay done") spinner.close() - manager.kill_managed_process('modeld') + managed_processes['modeld'].stop() time.sleep(2) - manager.kill_managed_process('camerad') + managed_processes['camerad'].stop() return log_msgs if __name__ == "__main__": diff --git a/selfdrive/test/process_replay/inject_model.py b/selfdrive/test/process_replay/inject_model.py index b5ccf264..a7d12cfb 100755 --- a/selfdrive/test/process_replay/inject_model.py +++ b/selfdrive/test/process_replay/inject_model.py @@ -4,7 +4,7 @@ import time from tqdm import tqdm -import selfdrive.manager as manager +from selfdrive.manager.process_config import managed_processes from cereal.messaging import PubMaster, recv_one, sub_sock from tools.lib.framereader import FrameReader @@ -47,8 +47,8 @@ def inject_model(msgs, segment_name): segment_name = rreplace(segment_name, '--', '/', 1) frame_reader = FrameReader('cd:/'+segment_name.replace("|", "/") + "/fcamera.hevc") - manager.start_managed_process('camerad') - manager.start_managed_process('modeld') + managed_processes['camerad'].start() + managed_processes['modeld'].start() # TODO do better than just wait for modeld to boot time.sleep(5) @@ -57,13 +57,13 @@ def inject_model(msgs, segment_name): try: out_msgs = regen_model(msgs, pm, frame_reader, model_sock) except (KeyboardInterrupt, SystemExit, Exception) as e: - manager.kill_managed_process('modeld') + managed_processes['modeld'].stop() time.sleep(2) - manager.kill_managed_process('camerad') + managed_processes['camerad'].stop() raise e - manager.kill_managed_process('modeld') + managed_processes['modeld'].stop() time.sleep(2) - manager.kill_managed_process('camerad') + managed_processes['camerad'].stop() new_msgs = [] midx = 0 diff --git a/selfdrive/test/process_replay/process_replay.py b/selfdrive/test/process_replay/process_replay.py index f12fbcdf..adecc983 100755 --- a/selfdrive/test/process_replay/process_replay.py +++ b/selfdrive/test/process_replay/process_replay.py @@ -1,30 +1,29 @@ #!/usr/bin/env python3 -import capnp +import importlib import os import sys import threading -import importlib import time - -if "CI" in os.environ: - def tqdm(x): - return x -else: - from tqdm import tqdm # type: ignore - -from cereal import car, log -from selfdrive.car.car_helpers import get_car -import selfdrive.manager as manager -import cereal.messaging as messaging -from common.params import Params -from cereal.services import service_list from collections import namedtuple -from selfdrive.manager import managed_processes + +import capnp +from tqdm import tqdm + +import cereal.messaging as messaging +from cereal import car, log +from cereal.services import service_list +from common.params import Params +from selfdrive.car.car_helpers import get_car +from selfdrive.manager.process import PythonProcess +from selfdrive.manager.process_config import managed_processes + # Numpy gives different results based on CPU features after version 19 NUMPY_TOLERANCE = 1e-7 +CI = "CI" in os.environ ProcessConfig = namedtuple('ProcessConfig', ['proc_name', 'pub_sub', 'ignore', 'init_callback', 'should_recv_callback', 'tolerance']) + def wait_for_event(evt): if not evt.wait(15): if threading.currentThread().getName() == "MainThread": @@ -148,6 +147,7 @@ class FakePubMaster(messaging.PubMaster): self.get_called.set() return dat + def fingerprint(msgs, fsm, can_sock): print("start fingerprinting") fsm.wait_on_getitem = True @@ -171,6 +171,7 @@ def fingerprint(msgs, fsm, can_sock): fsm.update_ready.set() print("finished fingerprinting") + def get_car_params(msgs, fsm, can_sock): can = FakeSocket(wait=False) sendcan = FakeSocket(wait=False) @@ -181,6 +182,7 @@ def get_car_params(msgs, fsm, can_sock): _, CP = get_car(can, sendcan) Params().put("CarParams", CP.to_bytes()) + def radar_rcv_callback(msg, CP, cfg, fsm): if msg.which() != "can": return [], False @@ -198,6 +200,7 @@ def radar_rcv_callback(msg, CP, cfg, fsm): return ["radarState", "liveTracks"], True return [], False + def calibration_rcv_callback(msg, CP, cfg, fsm): # calibrationd publishes 1 calibrationData every 5 cameraOdometry packets. # should_recv always true to increment frame @@ -207,6 +210,7 @@ def calibration_rcv_callback(msg, CP, cfg, fsm): recv_socks = ["liveCalibration"] return recv_socks, fsm.frame == 0 or msg.which() == 'cameraOdometry' + def ublox_rcv_callback(msg): msg_class, msg_id = msg.ubloxRaw[2:4] if (msg_class, msg_id) in {(1, 7 * 16)}: @@ -216,6 +220,7 @@ def ublox_rcv_callback(msg): else: return [] + CONFIGS = [ ProcessConfig( proc_name="controlsd", @@ -233,7 +238,7 @@ CONFIGS = [ proc_name="radard", pub_sub={ "can": ["radarState", "liveTracks"], - "liveParameters": [], "carState": [], "modelV2": [], + "liveParameters": [], "carState": [], "modelV2": [], }, ignore=["logMonoTime", "valid", "radarState.cumLagMs"], init_callback=get_car_params, @@ -307,9 +312,10 @@ CONFIGS = [ ), ] + def replay_process(cfg, lr): proc = managed_processes[cfg.proc_name] - if isinstance(proc, str): + if isinstance(proc, PythonProcess): return python_replay_process(cfg, lr) else: return cpp_replay_process(cfg, lr) @@ -343,8 +349,10 @@ def python_replay_process(cfg, lr): if msg.which() == 'carParams': os.environ['FINGERPRINT'] = msg.carParams.carFingerprint - manager.prepare_managed_process(cfg.proc_name) - mod = importlib.import_module(manager.managed_processes[cfg.proc_name]) + assert(type(managed_processes[cfg.proc_name]) is PythonProcess) + managed_processes[cfg.proc_name].prepare() + mod = importlib.import_module(managed_processes[cfg.proc_name].module) + thread = threading.Thread(target=mod.main, args=args) thread.daemon = True thread.start() @@ -363,7 +371,7 @@ def python_replay_process(cfg, lr): fsm.wait_for_update() log_msgs, msg_queue = [], [] - for msg in tqdm(pub_msgs): + for msg in tqdm(pub_msgs, disable=CI): if cfg.should_recv_callback is not None: recv_socks, should_recv = cfg.should_recv_callback(msg, CP, cfg, fsm) else: @@ -388,16 +396,17 @@ def python_replay_process(cfg, lr): recv_cnt -= m.which() in recv_socks return log_msgs + def cpp_replay_process(cfg, lr): sub_sockets = [s for _, sub in cfg.pub_sub.items() for s in sub] # We get responses here pm = messaging.PubMaster(cfg.pub_sub.keys()) - sockets = {s : messaging.sub_sock(s, timeout=1000) for s in sub_sockets} + sockets = {s: messaging.sub_sock(s, timeout=1000) for s in sub_sockets} all_msgs = sorted(lr, key=lambda msg: msg.logMonoTime) pub_msgs = [msg for msg in all_msgs if msg.which() in list(cfg.pub_sub.keys())] - manager.prepare_managed_process(cfg.proc_name) - manager.start_managed_process(cfg.proc_name) + managed_processes[cfg.proc_name].prepare() + managed_processes[cfg.proc_name].start() time.sleep(1) # We give the process time to start @@ -405,7 +414,7 @@ def cpp_replay_process(cfg, lr): for s in sub_sockets: messaging.recv_one_or_none(sockets[s]) - for msg in tqdm(pub_msgs): + for msg in tqdm(pub_msgs, disable=CI): pm.send(msg.which(), msg.as_builder()) resp_sockets = sub_sockets if cfg.should_recv_callback is None else cfg.should_recv_callback(msg) for s in resp_sockets: @@ -413,5 +422,5 @@ def cpp_replay_process(cfg, lr): if response is not None: log_msgs.append(response) - manager.kill_managed_process(cfg.proc_name) + managed_processes[cfg.proc_name].stop() return log_msgs diff --git a/selfdrive/test/test_car_models.py b/selfdrive/test/test_car_models.py index 3d974e33..cf87d834 100755 --- a/selfdrive/test/test_car_models.py +++ b/selfdrive/test/test_car_models.py @@ -9,7 +9,7 @@ from typing import List, cast import requests import cereal.messaging as messaging -import selfdrive.manager as manager +from selfdrive.manager.process_config import managed_processes from cereal import car from common.basedir import BASEDIR from common.params import Params @@ -547,7 +547,7 @@ non_tested_cars = [ if __name__ == "__main__": tested_procs = ["controlsd", "radard", "plannerd"] - tested_socks = ["radarState", "controlsState", "carState", "plan"] + tested_socks = ["radarState", "controlsState", "carState", "longitudinalPlan"] tested_cars = [keys["carFingerprint"] for route, keys in routes.items()] for car_model in all_known_cars(): @@ -561,7 +561,7 @@ if __name__ == "__main__": print("Preparing processes") for p in tested_procs: - manager.prepare_managed_process(p) + managed_processes[p].prepare() results = {} for route, checks in routes.items(): @@ -583,7 +583,7 @@ if __name__ == "__main__": print("testing ", route, " ", checks['carFingerprint']) print("Starting processes") for p in tested_procs: - manager.start_managed_process(p) + managed_processes[p].start() # Start unlogger print("Start unlogger") @@ -603,11 +603,11 @@ if __name__ == "__main__": failures = [s for s in tested_socks + extra_socks if s not in recvd_socks] print("Check if everything is running") - running = manager.get_running() for p in tested_procs: - if not running[p].is_alive: + proc = managed_processes[p].proc + if not proc or not proc.is_alive: failures.append(p) - manager.kill_managed_process(p) + managed_processes[p].stop() os.killpg(os.getpgid(unlogger.pid), signal.SIGTERM) sockets_ok = len(failures) == 0 diff --git a/selfdrive/test/test_onroad.py b/selfdrive/test/test_onroad.py index 2be906b8..b022b8fa 100755 --- a/selfdrive/test/test_onroad.py +++ b/selfdrive/test/test_onroad.py @@ -10,7 +10,6 @@ from cereal.services import service_list from common.basedir import BASEDIR from common.timeout import Timeout from selfdrive.loggerd.config import ROOT -import selfdrive.manager as manager from selfdrive.test.helpers import set_params_enabled from tools.lib.logreader import LogReader @@ -38,11 +37,11 @@ PROCS = [ ("./logcatd", 0), ] -# ***** test helpers ***** def cputime_total(ct): return ct.cpuUser + ct.cpuSystem + ct.cpuChildrenUser + ct.cpuChildrenSystem + def check_cpu_usage(first_proc, last_proc): result = "------------------------------------------------\n" result += "------------------ CPU Usage -------------------\n" @@ -84,9 +83,7 @@ class TestOnroad(unittest.TestCase): # start manager and run openpilot for a minute try: - manager.build() - manager.manager_prepare() - manager_path = os.path.join(BASEDIR, "selfdrive/manager.py") + manager_path = os.path.join(BASEDIR, "selfdrive/manager/manager.py") proc = subprocess.Popen(["python", manager_path]) sm = messaging.SubMaster(['carState']) @@ -116,5 +113,6 @@ class TestOnroad(unittest.TestCase): cpu_ok = check_cpu_usage(proclogs[0], proclogs[-1]) self.assertTrue(cpu_ok) + if __name__ == "__main__": unittest.main() diff --git a/tools/sim/launch_openpilot.sh b/tools/sim/launch_openpilot.sh index 90706344..f433ae26 100755 --- a/tools/sim/launch_openpilot.sh +++ b/tools/sim/launch_openpilot.sh @@ -5,4 +5,4 @@ export NOBOARD="1" export SIMULATION="1" DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null && pwd )" -cd ../../selfdrive && ./manager.py +cd ../../selfdrive/manager && ./manager.py diff --git a/tools/webcam/README.md b/tools/webcam/README.md index 6d559b3c..4b3b46ea 100644 --- a/tools/webcam/README.md +++ b/tools/webcam/README.md @@ -36,7 +36,7 @@ USE_WEBCAM=1 scons -j$(nproc) ## GO ``` -cd ~/openpilot/selfdrive +cd ~/openpilot/selfdrive/manager PASSIVE=0 NOSENSOR=1 USE_WEBCAM=1 ./manager.py ``` - Start the car, then the UI should show the road webcam's view