Safety Test Refactor: Toyota + support code (#491)
* bring over toyota + support code from safety-test-refactor * old tests still use StdTest * don't duplicate * test fwd * make linter happy * fix indent * fix ident * fix order * whitespace * move some common tests * cleanup * unused * commentmaster
parent
500370aecd
commit
abce8f32b1
|
@ -11,7 +11,7 @@ jobs:
|
|||
- run:
|
||||
name: Run safety test
|
||||
command: |
|
||||
docker run panda_safety /bin/bash -c "cd /panda/tests/safety; PYTHONPATH=/ ./test.sh"
|
||||
docker run panda_safety /bin/bash -c "cd /openpilot/panda/tests/safety; PYTHONPATH=/openpilot ./test.sh"
|
||||
|
||||
misra-c2012:
|
||||
machine:
|
||||
|
|
|
@ -248,6 +248,7 @@ static int toyota_tx_hook(CAN_FIFOMailBox_TypeDef *to_send) {
|
|||
static void toyota_init(int16_t param) {
|
||||
controls_allowed = 0;
|
||||
relay_malfunction_reset();
|
||||
gas_interceptor_detected = 0;
|
||||
toyota_dbc_eps_torque_factor = param;
|
||||
}
|
||||
|
||||
|
|
|
@ -1,6 +1,20 @@
|
|||
FROM ubuntu:16.04
|
||||
|
||||
RUN apt-get update && apt-get install -y clang make python python-pip git curl locales zlib1g-dev libffi-dev bzip2 libssl-dev libbz2-dev libusb-1.0-0
|
||||
RUN apt-get update && apt-get install -y \
|
||||
autoconf \
|
||||
clang \
|
||||
curl \
|
||||
git \
|
||||
libtool \
|
||||
libssl-dev \
|
||||
libusb-1.0-0 \
|
||||
libzmq3-dev \
|
||||
locales \
|
||||
make \
|
||||
python \
|
||||
python-pip \
|
||||
wget \
|
||||
zlib1g-dev
|
||||
|
||||
RUN sed -i -e 's/# en_US.UTF-8 UTF-8/en_US.UTF-8 UTF-8/' /etc/locale.gen && locale-gen
|
||||
ENV LANG en_US.UTF-8
|
||||
|
@ -17,4 +31,20 @@ RUN pyenv rehash
|
|||
COPY requirements.txt /tmp/
|
||||
RUN pip install -r /tmp/requirements.txt
|
||||
|
||||
COPY . /panda
|
||||
WORKDIR /openpilot
|
||||
RUN git clone https://github.com/commaai/opendbc.git || true
|
||||
WORKDIR /openpilot/opendbc
|
||||
RUN git checkout 36c471e59eaac3760a00125d557ef19af091f289
|
||||
WORKDIR /openpilot
|
||||
RUN git clone https://github.com/commaai/cereal.git
|
||||
WORKDIR /openpilot/cereal
|
||||
RUN git checkout e370f79522ff7fc0b16f33f4fef420be48061206
|
||||
RUN /openpilot/cereal/install_capnp.sh
|
||||
|
||||
RUN pip install -r /openpilot/opendbc/requirements.txt
|
||||
|
||||
WORKDIR /openpilot
|
||||
RUN cp /openpilot/opendbc/SConstruct /openpilot
|
||||
COPY . /openpilot/panda
|
||||
|
||||
RUN scons -c && scons -j$(nproc)
|
||||
|
|
|
@ -1,3 +1,7 @@
|
|||
import abc
|
||||
import struct
|
||||
import unittest
|
||||
from opendbc.can.packer import CANPacker # pylint: disable=import-error
|
||||
from panda.tests.safety import libpandasafety_py
|
||||
|
||||
MAX_WRONG_COUNTERS = 5
|
||||
|
@ -8,17 +12,167 @@ class UNSAFE_MODE:
|
|||
DISABLE_STOCK_AEB = 2
|
||||
RAISE_LONGITUDINAL_LIMITS_TO_ISO_MAX = 8
|
||||
|
||||
def make_msg(bus, addr, length=8):
|
||||
to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *')
|
||||
if addr >= 0x800:
|
||||
to_send[0].RIR = (addr << 3) | 5
|
||||
def twos_comp(val, bits):
|
||||
if val >= 0:
|
||||
return val
|
||||
else:
|
||||
to_send[0].RIR = (addr << 21) | 1
|
||||
to_send[0].RDTR = length
|
||||
to_send[0].RDTR |= bus << 4
|
||||
return (2**bits) + val
|
||||
|
||||
def package_can_msg(msg):
|
||||
addr, _, dat, bus = msg
|
||||
rdlr, rdhr = struct.unpack('II', dat.ljust(8, b'\x00'))
|
||||
|
||||
ret = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *')
|
||||
if addr >= 0x800:
|
||||
ret[0].RIR = (addr << 3) | 5
|
||||
else:
|
||||
ret[0].RIR = (addr << 21) | 1
|
||||
ret[0].RDTR = len(dat) | ((bus & 0xF) << 4)
|
||||
ret[0].RDHR = rdhr
|
||||
ret[0].RDLR = rdlr
|
||||
|
||||
return ret
|
||||
|
||||
def make_msg(bus, addr, length=8):
|
||||
return package_can_msg([addr, 0, b'\x00'*length, bus])
|
||||
|
||||
def interceptor_msg(gas, addr):
|
||||
to_send = make_msg(0, addr, 6)
|
||||
gas2 = gas * 2
|
||||
to_send[0].RDLR = ((gas & 0xff) << 8) | ((gas & 0xff00) >> 8) | \
|
||||
((gas2 & 0xff) << 24) | ((gas2 & 0xff00) << 8)
|
||||
return to_send
|
||||
|
||||
class CANPackerPanda(CANPacker):
|
||||
def make_can_msg_panda(self, name_or_addr, bus, values, counter=-1):
|
||||
msg = self.make_can_msg(name_or_addr, bus, values, counter=-1)
|
||||
return package_can_msg(msg)
|
||||
|
||||
class PandaSafetyTest(unittest.TestCase):
|
||||
TX_MSGS = None
|
||||
STANDSTILL_THRESHOLD = None
|
||||
RELAY_MALFUNCTION_ADDR = None
|
||||
RELAY_MALFUNCTION_BUS = None
|
||||
FWD_BLACKLISTED_ADDRS = {} # {bus: [addr]}
|
||||
FWD_BUS_LOOKUP = {}
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
if cls.__name__ == "PandaSafetyTest":
|
||||
cls.safety = None
|
||||
raise unittest.SkipTest
|
||||
|
||||
def _rx(self, msg):
|
||||
return self.safety.safety_rx_hook(msg)
|
||||
|
||||
def _tx(self, msg):
|
||||
return self.safety.safety_tx_hook(msg)
|
||||
|
||||
@abc.abstractmethod
|
||||
def _brake_msg(self, brake):
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def _speed_msg(self, speed):
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def _gas_msg(self, speed):
|
||||
pass
|
||||
|
||||
# ***** standard tests for all safety modes *****
|
||||
|
||||
def test_relay_malfunction(self):
|
||||
# each car has an addr that is used to detect relay malfunction
|
||||
# if that addr is seen on specified bus, triggers the relay malfunction
|
||||
# protection logic: both tx_hook and fwd_hook are expected to return failure
|
||||
self.assertFalse(self.safety.get_relay_malfunction())
|
||||
self._rx(make_msg(self.RELAY_MALFUNCTION_BUS, self.RELAY_MALFUNCTION_ADDR, 8))
|
||||
self.assertTrue(self.safety.get_relay_malfunction())
|
||||
for a in range(1, 0x800):
|
||||
for b in range(0, 3):
|
||||
self.assertFalse(self._tx(make_msg(b, a, 8)))
|
||||
self.assertEqual(-1, self.safety.safety_fwd_hook(b, make_msg(b, a, 8)))
|
||||
|
||||
def test_fwd_hook(self):
|
||||
# some safety modes don't forward anything, while others blacklist msgs
|
||||
for bus in range(0x0, 0x3):
|
||||
for addr in range(0x1, 0x800):
|
||||
# assume len 8
|
||||
msg = make_msg(bus, addr, 8)
|
||||
fwd_bus = self.FWD_BUS_LOOKUP.get(bus, -1)
|
||||
if bus in self.FWD_BLACKLISTED_ADDRS and addr in self.FWD_BLACKLISTED_ADDRS[bus]:
|
||||
fwd_bus = -1
|
||||
self.assertEqual(fwd_bus, self.safety.safety_fwd_hook(bus, msg))
|
||||
|
||||
def test_spam_can_buses(self):
|
||||
for addr in range(1, 0x800):
|
||||
for bus in range(0, 4):
|
||||
if all(addr != m[0] or bus != m[1] for m in self.TX_MSGS):
|
||||
self.assertFalse(self._tx(make_msg(bus, addr, 8)))
|
||||
|
||||
def test_default_controls_not_allowed(self):
|
||||
self.assertFalse(self.safety.get_controls_allowed())
|
||||
|
||||
def test_manually_enable_controls_allowed(self):
|
||||
self.safety.set_controls_allowed(1)
|
||||
self.assertTrue(self.safety.get_controls_allowed())
|
||||
self.safety.set_controls_allowed(0)
|
||||
self.assertFalse(self.safety.get_controls_allowed())
|
||||
|
||||
def test_prev_gas(self):
|
||||
for pressed in [True, False]:
|
||||
self._rx(self._gas_msg(pressed))
|
||||
self.assertEqual(pressed, self.safety.get_gas_pressed_prev())
|
||||
|
||||
def test_allow_engage_with_gas_pressed(self):
|
||||
self._rx(self._gas_msg(1))
|
||||
self.safety.set_controls_allowed(True)
|
||||
self._rx(self._gas_msg(1))
|
||||
self.assertTrue(self.safety.get_controls_allowed())
|
||||
self._rx(self._gas_msg(1))
|
||||
self.assertTrue(self.safety.get_controls_allowed())
|
||||
|
||||
def test_disengage_on_gas(self):
|
||||
self._rx(self._gas_msg(0))
|
||||
self.safety.set_controls_allowed(True)
|
||||
self._rx(self._gas_msg(1))
|
||||
self.assertFalse(self.safety.get_controls_allowed())
|
||||
|
||||
def test_unsafe_mode_no_disengage_on_gas(self):
|
||||
self._rx(self._gas_msg(0))
|
||||
self.safety.set_controls_allowed(True)
|
||||
self.safety.set_unsafe_mode(UNSAFE_MODE.DISABLE_DISENGAGE_ON_GAS)
|
||||
self._rx(self._gas_msg(1))
|
||||
self.assertTrue(self.safety.get_controls_allowed())
|
||||
|
||||
def test_allow_brake_at_zero_speed(self):
|
||||
# Brake was already pressed
|
||||
self._rx(self._speed_msg(0))
|
||||
self._rx(self._brake_msg(1))
|
||||
self.safety.set_controls_allowed(1)
|
||||
self._rx(self._brake_msg(1))
|
||||
self.assertTrue(self.safety.get_controls_allowed())
|
||||
self._rx(self._brake_msg(0))
|
||||
self.assertTrue(self.safety.get_controls_allowed())
|
||||
# rising edge of brake should disengage
|
||||
self._rx(self._brake_msg(1))
|
||||
self.assertFalse(self.safety.get_controls_allowed())
|
||||
self._rx(self._brake_msg(0)) # reset no brakes
|
||||
|
||||
def test_not_allow_brake_when_moving(self):
|
||||
# Brake was already pressed
|
||||
self._rx(self._brake_msg(1))
|
||||
self.safety.set_controls_allowed(1)
|
||||
self._rx(self._speed_msg(self.STANDSTILL_THRESHOLD))
|
||||
self._rx(self._brake_msg(1))
|
||||
self.assertTrue(self.safety.get_controls_allowed())
|
||||
self._rx(self._speed_msg(self.STANDSTILL_THRESHOLD + 1))
|
||||
self._rx(self._brake_msg(1))
|
||||
self.assertFalse(self.safety.get_controls_allowed())
|
||||
self._rx(self._speed_msg(0))
|
||||
|
||||
# TODO: use PandaSafetyTest for all tests and delete this
|
||||
class StdTest:
|
||||
@staticmethod
|
||||
def test_relay_malfunction(test, addr, bus=0):
|
||||
|
|
|
@ -282,6 +282,7 @@ void init_tests(void){
|
|||
safety_mode_cnt = 2U; // avoid ignoring relay_malfunction logic
|
||||
gas_pressed_prev = false;
|
||||
brake_pressed_prev = false;
|
||||
unsafe_mode = 0;
|
||||
}
|
||||
|
||||
void init_tests_toyota(void){
|
||||
|
|
|
@ -3,7 +3,8 @@ import unittest
|
|||
import numpy as np
|
||||
from panda import Panda
|
||||
from panda.tests.safety import libpandasafety_py
|
||||
from panda.tests.safety.common import StdTest, make_msg, MAX_WRONG_COUNTERS, UNSAFE_MODE
|
||||
from panda.tests.safety.common import StdTest, make_msg, interceptor_msg, \
|
||||
MAX_WRONG_COUNTERS, UNSAFE_MODE
|
||||
|
||||
MAX_BRAKE = 255
|
||||
|
||||
|
@ -87,13 +88,6 @@ class TestHondaSafety(unittest.TestCase):
|
|||
to_send[0].RDLR = ((brake & 0x3) << 14) | ((brake & 0x3FF) >> 2)
|
||||
return to_send
|
||||
|
||||
def _send_interceptor_msg(self, gas, addr):
|
||||
to_send = make_msg(0, addr, 6)
|
||||
gas2 = gas * 2
|
||||
to_send[0].RDLR = ((gas & 0xff) << 8) | ((gas & 0xff00) >> 8) | \
|
||||
((gas2 & 0xff) << 24) | ((gas2 & 0xff00) << 8)
|
||||
return to_send
|
||||
|
||||
def _send_steer_msg(self, steer):
|
||||
bus = 2 if self.safety.get_honda_hw() == HONDA_BG_HW else 0
|
||||
to_send = make_msg(bus, 0xE4, 6)
|
||||
|
@ -176,11 +170,11 @@ class TestHondaSafety(unittest.TestCase):
|
|||
self.assertTrue(self.safety.get_gas_pressed_prev())
|
||||
|
||||
def test_prev_gas_interceptor(self):
|
||||
self.safety.safety_rx_hook(self._send_interceptor_msg(0x0, 0x201))
|
||||
self.safety.safety_rx_hook(interceptor_msg(0x0, 0x201))
|
||||
self.assertFalse(self.safety.get_gas_interceptor_prev())
|
||||
self.safety.safety_rx_hook(self._send_interceptor_msg(0x1000, 0x201))
|
||||
self.safety.safety_rx_hook(interceptor_msg(0x1000, 0x201))
|
||||
self.assertTrue(self.safety.get_gas_interceptor_prev())
|
||||
self.safety.safety_rx_hook(self._send_interceptor_msg(0x0, 0x201))
|
||||
self.safety.safety_rx_hook(interceptor_msg(0x0, 0x201))
|
||||
self.safety.set_gas_interceptor_detected(False)
|
||||
|
||||
def test_disengage_on_gas(self):
|
||||
|
@ -205,31 +199,31 @@ class TestHondaSafety(unittest.TestCase):
|
|||
|
||||
def test_disengage_on_gas_interceptor(self):
|
||||
for g in range(0, 0x1000):
|
||||
self.safety.safety_rx_hook(self._send_interceptor_msg(0, 0x201))
|
||||
self.safety.safety_rx_hook(interceptor_msg(0, 0x201))
|
||||
self.safety.set_controls_allowed(True)
|
||||
self.safety.safety_rx_hook(self._send_interceptor_msg(g, 0x201))
|
||||
self.safety.safety_rx_hook(interceptor_msg(g, 0x201))
|
||||
remain_enabled = g <= INTERCEPTOR_THRESHOLD
|
||||
self.assertEqual(remain_enabled, self.safety.get_controls_allowed())
|
||||
self.safety.safety_rx_hook(self._send_interceptor_msg(0, 0x201))
|
||||
self.safety.safety_rx_hook(interceptor_msg(0, 0x201))
|
||||
self.safety.set_gas_interceptor_detected(False)
|
||||
|
||||
def test_unsafe_mode_no_disengage_on_gas_interceptor(self):
|
||||
self.safety.set_controls_allowed(True)
|
||||
self.safety.set_unsafe_mode(UNSAFE_MODE.DISABLE_DISENGAGE_ON_GAS)
|
||||
for g in range(0, 0x1000):
|
||||
self.safety.safety_rx_hook(self._send_interceptor_msg(g, 0x201))
|
||||
self.safety.safety_rx_hook(interceptor_msg(g, 0x201))
|
||||
self.assertTrue(self.safety.get_controls_allowed())
|
||||
self.safety.safety_rx_hook(self._send_interceptor_msg(0, 0x201))
|
||||
self.safety.safety_rx_hook(interceptor_msg(0, 0x201))
|
||||
self.safety.set_gas_interceptor_detected(False)
|
||||
self.safety.set_unsafe_mode(UNSAFE_MODE.DEFAULT)
|
||||
self.safety.set_controls_allowed(False)
|
||||
|
||||
def test_allow_engage_with_gas_interceptor_pressed(self):
|
||||
self.safety.safety_rx_hook(self._send_interceptor_msg(0x1000, 0x201))
|
||||
self.safety.safety_rx_hook(interceptor_msg(0x1000, 0x201))
|
||||
self.safety.set_controls_allowed(1)
|
||||
self.safety.safety_rx_hook(self._send_interceptor_msg(0x1000, 0x201))
|
||||
self.safety.safety_rx_hook(interceptor_msg(0x1000, 0x201))
|
||||
self.assertTrue(self.safety.get_controls_allowed())
|
||||
self.safety.safety_rx_hook(self._send_interceptor_msg(0, 0x201))
|
||||
self.safety.safety_rx_hook(interceptor_msg(0, 0x201))
|
||||
self.safety.set_gas_interceptor_detected(False)
|
||||
|
||||
def test_brake_safety_check(self):
|
||||
|
@ -258,7 +252,7 @@ class TestHondaSafety(unittest.TestCase):
|
|||
send = True
|
||||
else:
|
||||
send = gas == 0
|
||||
self.assertEqual(send, self.safety.safety_tx_hook(self._send_interceptor_msg(gas, 0x200)))
|
||||
self.assertEqual(send, self.safety.safety_tx_hook(interceptor_msg(gas, 0x200)))
|
||||
|
||||
def test_steer_safety_check(self):
|
||||
self.safety.set_controls_allowed(0)
|
||||
|
@ -365,22 +359,22 @@ class TestHondaSafety(unittest.TestCase):
|
|||
self.safety.safety_rx_hook(self._gas_msg(1))
|
||||
elif pedal == 'interceptor':
|
||||
# gas_interceptor_prev > INTERCEPTOR_THRESHOLD
|
||||
self.safety.safety_rx_hook(self._send_interceptor_msg(INTERCEPTOR_THRESHOLD+1, 0x201))
|
||||
self.safety.safety_rx_hook(self._send_interceptor_msg(INTERCEPTOR_THRESHOLD+1, 0x201))
|
||||
self.safety.safety_rx_hook(interceptor_msg(INTERCEPTOR_THRESHOLD+1, 0x201))
|
||||
self.safety.safety_rx_hook(interceptor_msg(INTERCEPTOR_THRESHOLD+1, 0x201))
|
||||
|
||||
self.safety.set_controls_allowed(1)
|
||||
hw = self.safety.get_honda_hw()
|
||||
if hw == HONDA_N_HW:
|
||||
self.safety.set_honda_fwd_brake(False)
|
||||
self.assertFalse(self.safety.safety_tx_hook(self._send_brake_msg(MAX_BRAKE)))
|
||||
self.assertFalse(self.safety.safety_tx_hook(self._send_interceptor_msg(INTERCEPTOR_THRESHOLD, 0x200)))
|
||||
self.assertFalse(self.safety.safety_tx_hook(interceptor_msg(INTERCEPTOR_THRESHOLD, 0x200)))
|
||||
self.assertFalse(self.safety.safety_tx_hook(self._send_steer_msg(0x1000)))
|
||||
|
||||
# reset status
|
||||
self.safety.set_controls_allowed(0)
|
||||
self.safety.safety_tx_hook(self._send_brake_msg(0))
|
||||
self.safety.safety_tx_hook(self._send_steer_msg(0))
|
||||
self.safety.safety_tx_hook(self._send_interceptor_msg(0, 0x200))
|
||||
self.safety.safety_tx_hook(interceptor_msg(0, 0x200))
|
||||
if pedal == 'brake':
|
||||
self.safety.safety_rx_hook(self._speed_msg(0))
|
||||
self.safety.safety_rx_hook(self._brake_msg(0))
|
||||
|
@ -403,8 +397,8 @@ class TestHondaSafety(unittest.TestCase):
|
|||
allow_ctrl = True
|
||||
elif pedal == 'interceptor':
|
||||
# gas_interceptor_prev > INTERCEPTOR_THRESHOLD
|
||||
self.safety.safety_rx_hook(self._send_interceptor_msg(INTERCEPTOR_THRESHOLD+1, 0x201))
|
||||
self.safety.safety_rx_hook(self._send_interceptor_msg(INTERCEPTOR_THRESHOLD+1, 0x201))
|
||||
self.safety.safety_rx_hook(interceptor_msg(INTERCEPTOR_THRESHOLD+1, 0x201))
|
||||
self.safety.safety_rx_hook(interceptor_msg(INTERCEPTOR_THRESHOLD+1, 0x201))
|
||||
allow_ctrl = True
|
||||
|
||||
self.safety.set_controls_allowed(1)
|
||||
|
@ -412,14 +406,14 @@ class TestHondaSafety(unittest.TestCase):
|
|||
if hw == HONDA_N_HW:
|
||||
self.safety.set_honda_fwd_brake(False)
|
||||
self.assertEqual(allow_ctrl, self.safety.safety_tx_hook(self._send_brake_msg(MAX_BRAKE)))
|
||||
self.assertEqual(allow_ctrl, self.safety.safety_tx_hook(self._send_interceptor_msg(INTERCEPTOR_THRESHOLD, 0x200)))
|
||||
self.assertEqual(allow_ctrl, self.safety.safety_tx_hook(interceptor_msg(INTERCEPTOR_THRESHOLD, 0x200)))
|
||||
self.assertEqual(allow_ctrl, self.safety.safety_tx_hook(self._send_steer_msg(0x1000)))
|
||||
# reset status
|
||||
self.safety.set_controls_allowed(0)
|
||||
self.safety.set_unsafe_mode(UNSAFE_MODE.DEFAULT)
|
||||
self.safety.safety_tx_hook(self._send_brake_msg(0))
|
||||
self.safety.safety_tx_hook(self._send_steer_msg(0))
|
||||
self.safety.safety_tx_hook(self._send_interceptor_msg(0, 0x200))
|
||||
self.safety.safety_tx_hook(interceptor_msg(0, 0x200))
|
||||
if pedal == 'brake':
|
||||
self.safety.safety_rx_hook(self._speed_msg(0))
|
||||
self.safety.safety_rx_hook(self._brake_msg(0))
|
||||
|
|
|
@ -3,59 +3,42 @@ import unittest
|
|||
import numpy as np
|
||||
from panda import Panda
|
||||
from panda.tests.safety import libpandasafety_py
|
||||
from panda.tests.safety.common import StdTest, make_msg, UNSAFE_MODE
|
||||
from panda.tests.safety.common import CANPackerPanda, PandaSafetyTest, \
|
||||
interceptor_msg, UNSAFE_MODE
|
||||
|
||||
MAX_RATE_UP = 10
|
||||
MAX_RATE_DOWN = 25
|
||||
MAX_TORQUE = 1500
|
||||
|
||||
MAX_ACCEL = 1500
|
||||
MIN_ACCEL = -3000
|
||||
MAX_ACCEL = 1.5
|
||||
MIN_ACCEL = -3.0
|
||||
|
||||
ISO_MAX_ACCEL = 2000
|
||||
ISO_MIN_ACCEL = -3500
|
||||
ISO_MAX_ACCEL = 2.0
|
||||
ISO_MIN_ACCEL = -3.5
|
||||
|
||||
MAX_RT_DELTA = 375
|
||||
RT_INTERVAL = 250000
|
||||
|
||||
STANDSTILL_THRESHOLD = 100 # 1kph
|
||||
|
||||
MAX_TORQUE_ERROR = 350
|
||||
INTERCEPTOR_THRESHOLD = 475
|
||||
|
||||
TX_MSGS = [[0x283, 0], [0x2E6, 0], [0x2E7, 0], [0x33E, 0], [0x344, 0], [0x365, 0], [0x366, 0], [0x4CB, 0], # DSU bus 0
|
||||
[0x128, 1], [0x141, 1], [0x160, 1], [0x161, 1], [0x470, 1], # DSU bus 1
|
||||
[0x2E4, 0], [0x411, 0], [0x412, 0], [0x343, 0], [0x1D2, 0], # LKAS + ACC
|
||||
[0x200, 0], [0x750, 0]]; # interceptor + blindspot monitor
|
||||
class TestToyotaSafety(PandaSafetyTest):
|
||||
|
||||
TX_MSGS = [[0x283, 0], [0x2E6, 0], [0x2E7, 0], [0x33E, 0], [0x344, 0], [0x365, 0], [0x366, 0], [0x4CB, 0], # DSU bus 0
|
||||
[0x128, 1], [0x141, 1], [0x160, 1], [0x161, 1], [0x470, 1], # DSU bus 1
|
||||
[0x2E4, 0], [0x411, 0], [0x412, 0], [0x343, 0], [0x1D2, 0], # LKAS + ACC
|
||||
[0x200, 0], [0x750, 0]] # interceptor + blindspot monitor
|
||||
STANDSTILL_THRESHOLD = 1 # 1kph
|
||||
RELAY_MALFUNCTION_ADDR = 0x2E4
|
||||
RELAY_MALFUNCTION_BUS = 0
|
||||
FWD_BLACKLISTED_ADDRS = {2: [0x2E4, 0x412, 0x191, 0x343]}
|
||||
FWD_BUS_LOOKUP = {0: 2, 2: 0}
|
||||
|
||||
def twos_comp(val, bits):
|
||||
if val >= 0:
|
||||
return val
|
||||
else:
|
||||
return (2**bits) + val
|
||||
|
||||
def sign(a):
|
||||
if a > 0:
|
||||
return 1
|
||||
else:
|
||||
return -1
|
||||
|
||||
def toyota_checksum(msg, addr, len_msg):
|
||||
checksum = (len_msg + addr + (addr >> 8))
|
||||
for i in range(len_msg):
|
||||
if i < 4:
|
||||
checksum += (msg.RDLR >> (8 * i))
|
||||
else:
|
||||
checksum += (msg.RDHR >> (8 * (i - 4)))
|
||||
return checksum & 0xff
|
||||
|
||||
|
||||
class TestToyotaSafety(unittest.TestCase):
|
||||
@classmethod
|
||||
def setUp(cls):
|
||||
cls.packer = CANPackerPanda("toyota_prius_2017_pt_generated")
|
||||
cls.safety = libpandasafety_py.libpandasafety
|
||||
cls.safety.set_safety_hooks(Panda.SAFETY_TOYOTA, 100)
|
||||
cls.safety.set_safety_hooks(Panda.SAFETY_TOYOTA, 66)
|
||||
cls.safety.init_tests_toyota()
|
||||
|
||||
def _set_prev_torque(self, t):
|
||||
|
@ -64,171 +47,94 @@ class TestToyotaSafety(unittest.TestCase):
|
|||
self.safety.set_toyota_torque_meas(t, t)
|
||||
|
||||
def _torque_meas_msg(self, torque):
|
||||
t = twos_comp(torque, 16)
|
||||
to_send = make_msg(0, 0x260)
|
||||
to_send[0].RDHR = (t & 0xff00) | ((t & 0xFF) << 16)
|
||||
to_send[0].RDHR |= toyota_checksum(to_send[0], 0x260, 8) << 24
|
||||
return to_send
|
||||
values = {"STEER_TORQUE_EPS": torque}
|
||||
return self.packer.make_can_msg_panda("STEER_TORQUE_SENSOR", 0, values)
|
||||
|
||||
def _torque_msg(self, torque):
|
||||
t = twos_comp(torque, 16)
|
||||
to_send = make_msg(0, 0x2E4)
|
||||
to_send[0].RDLR = t | ((t & 0xFF) << 16)
|
||||
return to_send
|
||||
values = {"STEER_TORQUE_CMD": torque}
|
||||
return self.packer.make_can_msg_panda("STEERING_LKA", 0, values)
|
||||
|
||||
def _accel_msg(self, accel):
|
||||
to_send = make_msg(0, 0x343)
|
||||
a = twos_comp(accel, 16)
|
||||
to_send[0].RDLR = (a & 0xFF) << 8 | (a >> 8)
|
||||
return to_send
|
||||
values = {"ACCEL_CMD": accel}
|
||||
return self.packer.make_can_msg_panda("ACC_CONTROL", 0, values)
|
||||
|
||||
def _speed_msg(self, s):
|
||||
offset = (0x6f << 8) + 0x1a # there is a 0x1a6f offset in the signal
|
||||
to_send = make_msg(0, 0xaa)
|
||||
to_send[0].RDLR = ((s & 0xFF) << 8 | (s >> 8)) + offset
|
||||
to_send[0].RDLR += ((s & 0xFF) << 24 | ((s >> 8) << 16)) + (offset << 16)
|
||||
to_send[0].RDHR = ((s & 0xFF) << 8 | (s >> 8)) + offset
|
||||
to_send[0].RDHR += ((s & 0xFF) << 24 | ((s >> 8) << 16)) + (offset << 16)
|
||||
return to_send
|
||||
values = {("WHEEL_SPEED_%s"%n): s for n in ["FR", "FL", "RR", "RL"]}
|
||||
return self.packer.make_can_msg_panda("WHEEL_SPEEDS", 0, values)
|
||||
|
||||
def _brake_msg(self, brake):
|
||||
to_send = make_msg(0, 0x226)
|
||||
to_send[0].RDHR = brake << 5
|
||||
to_send[0].RDHR |= toyota_checksum(to_send[0], 0x226, 8) << 24
|
||||
return to_send
|
||||
def _brake_msg(self, pressed):
|
||||
values = {"BRAKE_PRESSED": pressed}
|
||||
return self.packer.make_can_msg_panda("BRAKE_MODULE", 0, values)
|
||||
|
||||
def _gas_pressed_msg(self, pressed, enable_cruise=False):
|
||||
to_send = make_msg(0, 0x1D2)
|
||||
to_send[0].RDLR = ((1*(not pressed)) << 4) | (1*enable_cruise << 5)
|
||||
to_send[0].RDHR = (toyota_checksum(to_send[0], 0x1D2, 8) << 24)
|
||||
return to_send
|
||||
|
||||
def _send_interceptor_msg(self, gas, addr):
|
||||
gas2 = gas * 2
|
||||
to_send = make_msg(0, addr, 6)
|
||||
to_send[0].RDLR = ((gas & 0xff) << 8) | ((gas & 0xff00) >> 8) | \
|
||||
((gas2 & 0xff) << 24) | ((gas2 & 0xff00) << 8)
|
||||
return to_send
|
||||
def _gas_msg(self, pressed):
|
||||
cruise_active = self.safety.get_controls_allowed()
|
||||
values = {"GAS_RELEASED": not pressed, "CRUISE_ACTIVE": cruise_active}
|
||||
return self.packer.make_can_msg_panda("PCM_CRUISE", 0, values)
|
||||
|
||||
def _pcm_cruise_msg(self, cruise_on):
|
||||
to_send = make_msg(0, 0x1D2)
|
||||
to_send[0].RDLR = cruise_on << 5
|
||||
to_send[0].RDHR = to_send[0].RDHR | (toyota_checksum(to_send[0], 0x1D2, 8) << 24)
|
||||
return to_send
|
||||
|
||||
def test_spam_can_buses(self):
|
||||
StdTest.test_spam_can_buses(self, TX_MSGS)
|
||||
|
||||
def test_relay_malfunction(self):
|
||||
StdTest.test_relay_malfunction(self, 0x2E4)
|
||||
|
||||
def test_default_controls_not_allowed(self):
|
||||
self.assertFalse(self.safety.get_controls_allowed())
|
||||
|
||||
def test_manually_enable_controls_allowed(self):
|
||||
StdTest.test_manually_enable_controls_allowed(self)
|
||||
values = {"CRUISE_ACTIVE": cruise_on}
|
||||
values["CHECKSUM"] = 1
|
||||
return self.packer.make_can_msg_panda("PCM_CRUISE", 0, values)
|
||||
|
||||
def test_enable_control_allowed_from_cruise(self):
|
||||
self.safety.safety_rx_hook(self._pcm_cruise_msg(False))
|
||||
self._rx(self._pcm_cruise_msg(False))
|
||||
self.assertFalse(self.safety.get_controls_allowed())
|
||||
self.safety.safety_rx_hook(self._pcm_cruise_msg(True))
|
||||
self._rx(self._pcm_cruise_msg(True))
|
||||
self.assertTrue(self.safety.get_controls_allowed())
|
||||
|
||||
def test_disable_control_allowed_from_cruise(self):
|
||||
self.safety.set_controls_allowed(1)
|
||||
self.safety.safety_rx_hook(self._pcm_cruise_msg(False))
|
||||
self._rx(self._pcm_cruise_msg(False))
|
||||
self.assertFalse(self.safety.get_controls_allowed())
|
||||
|
||||
def test_prev_gas(self):
|
||||
for pressed in [True, False]:
|
||||
self.safety.safety_rx_hook(self._gas_pressed_msg(pressed))
|
||||
self.assertEqual(pressed, self.safety.get_gas_pressed_prev())
|
||||
|
||||
def test_prev_gas_interceptor(self):
|
||||
self.safety.safety_rx_hook(self._send_interceptor_msg(0x0, 0x201))
|
||||
self._rx(interceptor_msg(0x0, 0x201))
|
||||
self.assertFalse(self.safety.get_gas_interceptor_prev())
|
||||
self.safety.safety_rx_hook(self._send_interceptor_msg(0x1000, 0x201))
|
||||
self._rx(interceptor_msg(0x1000, 0x201))
|
||||
self.assertTrue(self.safety.get_gas_interceptor_prev())
|
||||
self.safety.safety_rx_hook(self._send_interceptor_msg(0x0, 0x201))
|
||||
self.safety.set_gas_interceptor_detected(False)
|
||||
|
||||
def test_disengage_on_gas(self):
|
||||
self.safety.safety_rx_hook(self._gas_pressed_msg(False))
|
||||
self.safety.set_controls_allowed(True)
|
||||
self.safety.safety_rx_hook(self._gas_pressed_msg(True, enable_cruise=True))
|
||||
self.assertFalse(self.safety.get_controls_allowed())
|
||||
|
||||
def test_unsafe_mode_no_disengage_on_gas(self):
|
||||
self.safety.safety_rx_hook(self._gas_pressed_msg(False))
|
||||
self.safety.set_controls_allowed(True)
|
||||
self.safety.set_unsafe_mode(UNSAFE_MODE.DISABLE_DISENGAGE_ON_GAS)
|
||||
self.safety.safety_rx_hook(self._gas_pressed_msg(True, enable_cruise=True))
|
||||
self.assertTrue(self.safety.get_controls_allowed())
|
||||
self.safety.set_unsafe_mode(UNSAFE_MODE.DEFAULT)
|
||||
|
||||
def test_allow_engage_with_gas_pressed(self):
|
||||
self.safety.safety_rx_hook(self._gas_pressed_msg(True))
|
||||
self.safety.set_controls_allowed(True)
|
||||
for _ in range(2):
|
||||
# since cruise msg is used for gas pedal state, cruise bit must
|
||||
# also be set for this test or else it will set controls_allowed
|
||||
self.safety.safety_rx_hook(self._gas_pressed_msg(True, enable_cruise=True))
|
||||
self.assertTrue(self.safety.get_controls_allowed())
|
||||
self._rx(interceptor_msg(0x0, 0x201))
|
||||
|
||||
def test_disengage_on_gas_interceptor(self):
|
||||
for g in range(0, 0x1000):
|
||||
self.safety.safety_rx_hook(self._send_interceptor_msg(0, 0x201))
|
||||
self._rx(interceptor_msg(0, 0x201))
|
||||
self.safety.set_controls_allowed(True)
|
||||
self.safety.safety_rx_hook(self._send_interceptor_msg(g, 0x201))
|
||||
self._rx(interceptor_msg(g, 0x201))
|
||||
remain_enabled = g <= INTERCEPTOR_THRESHOLD
|
||||
self.assertEqual(remain_enabled, self.safety.get_controls_allowed())
|
||||
self.safety.safety_rx_hook(self._send_interceptor_msg(0, 0x201))
|
||||
self._rx(interceptor_msg(0, 0x201))
|
||||
self.safety.set_gas_interceptor_detected(False)
|
||||
|
||||
def test_unsafe_mode_no_disengage_on_gas_interceptor(self):
|
||||
self.safety.set_controls_allowed(True)
|
||||
self.safety.set_unsafe_mode(UNSAFE_MODE.DISABLE_DISENGAGE_ON_GAS)
|
||||
for g in range(0, 0x1000):
|
||||
self.safety.safety_rx_hook(self._send_interceptor_msg(g, 0x201))
|
||||
self._rx(interceptor_msg(g, 0x201))
|
||||
self.assertTrue(self.safety.get_controls_allowed())
|
||||
self.safety.safety_rx_hook(self._send_interceptor_msg(0, 0x201))
|
||||
self._rx(interceptor_msg(0, 0x201))
|
||||
self.safety.set_gas_interceptor_detected(False)
|
||||
self.safety.set_unsafe_mode(UNSAFE_MODE.DEFAULT)
|
||||
self.safety.set_controls_allowed(False)
|
||||
|
||||
def test_brake_disengage(self):
|
||||
StdTest.test_allow_brake_at_zero_speed(self)
|
||||
StdTest.test_not_allow_brake_when_moving(self, STANDSTILL_THRESHOLD)
|
||||
|
||||
def test_allow_engage_with_gas_interceptor_pressed(self):
|
||||
self.safety.safety_rx_hook(self._send_interceptor_msg(0x1000, 0x201))
|
||||
self._rx(interceptor_msg(0x1000, 0x201))
|
||||
self.safety.set_controls_allowed(1)
|
||||
self.safety.safety_rx_hook(self._send_interceptor_msg(0x1000, 0x201))
|
||||
self._rx(interceptor_msg(0x1000, 0x201))
|
||||
self.assertTrue(self.safety.get_controls_allowed())
|
||||
self.safety.safety_rx_hook(self._send_interceptor_msg(0, 0x201))
|
||||
self.safety.set_gas_interceptor_detected(False)
|
||||
self._rx(interceptor_msg(0, 0x201))
|
||||
|
||||
def test_accel_actuation_limits(self):
|
||||
for accel in np.arange(MIN_ACCEL - 1000, MAX_ACCEL + 1000, 100):
|
||||
for controls_allowed in [True, False]:
|
||||
self.safety.set_controls_allowed(controls_allowed)
|
||||
if controls_allowed:
|
||||
send = MIN_ACCEL <= accel <= MAX_ACCEL
|
||||
else:
|
||||
send = accel == 0
|
||||
self.assertEqual(send, self.safety.safety_tx_hook(self._accel_msg(accel)))
|
||||
limits = ((MIN_ACCEL, MAX_ACCEL, UNSAFE_MODE.DEFAULT),
|
||||
(ISO_MIN_ACCEL, ISO_MAX_ACCEL, UNSAFE_MODE.RAISE_LONGITUDINAL_LIMITS_TO_ISO_MAX))
|
||||
|
||||
def test_unsafe_iso_accel_actuation_limits(self):
|
||||
for accel in np.arange(ISO_MIN_ACCEL - 1000, ISO_MAX_ACCEL + 1000, 100):
|
||||
for controls_allowed in [True, False]:
|
||||
self.safety.set_controls_allowed(controls_allowed)
|
||||
self.safety.set_unsafe_mode(UNSAFE_MODE.RAISE_LONGITUDINAL_LIMITS_TO_ISO_MAX)
|
||||
if controls_allowed:
|
||||
send = ISO_MIN_ACCEL <= accel <= ISO_MAX_ACCEL
|
||||
else:
|
||||
send = accel == 0
|
||||
self.assertEqual(send, self.safety.safety_tx_hook(self._accel_msg(accel)))
|
||||
self.safety.set_unsafe_mode(UNSAFE_MODE.DEFAULT)
|
||||
for min_accel, max_accel, unsafe_mode in limits:
|
||||
for accel in np.arange(min_accel - 1, max_accel + 1, 0.1):
|
||||
for controls_allowed in [True, False]:
|
||||
self.safety.set_controls_allowed(controls_allowed)
|
||||
self.safety.set_unsafe_mode(unsafe_mode)
|
||||
if controls_allowed:
|
||||
should_tx = int(min_accel*1000) <= int(accel*1000) <= int(max_accel*1000)
|
||||
else:
|
||||
should_tx = np.isclose(accel, 0, atol=0.0001)
|
||||
self.assertEqual(should_tx, self._tx(self._accel_msg(accel)))
|
||||
|
||||
def test_torque_absolute_limits(self):
|
||||
for controls_allowed in [True, False]:
|
||||
|
@ -243,16 +149,16 @@ class TestToyotaSafety(unittest.TestCase):
|
|||
else:
|
||||
send = torque == 0
|
||||
|
||||
self.assertEqual(send, self.safety.safety_tx_hook(self._torque_msg(torque)))
|
||||
self.assertEqual(send, self._tx(self._torque_msg(torque)))
|
||||
|
||||
def test_non_realtime_limit_up(self):
|
||||
self.safety.set_controls_allowed(True)
|
||||
|
||||
self._set_prev_torque(0)
|
||||
self.assertTrue(self.safety.safety_tx_hook(self._torque_msg(MAX_RATE_UP)))
|
||||
self.assertTrue(self._tx(self._torque_msg(MAX_RATE_UP)))
|
||||
|
||||
self._set_prev_torque(0)
|
||||
self.assertFalse(self.safety.safety_tx_hook(self._torque_msg(MAX_RATE_UP + 1)))
|
||||
self.assertFalse(self._tx(self._torque_msg(MAX_RATE_UP + 1)))
|
||||
|
||||
def test_non_realtime_limit_down(self):
|
||||
self.safety.set_controls_allowed(True)
|
||||
|
@ -260,12 +166,12 @@ class TestToyotaSafety(unittest.TestCase):
|
|||
self.safety.set_toyota_rt_torque_last(1000)
|
||||
self.safety.set_toyota_torque_meas(500, 500)
|
||||
self.safety.set_toyota_desired_torque_last(1000)
|
||||
self.assertTrue(self.safety.safety_tx_hook(self._torque_msg(1000 - MAX_RATE_DOWN)))
|
||||
self.assertTrue(self._tx(self._torque_msg(1000 - MAX_RATE_DOWN)))
|
||||
|
||||
self.safety.set_toyota_rt_torque_last(1000)
|
||||
self.safety.set_toyota_torque_meas(500, 500)
|
||||
self.safety.set_toyota_desired_torque_last(1000)
|
||||
self.assertFalse(self.safety.safety_tx_hook(self._torque_msg(1000 - MAX_RATE_DOWN + 1)))
|
||||
self.assertFalse(self._tx(self._torque_msg(1000 - MAX_RATE_DOWN + 1)))
|
||||
|
||||
def test_exceed_torque_sensor(self):
|
||||
self.safety.set_controls_allowed(True)
|
||||
|
@ -274,9 +180,9 @@ class TestToyotaSafety(unittest.TestCase):
|
|||
self._set_prev_torque(0)
|
||||
for t in np.arange(0, MAX_TORQUE_ERROR + 10, 10):
|
||||
t *= sign
|
||||
self.assertTrue(self.safety.safety_tx_hook(self._torque_msg(t)))
|
||||
self.assertTrue(self._tx(self._torque_msg(t)))
|
||||
|
||||
self.assertFalse(self.safety.safety_tx_hook(self._torque_msg(sign * (MAX_TORQUE_ERROR + 10))))
|
||||
self.assertFalse(self._tx(self._torque_msg(sign * (MAX_TORQUE_ERROR + 10))))
|
||||
|
||||
def test_realtime_limit_up(self):
|
||||
self.safety.set_controls_allowed(True)
|
||||
|
@ -287,46 +193,42 @@ class TestToyotaSafety(unittest.TestCase):
|
|||
for t in np.arange(0, 380, 10):
|
||||
t *= sign
|
||||
self.safety.set_toyota_torque_meas(t, t)
|
||||
self.assertTrue(self.safety.safety_tx_hook(self._torque_msg(t)))
|
||||
self.assertFalse(self.safety.safety_tx_hook(self._torque_msg(sign * 380)))
|
||||
self.assertTrue(self._tx(self._torque_msg(t)))
|
||||
self.assertFalse(self._tx(self._torque_msg(sign * 380)))
|
||||
|
||||
self._set_prev_torque(0)
|
||||
for t in np.arange(0, 370, 10):
|
||||
t *= sign
|
||||
self.safety.set_toyota_torque_meas(t, t)
|
||||
self.assertTrue(self.safety.safety_tx_hook(self._torque_msg(t)))
|
||||
self.assertTrue(self._tx(self._torque_msg(t)))
|
||||
|
||||
# Increase timer to update rt_torque_last
|
||||
self.safety.set_timer(RT_INTERVAL + 1)
|
||||
self.assertTrue(self.safety.safety_tx_hook(self._torque_msg(sign * 370)))
|
||||
self.assertTrue(self.safety.safety_tx_hook(self._torque_msg(sign * 380)))
|
||||
self.assertTrue(self._tx(self._torque_msg(sign * 370)))
|
||||
self.assertTrue(self._tx(self._torque_msg(sign * 380)))
|
||||
|
||||
def test_torque_measurements(self):
|
||||
self.safety.safety_rx_hook(self._torque_meas_msg(50))
|
||||
self.safety.safety_rx_hook(self._torque_meas_msg(-50))
|
||||
self.safety.safety_rx_hook(self._torque_meas_msg(0))
|
||||
self.safety.safety_rx_hook(self._torque_meas_msg(0))
|
||||
self.safety.safety_rx_hook(self._torque_meas_msg(0))
|
||||
self.safety.safety_rx_hook(self._torque_meas_msg(0))
|
||||
for trq in [50, -50, 0, 0, 0, 0]:
|
||||
self._rx(self._torque_meas_msg(trq))
|
||||
|
||||
# toyota safety adds one to be conservative on rounding
|
||||
self.assertEqual(-51, self.safety.get_toyota_torque_meas_min())
|
||||
self.assertEqual(51, self.safety.get_toyota_torque_meas_max())
|
||||
|
||||
self.safety.safety_rx_hook(self._torque_meas_msg(0))
|
||||
self._rx(self._torque_meas_msg(0))
|
||||
self.assertEqual(1, self.safety.get_toyota_torque_meas_max())
|
||||
self.assertEqual(-51, self.safety.get_toyota_torque_meas_min())
|
||||
|
||||
self.safety.safety_rx_hook(self._torque_meas_msg(0))
|
||||
self._rx(self._torque_meas_msg(0))
|
||||
self.assertEqual(1, self.safety.get_toyota_torque_meas_max())
|
||||
self.assertEqual(-1, self.safety.get_toyota_torque_meas_min())
|
||||
|
||||
def test_gas_interceptor_safety_check(self):
|
||||
|
||||
self.safety.set_controls_allowed(0)
|
||||
self.assertTrue(self.safety.safety_tx_hook(self._send_interceptor_msg(0, 0x200)))
|
||||
self.assertFalse(self.safety.safety_tx_hook(self._send_interceptor_msg(0x1000, 0x200)))
|
||||
self.assertTrue(self._tx(interceptor_msg(0, 0x200)))
|
||||
self.assertFalse(self._tx(interceptor_msg(0x1000, 0x200)))
|
||||
self.safety.set_controls_allowed(1)
|
||||
self.assertTrue(self.safety.safety_tx_hook(self._send_interceptor_msg(0x1000, 0x200)))
|
||||
self.assertTrue(self._tx(interceptor_msg(0x1000, 0x200)))
|
||||
|
||||
def test_rx_hook(self):
|
||||
# checksum checks
|
||||
|
@ -336,30 +238,11 @@ class TestToyotaSafety(unittest.TestCase):
|
|||
to_push = self._torque_meas_msg(0)
|
||||
if msg == "pcm":
|
||||
to_push = self._pcm_cruise_msg(1)
|
||||
self.assertTrue(self.safety.safety_rx_hook(to_push))
|
||||
self.assertTrue(self._rx(to_push))
|
||||
to_push[0].RDHR = 0
|
||||
self.assertFalse(self.safety.safety_rx_hook(to_push))
|
||||
self.assertFalse(self._rx(to_push))
|
||||
self.assertFalse(self.safety.get_controls_allowed())
|
||||
|
||||
def test_fwd_hook(self):
|
||||
|
||||
buss = list(range(0x0, 0x3))
|
||||
msgs = list(range(0x1, 0x800))
|
||||
|
||||
blocked_msgs = [0x2E4, 0x412, 0x191]
|
||||
blocked_msgs += [0x343]
|
||||
for b in buss:
|
||||
for m in msgs:
|
||||
if b == 0:
|
||||
fwd_bus = 2
|
||||
elif b == 1:
|
||||
fwd_bus = -1
|
||||
elif b == 2:
|
||||
fwd_bus = -1 if m in blocked_msgs else 0
|
||||
|
||||
# assume len 8
|
||||
self.assertEqual(fwd_bus, self.safety.safety_fwd_hook(b, make_msg(b, m, 8)))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
|
Loading…
Reference in New Issue