diff --git a/.circleci/config.yml b/.circleci/config.yml index a4ae26e..52e4560 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -11,7 +11,7 @@ jobs: - run: name: Run safety test command: | - docker run panda_safety /bin/bash -c "cd /panda/tests/safety; PYTHONPATH=/ ./test.sh" + docker run panda_safety /bin/bash -c "cd /openpilot/panda/tests/safety; PYTHONPATH=/openpilot ./test.sh" misra-c2012: machine: diff --git a/board/safety/safety_toyota.h b/board/safety/safety_toyota.h index 9c47a54..35e1aee 100644 --- a/board/safety/safety_toyota.h +++ b/board/safety/safety_toyota.h @@ -248,6 +248,7 @@ static int toyota_tx_hook(CAN_FIFOMailBox_TypeDef *to_send) { static void toyota_init(int16_t param) { controls_allowed = 0; relay_malfunction_reset(); + gas_interceptor_detected = 0; toyota_dbc_eps_torque_factor = param; } diff --git a/tests/safety/Dockerfile b/tests/safety/Dockerfile index 3c70d25..ec301a4 100644 --- a/tests/safety/Dockerfile +++ b/tests/safety/Dockerfile @@ -1,6 +1,20 @@ FROM ubuntu:16.04 -RUN apt-get update && apt-get install -y clang make python python-pip git curl locales zlib1g-dev libffi-dev bzip2 libssl-dev libbz2-dev libusb-1.0-0 +RUN apt-get update && apt-get install -y \ + autoconf \ + clang \ + curl \ + git \ + libtool \ + libssl-dev \ + libusb-1.0-0 \ + libzmq3-dev \ + locales \ + make \ + python \ + python-pip \ + wget \ + zlib1g-dev RUN sed -i -e 's/# en_US.UTF-8 UTF-8/en_US.UTF-8 UTF-8/' /etc/locale.gen && locale-gen ENV LANG en_US.UTF-8 @@ -17,4 +31,20 @@ RUN pyenv rehash COPY requirements.txt /tmp/ RUN pip install -r /tmp/requirements.txt -COPY . /panda +WORKDIR /openpilot +RUN git clone https://github.com/commaai/opendbc.git || true +WORKDIR /openpilot/opendbc +RUN git checkout 36c471e59eaac3760a00125d557ef19af091f289 +WORKDIR /openpilot +RUN git clone https://github.com/commaai/cereal.git +WORKDIR /openpilot/cereal +RUN git checkout e370f79522ff7fc0b16f33f4fef420be48061206 +RUN /openpilot/cereal/install_capnp.sh + +RUN pip install -r /openpilot/opendbc/requirements.txt + +WORKDIR /openpilot +RUN cp /openpilot/opendbc/SConstruct /openpilot +COPY . /openpilot/panda + +RUN scons -c && scons -j$(nproc) diff --git a/tests/safety/common.py b/tests/safety/common.py index 8b57808..f0900bc 100644 --- a/tests/safety/common.py +++ b/tests/safety/common.py @@ -1,3 +1,7 @@ +import abc +import struct +import unittest +from opendbc.can.packer import CANPacker # pylint: disable=import-error from panda.tests.safety import libpandasafety_py MAX_WRONG_COUNTERS = 5 @@ -8,17 +12,167 @@ class UNSAFE_MODE: DISABLE_STOCK_AEB = 2 RAISE_LONGITUDINAL_LIMITS_TO_ISO_MAX = 8 -def make_msg(bus, addr, length=8): - to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *') - if addr >= 0x800: - to_send[0].RIR = (addr << 3) | 5 +def twos_comp(val, bits): + if val >= 0: + return val else: - to_send[0].RIR = (addr << 21) | 1 - to_send[0].RDTR = length - to_send[0].RDTR |= bus << 4 + return (2**bits) + val +def package_can_msg(msg): + addr, _, dat, bus = msg + rdlr, rdhr = struct.unpack('II', dat.ljust(8, b'\x00')) + + ret = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *') + if addr >= 0x800: + ret[0].RIR = (addr << 3) | 5 + else: + ret[0].RIR = (addr << 21) | 1 + ret[0].RDTR = len(dat) | ((bus & 0xF) << 4) + ret[0].RDHR = rdhr + ret[0].RDLR = rdlr + + return ret + +def make_msg(bus, addr, length=8): + return package_can_msg([addr, 0, b'\x00'*length, bus]) + +def interceptor_msg(gas, addr): + to_send = make_msg(0, addr, 6) + gas2 = gas * 2 + to_send[0].RDLR = ((gas & 0xff) << 8) | ((gas & 0xff00) >> 8) | \ + ((gas2 & 0xff) << 24) | ((gas2 & 0xff00) << 8) return to_send +class CANPackerPanda(CANPacker): + def make_can_msg_panda(self, name_or_addr, bus, values, counter=-1): + msg = self.make_can_msg(name_or_addr, bus, values, counter=-1) + return package_can_msg(msg) + +class PandaSafetyTest(unittest.TestCase): + TX_MSGS = None + STANDSTILL_THRESHOLD = None + RELAY_MALFUNCTION_ADDR = None + RELAY_MALFUNCTION_BUS = None + FWD_BLACKLISTED_ADDRS = {} # {bus: [addr]} + FWD_BUS_LOOKUP = {} + + @classmethod + def setUpClass(cls): + if cls.__name__ == "PandaSafetyTest": + cls.safety = None + raise unittest.SkipTest + + def _rx(self, msg): + return self.safety.safety_rx_hook(msg) + + def _tx(self, msg): + return self.safety.safety_tx_hook(msg) + + @abc.abstractmethod + def _brake_msg(self, brake): + pass + + @abc.abstractmethod + def _speed_msg(self, speed): + pass + + @abc.abstractmethod + def _gas_msg(self, speed): + pass + + # ***** standard tests for all safety modes ***** + + def test_relay_malfunction(self): + # each car has an addr that is used to detect relay malfunction + # if that addr is seen on specified bus, triggers the relay malfunction + # protection logic: both tx_hook and fwd_hook are expected to return failure + self.assertFalse(self.safety.get_relay_malfunction()) + self._rx(make_msg(self.RELAY_MALFUNCTION_BUS, self.RELAY_MALFUNCTION_ADDR, 8)) + self.assertTrue(self.safety.get_relay_malfunction()) + for a in range(1, 0x800): + for b in range(0, 3): + self.assertFalse(self._tx(make_msg(b, a, 8))) + self.assertEqual(-1, self.safety.safety_fwd_hook(b, make_msg(b, a, 8))) + + def test_fwd_hook(self): + # some safety modes don't forward anything, while others blacklist msgs + for bus in range(0x0, 0x3): + for addr in range(0x1, 0x800): + # assume len 8 + msg = make_msg(bus, addr, 8) + fwd_bus = self.FWD_BUS_LOOKUP.get(bus, -1) + if bus in self.FWD_BLACKLISTED_ADDRS and addr in self.FWD_BLACKLISTED_ADDRS[bus]: + fwd_bus = -1 + self.assertEqual(fwd_bus, self.safety.safety_fwd_hook(bus, msg)) + + def test_spam_can_buses(self): + for addr in range(1, 0x800): + for bus in range(0, 4): + if all(addr != m[0] or bus != m[1] for m in self.TX_MSGS): + self.assertFalse(self._tx(make_msg(bus, addr, 8))) + + def test_default_controls_not_allowed(self): + self.assertFalse(self.safety.get_controls_allowed()) + + def test_manually_enable_controls_allowed(self): + self.safety.set_controls_allowed(1) + self.assertTrue(self.safety.get_controls_allowed()) + self.safety.set_controls_allowed(0) + self.assertFalse(self.safety.get_controls_allowed()) + + def test_prev_gas(self): + for pressed in [True, False]: + self._rx(self._gas_msg(pressed)) + self.assertEqual(pressed, self.safety.get_gas_pressed_prev()) + + def test_allow_engage_with_gas_pressed(self): + self._rx(self._gas_msg(1)) + self.safety.set_controls_allowed(True) + self._rx(self._gas_msg(1)) + self.assertTrue(self.safety.get_controls_allowed()) + self._rx(self._gas_msg(1)) + self.assertTrue(self.safety.get_controls_allowed()) + + def test_disengage_on_gas(self): + self._rx(self._gas_msg(0)) + self.safety.set_controls_allowed(True) + self._rx(self._gas_msg(1)) + self.assertFalse(self.safety.get_controls_allowed()) + + def test_unsafe_mode_no_disengage_on_gas(self): + self._rx(self._gas_msg(0)) + self.safety.set_controls_allowed(True) + self.safety.set_unsafe_mode(UNSAFE_MODE.DISABLE_DISENGAGE_ON_GAS) + self._rx(self._gas_msg(1)) + self.assertTrue(self.safety.get_controls_allowed()) + + def test_allow_brake_at_zero_speed(self): + # Brake was already pressed + self._rx(self._speed_msg(0)) + self._rx(self._brake_msg(1)) + self.safety.set_controls_allowed(1) + self._rx(self._brake_msg(1)) + self.assertTrue(self.safety.get_controls_allowed()) + self._rx(self._brake_msg(0)) + self.assertTrue(self.safety.get_controls_allowed()) + # rising edge of brake should disengage + self._rx(self._brake_msg(1)) + self.assertFalse(self.safety.get_controls_allowed()) + self._rx(self._brake_msg(0)) # reset no brakes + + def test_not_allow_brake_when_moving(self): + # Brake was already pressed + self._rx(self._brake_msg(1)) + self.safety.set_controls_allowed(1) + self._rx(self._speed_msg(self.STANDSTILL_THRESHOLD)) + self._rx(self._brake_msg(1)) + self.assertTrue(self.safety.get_controls_allowed()) + self._rx(self._speed_msg(self.STANDSTILL_THRESHOLD + 1)) + self._rx(self._brake_msg(1)) + self.assertFalse(self.safety.get_controls_allowed()) + self._rx(self._speed_msg(0)) + +# TODO: use PandaSafetyTest for all tests and delete this class StdTest: @staticmethod def test_relay_malfunction(test, addr, bus=0): diff --git a/tests/safety/test.c b/tests/safety/test.c index d28ab80..9d5bd12 100644 --- a/tests/safety/test.c +++ b/tests/safety/test.c @@ -282,6 +282,7 @@ void init_tests(void){ safety_mode_cnt = 2U; // avoid ignoring relay_malfunction logic gas_pressed_prev = false; brake_pressed_prev = false; + unsafe_mode = 0; } void init_tests_toyota(void){ diff --git a/tests/safety/test_honda.py b/tests/safety/test_honda.py index 3b3827f..9003bb1 100755 --- a/tests/safety/test_honda.py +++ b/tests/safety/test_honda.py @@ -3,7 +3,8 @@ import unittest import numpy as np from panda import Panda from panda.tests.safety import libpandasafety_py -from panda.tests.safety.common import StdTest, make_msg, MAX_WRONG_COUNTERS, UNSAFE_MODE +from panda.tests.safety.common import StdTest, make_msg, interceptor_msg, \ + MAX_WRONG_COUNTERS, UNSAFE_MODE MAX_BRAKE = 255 @@ -87,13 +88,6 @@ class TestHondaSafety(unittest.TestCase): to_send[0].RDLR = ((brake & 0x3) << 14) | ((brake & 0x3FF) >> 2) return to_send - def _send_interceptor_msg(self, gas, addr): - to_send = make_msg(0, addr, 6) - gas2 = gas * 2 - to_send[0].RDLR = ((gas & 0xff) << 8) | ((gas & 0xff00) >> 8) | \ - ((gas2 & 0xff) << 24) | ((gas2 & 0xff00) << 8) - return to_send - def _send_steer_msg(self, steer): bus = 2 if self.safety.get_honda_hw() == HONDA_BG_HW else 0 to_send = make_msg(bus, 0xE4, 6) @@ -176,11 +170,11 @@ class TestHondaSafety(unittest.TestCase): self.assertTrue(self.safety.get_gas_pressed_prev()) def test_prev_gas_interceptor(self): - self.safety.safety_rx_hook(self._send_interceptor_msg(0x0, 0x201)) + self.safety.safety_rx_hook(interceptor_msg(0x0, 0x201)) self.assertFalse(self.safety.get_gas_interceptor_prev()) - self.safety.safety_rx_hook(self._send_interceptor_msg(0x1000, 0x201)) + self.safety.safety_rx_hook(interceptor_msg(0x1000, 0x201)) self.assertTrue(self.safety.get_gas_interceptor_prev()) - self.safety.safety_rx_hook(self._send_interceptor_msg(0x0, 0x201)) + self.safety.safety_rx_hook(interceptor_msg(0x0, 0x201)) self.safety.set_gas_interceptor_detected(False) def test_disengage_on_gas(self): @@ -205,31 +199,31 @@ class TestHondaSafety(unittest.TestCase): def test_disengage_on_gas_interceptor(self): for g in range(0, 0x1000): - self.safety.safety_rx_hook(self._send_interceptor_msg(0, 0x201)) + self.safety.safety_rx_hook(interceptor_msg(0, 0x201)) self.safety.set_controls_allowed(True) - self.safety.safety_rx_hook(self._send_interceptor_msg(g, 0x201)) + self.safety.safety_rx_hook(interceptor_msg(g, 0x201)) remain_enabled = g <= INTERCEPTOR_THRESHOLD self.assertEqual(remain_enabled, self.safety.get_controls_allowed()) - self.safety.safety_rx_hook(self._send_interceptor_msg(0, 0x201)) + self.safety.safety_rx_hook(interceptor_msg(0, 0x201)) self.safety.set_gas_interceptor_detected(False) def test_unsafe_mode_no_disengage_on_gas_interceptor(self): self.safety.set_controls_allowed(True) self.safety.set_unsafe_mode(UNSAFE_MODE.DISABLE_DISENGAGE_ON_GAS) for g in range(0, 0x1000): - self.safety.safety_rx_hook(self._send_interceptor_msg(g, 0x201)) + self.safety.safety_rx_hook(interceptor_msg(g, 0x201)) self.assertTrue(self.safety.get_controls_allowed()) - self.safety.safety_rx_hook(self._send_interceptor_msg(0, 0x201)) + self.safety.safety_rx_hook(interceptor_msg(0, 0x201)) self.safety.set_gas_interceptor_detected(False) self.safety.set_unsafe_mode(UNSAFE_MODE.DEFAULT) self.safety.set_controls_allowed(False) def test_allow_engage_with_gas_interceptor_pressed(self): - self.safety.safety_rx_hook(self._send_interceptor_msg(0x1000, 0x201)) + self.safety.safety_rx_hook(interceptor_msg(0x1000, 0x201)) self.safety.set_controls_allowed(1) - self.safety.safety_rx_hook(self._send_interceptor_msg(0x1000, 0x201)) + self.safety.safety_rx_hook(interceptor_msg(0x1000, 0x201)) self.assertTrue(self.safety.get_controls_allowed()) - self.safety.safety_rx_hook(self._send_interceptor_msg(0, 0x201)) + self.safety.safety_rx_hook(interceptor_msg(0, 0x201)) self.safety.set_gas_interceptor_detected(False) def test_brake_safety_check(self): @@ -258,7 +252,7 @@ class TestHondaSafety(unittest.TestCase): send = True else: send = gas == 0 - self.assertEqual(send, self.safety.safety_tx_hook(self._send_interceptor_msg(gas, 0x200))) + self.assertEqual(send, self.safety.safety_tx_hook(interceptor_msg(gas, 0x200))) def test_steer_safety_check(self): self.safety.set_controls_allowed(0) @@ -365,22 +359,22 @@ class TestHondaSafety(unittest.TestCase): self.safety.safety_rx_hook(self._gas_msg(1)) elif pedal == 'interceptor': # gas_interceptor_prev > INTERCEPTOR_THRESHOLD - self.safety.safety_rx_hook(self._send_interceptor_msg(INTERCEPTOR_THRESHOLD+1, 0x201)) - self.safety.safety_rx_hook(self._send_interceptor_msg(INTERCEPTOR_THRESHOLD+1, 0x201)) + self.safety.safety_rx_hook(interceptor_msg(INTERCEPTOR_THRESHOLD+1, 0x201)) + self.safety.safety_rx_hook(interceptor_msg(INTERCEPTOR_THRESHOLD+1, 0x201)) self.safety.set_controls_allowed(1) hw = self.safety.get_honda_hw() if hw == HONDA_N_HW: self.safety.set_honda_fwd_brake(False) self.assertFalse(self.safety.safety_tx_hook(self._send_brake_msg(MAX_BRAKE))) - self.assertFalse(self.safety.safety_tx_hook(self._send_interceptor_msg(INTERCEPTOR_THRESHOLD, 0x200))) + self.assertFalse(self.safety.safety_tx_hook(interceptor_msg(INTERCEPTOR_THRESHOLD, 0x200))) self.assertFalse(self.safety.safety_tx_hook(self._send_steer_msg(0x1000))) # reset status self.safety.set_controls_allowed(0) self.safety.safety_tx_hook(self._send_brake_msg(0)) self.safety.safety_tx_hook(self._send_steer_msg(0)) - self.safety.safety_tx_hook(self._send_interceptor_msg(0, 0x200)) + self.safety.safety_tx_hook(interceptor_msg(0, 0x200)) if pedal == 'brake': self.safety.safety_rx_hook(self._speed_msg(0)) self.safety.safety_rx_hook(self._brake_msg(0)) @@ -403,8 +397,8 @@ class TestHondaSafety(unittest.TestCase): allow_ctrl = True elif pedal == 'interceptor': # gas_interceptor_prev > INTERCEPTOR_THRESHOLD - self.safety.safety_rx_hook(self._send_interceptor_msg(INTERCEPTOR_THRESHOLD+1, 0x201)) - self.safety.safety_rx_hook(self._send_interceptor_msg(INTERCEPTOR_THRESHOLD+1, 0x201)) + self.safety.safety_rx_hook(interceptor_msg(INTERCEPTOR_THRESHOLD+1, 0x201)) + self.safety.safety_rx_hook(interceptor_msg(INTERCEPTOR_THRESHOLD+1, 0x201)) allow_ctrl = True self.safety.set_controls_allowed(1) @@ -412,14 +406,14 @@ class TestHondaSafety(unittest.TestCase): if hw == HONDA_N_HW: self.safety.set_honda_fwd_brake(False) self.assertEqual(allow_ctrl, self.safety.safety_tx_hook(self._send_brake_msg(MAX_BRAKE))) - self.assertEqual(allow_ctrl, self.safety.safety_tx_hook(self._send_interceptor_msg(INTERCEPTOR_THRESHOLD, 0x200))) + self.assertEqual(allow_ctrl, self.safety.safety_tx_hook(interceptor_msg(INTERCEPTOR_THRESHOLD, 0x200))) self.assertEqual(allow_ctrl, self.safety.safety_tx_hook(self._send_steer_msg(0x1000))) # reset status self.safety.set_controls_allowed(0) self.safety.set_unsafe_mode(UNSAFE_MODE.DEFAULT) self.safety.safety_tx_hook(self._send_brake_msg(0)) self.safety.safety_tx_hook(self._send_steer_msg(0)) - self.safety.safety_tx_hook(self._send_interceptor_msg(0, 0x200)) + self.safety.safety_tx_hook(interceptor_msg(0, 0x200)) if pedal == 'brake': self.safety.safety_rx_hook(self._speed_msg(0)) self.safety.safety_rx_hook(self._brake_msg(0)) diff --git a/tests/safety/test_toyota.py b/tests/safety/test_toyota.py old mode 100644 new mode 100755 index 4f7af53..5b121d4 --- a/tests/safety/test_toyota.py +++ b/tests/safety/test_toyota.py @@ -3,59 +3,42 @@ import unittest import numpy as np from panda import Panda from panda.tests.safety import libpandasafety_py -from panda.tests.safety.common import StdTest, make_msg, UNSAFE_MODE +from panda.tests.safety.common import CANPackerPanda, PandaSafetyTest, \ + interceptor_msg, UNSAFE_MODE MAX_RATE_UP = 10 MAX_RATE_DOWN = 25 MAX_TORQUE = 1500 -MAX_ACCEL = 1500 -MIN_ACCEL = -3000 +MAX_ACCEL = 1.5 +MIN_ACCEL = -3.0 -ISO_MAX_ACCEL = 2000 -ISO_MIN_ACCEL = -3500 +ISO_MAX_ACCEL = 2.0 +ISO_MIN_ACCEL = -3.5 MAX_RT_DELTA = 375 RT_INTERVAL = 250000 -STANDSTILL_THRESHOLD = 100 # 1kph - MAX_TORQUE_ERROR = 350 INTERCEPTOR_THRESHOLD = 475 -TX_MSGS = [[0x283, 0], [0x2E6, 0], [0x2E7, 0], [0x33E, 0], [0x344, 0], [0x365, 0], [0x366, 0], [0x4CB, 0], # DSU bus 0 - [0x128, 1], [0x141, 1], [0x160, 1], [0x161, 1], [0x470, 1], # DSU bus 1 - [0x2E4, 0], [0x411, 0], [0x412, 0], [0x343, 0], [0x1D2, 0], # LKAS + ACC - [0x200, 0], [0x750, 0]]; # interceptor + blindspot monitor +class TestToyotaSafety(PandaSafetyTest): + TX_MSGS = [[0x283, 0], [0x2E6, 0], [0x2E7, 0], [0x33E, 0], [0x344, 0], [0x365, 0], [0x366, 0], [0x4CB, 0], # DSU bus 0 + [0x128, 1], [0x141, 1], [0x160, 1], [0x161, 1], [0x470, 1], # DSU bus 1 + [0x2E4, 0], [0x411, 0], [0x412, 0], [0x343, 0], [0x1D2, 0], # LKAS + ACC + [0x200, 0], [0x750, 0]] # interceptor + blindspot monitor + STANDSTILL_THRESHOLD = 1 # 1kph + RELAY_MALFUNCTION_ADDR = 0x2E4 + RELAY_MALFUNCTION_BUS = 0 + FWD_BLACKLISTED_ADDRS = {2: [0x2E4, 0x412, 0x191, 0x343]} + FWD_BUS_LOOKUP = {0: 2, 2: 0} -def twos_comp(val, bits): - if val >= 0: - return val - else: - return (2**bits) + val - -def sign(a): - if a > 0: - return 1 - else: - return -1 - -def toyota_checksum(msg, addr, len_msg): - checksum = (len_msg + addr + (addr >> 8)) - for i in range(len_msg): - if i < 4: - checksum += (msg.RDLR >> (8 * i)) - else: - checksum += (msg.RDHR >> (8 * (i - 4))) - return checksum & 0xff - - -class TestToyotaSafety(unittest.TestCase): @classmethod def setUp(cls): + cls.packer = CANPackerPanda("toyota_prius_2017_pt_generated") cls.safety = libpandasafety_py.libpandasafety - cls.safety.set_safety_hooks(Panda.SAFETY_TOYOTA, 100) + cls.safety.set_safety_hooks(Panda.SAFETY_TOYOTA, 66) cls.safety.init_tests_toyota() def _set_prev_torque(self, t): @@ -64,171 +47,94 @@ class TestToyotaSafety(unittest.TestCase): self.safety.set_toyota_torque_meas(t, t) def _torque_meas_msg(self, torque): - t = twos_comp(torque, 16) - to_send = make_msg(0, 0x260) - to_send[0].RDHR = (t & 0xff00) | ((t & 0xFF) << 16) - to_send[0].RDHR |= toyota_checksum(to_send[0], 0x260, 8) << 24 - return to_send + values = {"STEER_TORQUE_EPS": torque} + return self.packer.make_can_msg_panda("STEER_TORQUE_SENSOR", 0, values) def _torque_msg(self, torque): - t = twos_comp(torque, 16) - to_send = make_msg(0, 0x2E4) - to_send[0].RDLR = t | ((t & 0xFF) << 16) - return to_send + values = {"STEER_TORQUE_CMD": torque} + return self.packer.make_can_msg_panda("STEERING_LKA", 0, values) def _accel_msg(self, accel): - to_send = make_msg(0, 0x343) - a = twos_comp(accel, 16) - to_send[0].RDLR = (a & 0xFF) << 8 | (a >> 8) - return to_send + values = {"ACCEL_CMD": accel} + return self.packer.make_can_msg_panda("ACC_CONTROL", 0, values) def _speed_msg(self, s): - offset = (0x6f << 8) + 0x1a # there is a 0x1a6f offset in the signal - to_send = make_msg(0, 0xaa) - to_send[0].RDLR = ((s & 0xFF) << 8 | (s >> 8)) + offset - to_send[0].RDLR += ((s & 0xFF) << 24 | ((s >> 8) << 16)) + (offset << 16) - to_send[0].RDHR = ((s & 0xFF) << 8 | (s >> 8)) + offset - to_send[0].RDHR += ((s & 0xFF) << 24 | ((s >> 8) << 16)) + (offset << 16) - return to_send + values = {("WHEEL_SPEED_%s"%n): s for n in ["FR", "FL", "RR", "RL"]} + return self.packer.make_can_msg_panda("WHEEL_SPEEDS", 0, values) - def _brake_msg(self, brake): - to_send = make_msg(0, 0x226) - to_send[0].RDHR = brake << 5 - to_send[0].RDHR |= toyota_checksum(to_send[0], 0x226, 8) << 24 - return to_send + def _brake_msg(self, pressed): + values = {"BRAKE_PRESSED": pressed} + return self.packer.make_can_msg_panda("BRAKE_MODULE", 0, values) - def _gas_pressed_msg(self, pressed, enable_cruise=False): - to_send = make_msg(0, 0x1D2) - to_send[0].RDLR = ((1*(not pressed)) << 4) | (1*enable_cruise << 5) - to_send[0].RDHR = (toyota_checksum(to_send[0], 0x1D2, 8) << 24) - return to_send - - def _send_interceptor_msg(self, gas, addr): - gas2 = gas * 2 - to_send = make_msg(0, addr, 6) - to_send[0].RDLR = ((gas & 0xff) << 8) | ((gas & 0xff00) >> 8) | \ - ((gas2 & 0xff) << 24) | ((gas2 & 0xff00) << 8) - return to_send + def _gas_msg(self, pressed): + cruise_active = self.safety.get_controls_allowed() + values = {"GAS_RELEASED": not pressed, "CRUISE_ACTIVE": cruise_active} + return self.packer.make_can_msg_panda("PCM_CRUISE", 0, values) def _pcm_cruise_msg(self, cruise_on): - to_send = make_msg(0, 0x1D2) - to_send[0].RDLR = cruise_on << 5 - to_send[0].RDHR = to_send[0].RDHR | (toyota_checksum(to_send[0], 0x1D2, 8) << 24) - return to_send - - def test_spam_can_buses(self): - StdTest.test_spam_can_buses(self, TX_MSGS) - - def test_relay_malfunction(self): - StdTest.test_relay_malfunction(self, 0x2E4) - - def test_default_controls_not_allowed(self): - self.assertFalse(self.safety.get_controls_allowed()) - - def test_manually_enable_controls_allowed(self): - StdTest.test_manually_enable_controls_allowed(self) + values = {"CRUISE_ACTIVE": cruise_on} + values["CHECKSUM"] = 1 + return self.packer.make_can_msg_panda("PCM_CRUISE", 0, values) def test_enable_control_allowed_from_cruise(self): - self.safety.safety_rx_hook(self._pcm_cruise_msg(False)) + self._rx(self._pcm_cruise_msg(False)) self.assertFalse(self.safety.get_controls_allowed()) - self.safety.safety_rx_hook(self._pcm_cruise_msg(True)) + self._rx(self._pcm_cruise_msg(True)) self.assertTrue(self.safety.get_controls_allowed()) def test_disable_control_allowed_from_cruise(self): self.safety.set_controls_allowed(1) - self.safety.safety_rx_hook(self._pcm_cruise_msg(False)) + self._rx(self._pcm_cruise_msg(False)) self.assertFalse(self.safety.get_controls_allowed()) - def test_prev_gas(self): - for pressed in [True, False]: - self.safety.safety_rx_hook(self._gas_pressed_msg(pressed)) - self.assertEqual(pressed, self.safety.get_gas_pressed_prev()) - def test_prev_gas_interceptor(self): - self.safety.safety_rx_hook(self._send_interceptor_msg(0x0, 0x201)) + self._rx(interceptor_msg(0x0, 0x201)) self.assertFalse(self.safety.get_gas_interceptor_prev()) - self.safety.safety_rx_hook(self._send_interceptor_msg(0x1000, 0x201)) + self._rx(interceptor_msg(0x1000, 0x201)) self.assertTrue(self.safety.get_gas_interceptor_prev()) - self.safety.safety_rx_hook(self._send_interceptor_msg(0x0, 0x201)) - self.safety.set_gas_interceptor_detected(False) - - def test_disengage_on_gas(self): - self.safety.safety_rx_hook(self._gas_pressed_msg(False)) - self.safety.set_controls_allowed(True) - self.safety.safety_rx_hook(self._gas_pressed_msg(True, enable_cruise=True)) - self.assertFalse(self.safety.get_controls_allowed()) - - def test_unsafe_mode_no_disengage_on_gas(self): - self.safety.safety_rx_hook(self._gas_pressed_msg(False)) - self.safety.set_controls_allowed(True) - self.safety.set_unsafe_mode(UNSAFE_MODE.DISABLE_DISENGAGE_ON_GAS) - self.safety.safety_rx_hook(self._gas_pressed_msg(True, enable_cruise=True)) - self.assertTrue(self.safety.get_controls_allowed()) - self.safety.set_unsafe_mode(UNSAFE_MODE.DEFAULT) - - def test_allow_engage_with_gas_pressed(self): - self.safety.safety_rx_hook(self._gas_pressed_msg(True)) - self.safety.set_controls_allowed(True) - for _ in range(2): - # since cruise msg is used for gas pedal state, cruise bit must - # also be set for this test or else it will set controls_allowed - self.safety.safety_rx_hook(self._gas_pressed_msg(True, enable_cruise=True)) - self.assertTrue(self.safety.get_controls_allowed()) + self._rx(interceptor_msg(0x0, 0x201)) def test_disengage_on_gas_interceptor(self): for g in range(0, 0x1000): - self.safety.safety_rx_hook(self._send_interceptor_msg(0, 0x201)) + self._rx(interceptor_msg(0, 0x201)) self.safety.set_controls_allowed(True) - self.safety.safety_rx_hook(self._send_interceptor_msg(g, 0x201)) + self._rx(interceptor_msg(g, 0x201)) remain_enabled = g <= INTERCEPTOR_THRESHOLD self.assertEqual(remain_enabled, self.safety.get_controls_allowed()) - self.safety.safety_rx_hook(self._send_interceptor_msg(0, 0x201)) + self._rx(interceptor_msg(0, 0x201)) self.safety.set_gas_interceptor_detected(False) def test_unsafe_mode_no_disengage_on_gas_interceptor(self): self.safety.set_controls_allowed(True) self.safety.set_unsafe_mode(UNSAFE_MODE.DISABLE_DISENGAGE_ON_GAS) for g in range(0, 0x1000): - self.safety.safety_rx_hook(self._send_interceptor_msg(g, 0x201)) + self._rx(interceptor_msg(g, 0x201)) self.assertTrue(self.safety.get_controls_allowed()) - self.safety.safety_rx_hook(self._send_interceptor_msg(0, 0x201)) + self._rx(interceptor_msg(0, 0x201)) self.safety.set_gas_interceptor_detected(False) self.safety.set_unsafe_mode(UNSAFE_MODE.DEFAULT) - self.safety.set_controls_allowed(False) - - def test_brake_disengage(self): - StdTest.test_allow_brake_at_zero_speed(self) - StdTest.test_not_allow_brake_when_moving(self, STANDSTILL_THRESHOLD) def test_allow_engage_with_gas_interceptor_pressed(self): - self.safety.safety_rx_hook(self._send_interceptor_msg(0x1000, 0x201)) + self._rx(interceptor_msg(0x1000, 0x201)) self.safety.set_controls_allowed(1) - self.safety.safety_rx_hook(self._send_interceptor_msg(0x1000, 0x201)) + self._rx(interceptor_msg(0x1000, 0x201)) self.assertTrue(self.safety.get_controls_allowed()) - self.safety.safety_rx_hook(self._send_interceptor_msg(0, 0x201)) - self.safety.set_gas_interceptor_detected(False) + self._rx(interceptor_msg(0, 0x201)) def test_accel_actuation_limits(self): - for accel in np.arange(MIN_ACCEL - 1000, MAX_ACCEL + 1000, 100): - for controls_allowed in [True, False]: - self.safety.set_controls_allowed(controls_allowed) - if controls_allowed: - send = MIN_ACCEL <= accel <= MAX_ACCEL - else: - send = accel == 0 - self.assertEqual(send, self.safety.safety_tx_hook(self._accel_msg(accel))) + limits = ((MIN_ACCEL, MAX_ACCEL, UNSAFE_MODE.DEFAULT), + (ISO_MIN_ACCEL, ISO_MAX_ACCEL, UNSAFE_MODE.RAISE_LONGITUDINAL_LIMITS_TO_ISO_MAX)) - def test_unsafe_iso_accel_actuation_limits(self): - for accel in np.arange(ISO_MIN_ACCEL - 1000, ISO_MAX_ACCEL + 1000, 100): - for controls_allowed in [True, False]: - self.safety.set_controls_allowed(controls_allowed) - self.safety.set_unsafe_mode(UNSAFE_MODE.RAISE_LONGITUDINAL_LIMITS_TO_ISO_MAX) - if controls_allowed: - send = ISO_MIN_ACCEL <= accel <= ISO_MAX_ACCEL - else: - send = accel == 0 - self.assertEqual(send, self.safety.safety_tx_hook(self._accel_msg(accel))) - self.safety.set_unsafe_mode(UNSAFE_MODE.DEFAULT) + for min_accel, max_accel, unsafe_mode in limits: + for accel in np.arange(min_accel - 1, max_accel + 1, 0.1): + for controls_allowed in [True, False]: + self.safety.set_controls_allowed(controls_allowed) + self.safety.set_unsafe_mode(unsafe_mode) + if controls_allowed: + should_tx = int(min_accel*1000) <= int(accel*1000) <= int(max_accel*1000) + else: + should_tx = np.isclose(accel, 0, atol=0.0001) + self.assertEqual(should_tx, self._tx(self._accel_msg(accel))) def test_torque_absolute_limits(self): for controls_allowed in [True, False]: @@ -243,16 +149,16 @@ class TestToyotaSafety(unittest.TestCase): else: send = torque == 0 - self.assertEqual(send, self.safety.safety_tx_hook(self._torque_msg(torque))) + self.assertEqual(send, self._tx(self._torque_msg(torque))) def test_non_realtime_limit_up(self): self.safety.set_controls_allowed(True) self._set_prev_torque(0) - self.assertTrue(self.safety.safety_tx_hook(self._torque_msg(MAX_RATE_UP))) + self.assertTrue(self._tx(self._torque_msg(MAX_RATE_UP))) self._set_prev_torque(0) - self.assertFalse(self.safety.safety_tx_hook(self._torque_msg(MAX_RATE_UP + 1))) + self.assertFalse(self._tx(self._torque_msg(MAX_RATE_UP + 1))) def test_non_realtime_limit_down(self): self.safety.set_controls_allowed(True) @@ -260,12 +166,12 @@ class TestToyotaSafety(unittest.TestCase): self.safety.set_toyota_rt_torque_last(1000) self.safety.set_toyota_torque_meas(500, 500) self.safety.set_toyota_desired_torque_last(1000) - self.assertTrue(self.safety.safety_tx_hook(self._torque_msg(1000 - MAX_RATE_DOWN))) + self.assertTrue(self._tx(self._torque_msg(1000 - MAX_RATE_DOWN))) self.safety.set_toyota_rt_torque_last(1000) self.safety.set_toyota_torque_meas(500, 500) self.safety.set_toyota_desired_torque_last(1000) - self.assertFalse(self.safety.safety_tx_hook(self._torque_msg(1000 - MAX_RATE_DOWN + 1))) + self.assertFalse(self._tx(self._torque_msg(1000 - MAX_RATE_DOWN + 1))) def test_exceed_torque_sensor(self): self.safety.set_controls_allowed(True) @@ -274,9 +180,9 @@ class TestToyotaSafety(unittest.TestCase): self._set_prev_torque(0) for t in np.arange(0, MAX_TORQUE_ERROR + 10, 10): t *= sign - self.assertTrue(self.safety.safety_tx_hook(self._torque_msg(t))) + self.assertTrue(self._tx(self._torque_msg(t))) - self.assertFalse(self.safety.safety_tx_hook(self._torque_msg(sign * (MAX_TORQUE_ERROR + 10)))) + self.assertFalse(self._tx(self._torque_msg(sign * (MAX_TORQUE_ERROR + 10)))) def test_realtime_limit_up(self): self.safety.set_controls_allowed(True) @@ -287,46 +193,42 @@ class TestToyotaSafety(unittest.TestCase): for t in np.arange(0, 380, 10): t *= sign self.safety.set_toyota_torque_meas(t, t) - self.assertTrue(self.safety.safety_tx_hook(self._torque_msg(t))) - self.assertFalse(self.safety.safety_tx_hook(self._torque_msg(sign * 380))) + self.assertTrue(self._tx(self._torque_msg(t))) + self.assertFalse(self._tx(self._torque_msg(sign * 380))) self._set_prev_torque(0) for t in np.arange(0, 370, 10): t *= sign self.safety.set_toyota_torque_meas(t, t) - self.assertTrue(self.safety.safety_tx_hook(self._torque_msg(t))) + self.assertTrue(self._tx(self._torque_msg(t))) # Increase timer to update rt_torque_last self.safety.set_timer(RT_INTERVAL + 1) - self.assertTrue(self.safety.safety_tx_hook(self._torque_msg(sign * 370))) - self.assertTrue(self.safety.safety_tx_hook(self._torque_msg(sign * 380))) + self.assertTrue(self._tx(self._torque_msg(sign * 370))) + self.assertTrue(self._tx(self._torque_msg(sign * 380))) def test_torque_measurements(self): - self.safety.safety_rx_hook(self._torque_meas_msg(50)) - self.safety.safety_rx_hook(self._torque_meas_msg(-50)) - self.safety.safety_rx_hook(self._torque_meas_msg(0)) - self.safety.safety_rx_hook(self._torque_meas_msg(0)) - self.safety.safety_rx_hook(self._torque_meas_msg(0)) - self.safety.safety_rx_hook(self._torque_meas_msg(0)) + for trq in [50, -50, 0, 0, 0, 0]: + self._rx(self._torque_meas_msg(trq)) + # toyota safety adds one to be conservative on rounding self.assertEqual(-51, self.safety.get_toyota_torque_meas_min()) self.assertEqual(51, self.safety.get_toyota_torque_meas_max()) - self.safety.safety_rx_hook(self._torque_meas_msg(0)) + self._rx(self._torque_meas_msg(0)) self.assertEqual(1, self.safety.get_toyota_torque_meas_max()) self.assertEqual(-51, self.safety.get_toyota_torque_meas_min()) - self.safety.safety_rx_hook(self._torque_meas_msg(0)) + self._rx(self._torque_meas_msg(0)) self.assertEqual(1, self.safety.get_toyota_torque_meas_max()) self.assertEqual(-1, self.safety.get_toyota_torque_meas_min()) def test_gas_interceptor_safety_check(self): - self.safety.set_controls_allowed(0) - self.assertTrue(self.safety.safety_tx_hook(self._send_interceptor_msg(0, 0x200))) - self.assertFalse(self.safety.safety_tx_hook(self._send_interceptor_msg(0x1000, 0x200))) + self.assertTrue(self._tx(interceptor_msg(0, 0x200))) + self.assertFalse(self._tx(interceptor_msg(0x1000, 0x200))) self.safety.set_controls_allowed(1) - self.assertTrue(self.safety.safety_tx_hook(self._send_interceptor_msg(0x1000, 0x200))) + self.assertTrue(self._tx(interceptor_msg(0x1000, 0x200))) def test_rx_hook(self): # checksum checks @@ -336,30 +238,11 @@ class TestToyotaSafety(unittest.TestCase): to_push = self._torque_meas_msg(0) if msg == "pcm": to_push = self._pcm_cruise_msg(1) - self.assertTrue(self.safety.safety_rx_hook(to_push)) + self.assertTrue(self._rx(to_push)) to_push[0].RDHR = 0 - self.assertFalse(self.safety.safety_rx_hook(to_push)) + self.assertFalse(self._rx(to_push)) self.assertFalse(self.safety.get_controls_allowed()) - def test_fwd_hook(self): - - buss = list(range(0x0, 0x3)) - msgs = list(range(0x1, 0x800)) - - blocked_msgs = [0x2E4, 0x412, 0x191] - blocked_msgs += [0x343] - for b in buss: - for m in msgs: - if b == 0: - fwd_bus = 2 - elif b == 1: - fwd_bus = -1 - elif b == 2: - fwd_bus = -1 if m in blocked_msgs else 0 - - # assume len 8 - self.assertEqual(fwd_bus, self.safety.safety_fwd_hook(b, make_msg(b, m, 8))) - if __name__ == "__main__": unittest.main()