Subaru safety refactoring (#532)

* Update subaru safety tx messages

* Add Subaru preglobal driver torque limits

* fix preglobal desired_torque

* fix subaru preglobal torque rate limit and tx messages

* readability update for desired_torque

* fix preglobal tests

* Subaru safety refactoring, added missing legacy checks and updated test

* Remove subaru_global check from tests

* Reorder legacy constants, remove subaru_init

* Update Subaru legacy safety and tests to match dbc scaling factors

* remove scaling factor from torque_driver

* Change preglobal driver torque scaling factor

* Change driver torque factor to 10

* Fix preglobal dbc name

* Fix Subaru legacy safety test

* update openpilot commit

* init valid with one line
master
martinl 2020-06-12 01:51:05 +03:00 committed by GitHub
parent 76f347165f
commit b2c86eb66b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 341 additions and 118 deletions

View File

@ -59,7 +59,7 @@ RUN cd /tmp && \
RUN cd /tmp && \
git clone https://github.com/commaai/openpilot.git tmppilot || true && \
cd /tmp/tmppilot && \
git pull && git checkout a777fa851e8a444cadb516fc4e35b641018f0559 && \
git pull && git checkout fab939eebc7fcf67dbbb52f9352ba67c41598746 && \
git submodule update --init cereal opendbc && \
mkdir /tmp/openpilot && \
cp -pR SConstruct tools/ selfdrive/ common/ cereal/ opendbc/ /tmp/openpilot && \

View File

@ -9,10 +9,11 @@ const int SUBARU_DRIVER_TORQUE_ALLOWANCE = 60;
const int SUBARU_DRIVER_TORQUE_FACTOR = 10;
const int SUBARU_STANDSTILL_THRSLD = 20; // about 1kph
const int SUBARU_L_DRIVER_TORQUE_ALLOWANCE = 75;
const int SUBARU_L_DRIVER_TORQUE_FACTOR = 10;
const CanMsg SUBARU_TX_MSGS[] = {{0x122, 0, 8}, {0x221, 0, 8}, {0x322, 0, 8}};
const CanMsg SUBARU_L_TX_MSGS[] = {{0x164, 0, 8}, {0x221, 0, 8}, {0x322, 0, 8}};
const int SUBARU_TX_MSGS_LEN = sizeof(SUBARU_TX_MSGS) / sizeof(SUBARU_TX_MSGS[0]);
const int SUBARU_L_TX_MSGS_LEN = sizeof(SUBARU_L_TX_MSGS) / sizeof(SUBARU_L_TX_MSGS[0]);
AddrCheckStruct subaru_rx_checks[] = {
{.msg = {{ 0x40, 0, 8, .check_checksum = true, .max_counter = 15U, .expected_timestep = 10000U}}},
@ -21,17 +22,19 @@ AddrCheckStruct subaru_rx_checks[] = {
{.msg = {{0x13a, 0, 8, .check_checksum = true, .max_counter = 15U, .expected_timestep = 20000U}}},
{.msg = {{0x240, 0, 8, .check_checksum = true, .max_counter = 15U, .expected_timestep = 50000U}}},
};
const int SUBARU_RX_CHECK_LEN = sizeof(subaru_rx_checks) / sizeof(subaru_rx_checks[0]);
const CanMsg SUBARU_L_TX_MSGS[] = {{0x161, 0, 8}, {0x164, 0, 8}};
const int SUBARU_L_TX_MSGS_LEN = sizeof(SUBARU_L_TX_MSGS) / sizeof(SUBARU_L_TX_MSGS[0]);
// TODO: do checksum and counter checks after adding the signals to the outback dbc file
AddrCheckStruct subaru_l_rx_checks[] = {
{.msg = {{0x140, 0, 8, .expected_timestep = 10000U}}},
{.msg = {{0x371, 0, 8, .expected_timestep = 20000U}}},
{.msg = {{0x144, 0, 8, .expected_timestep = 50000U}}},
};
const int SUBARU_RX_CHECK_LEN = sizeof(subaru_rx_checks) / sizeof(subaru_rx_checks[0]);
const int SUBARU_L_RX_CHECK_LEN = sizeof(subaru_l_rx_checks) / sizeof(subaru_l_rx_checks[0]);
bool subaru_global = false;
static uint8_t subaru_get_checksum(CAN_FIFOMailBox_TypeDef *to_push) {
return (uint8_t)GET_BYTE(to_push, 0);
}
@ -52,37 +55,23 @@ static uint8_t subaru_compute_checksum(CAN_FIFOMailBox_TypeDef *to_push) {
static int subaru_rx_hook(CAN_FIFOMailBox_TypeDef *to_push) {
bool valid = false;
if (subaru_global) {
valid = addr_safety_check(to_push, subaru_rx_checks, SUBARU_RX_CHECK_LEN,
subaru_get_checksum, subaru_compute_checksum, subaru_get_counter);
} else {
valid = addr_safety_check(to_push, subaru_l_rx_checks, SUBARU_L_RX_CHECK_LEN,
NULL, NULL, NULL);
}
bool valid = addr_safety_check(to_push, subaru_rx_checks, SUBARU_RX_CHECK_LEN,
subaru_get_checksum, subaru_compute_checksum, subaru_get_counter);
bool unsafe_allow_gas = unsafe_mode & UNSAFE_DISABLE_DISENGAGE_ON_GAS;
if (valid && (GET_BUS(to_push) == 0)) {
int addr = GET_ADDR(to_push);
if (((addr == 0x119) && subaru_global) ||
((addr == 0x371) && !subaru_global)) {
if (addr == 0x119) {
int torque_driver_new;
if (subaru_global) {
torque_driver_new = ((GET_BYTES_04(to_push) >> 16) & 0x7FF);
torque_driver_new = -1 * to_signed(torque_driver_new, 11);
} else {
torque_driver_new = (GET_BYTE(to_push, 3) >> 5) + (GET_BYTE(to_push, 4) << 3);
torque_driver_new = to_signed(torque_driver_new, 11);
}
torque_driver_new = ((GET_BYTES_04(to_push) >> 16) & 0x7FF);
torque_driver_new = -1 * to_signed(torque_driver_new, 11);
update_sample(&torque_driver, torque_driver_new);
}
// enter controls on rising edge of ACC, exit controls on ACC off
if (((addr == 0x240) && subaru_global) ||
((addr == 0x144) && !subaru_global)) {
int bit_shift = subaru_global ? 9 : 17;
int cruise_engaged = ((GET_BYTES_48(to_push) >> bit_shift) & 1);
if (addr == 0x240) {
int cruise_engaged = ((GET_BYTES_48(to_push) >> 9) & 1);
if (cruise_engaged && !cruise_engaged_prev) {
controls_allowed = 1;
}
@ -93,15 +82,15 @@ static int subaru_rx_hook(CAN_FIFOMailBox_TypeDef *to_push) {
}
// sample subaru wheel speed, averaging opposite corners
if ((addr == 0x13a) && subaru_global) {
if (addr == 0x13a) {
int subaru_speed = (GET_BYTES_04(to_push) >> 12) & 0x1FFF; // FR
subaru_speed += (GET_BYTES_48(to_push) >> 6) & 0x1FFF; // RL
subaru_speed /= 2;
vehicle_moving = subaru_speed > SUBARU_STANDSTILL_THRSLD;
}
// exit controls on rising edge of brake press (TODO: missing check for unsupported legacy models)
if ((addr == 0x139) && subaru_global) {
// exit controls on rising edge of brake press
if (addr == 0x139) {
bool brake_pressed = (GET_BYTES_48(to_push) & 0xFFF0) > 0;
if (brake_pressed && (!brake_pressed_prev || vehicle_moving)) {
controls_allowed = 0;
@ -110,18 +99,76 @@ static int subaru_rx_hook(CAN_FIFOMailBox_TypeDef *to_push) {
}
// exit controls on rising edge of gas press
if (((addr == 0x40) && subaru_global) ||
((addr == 0x140) && !subaru_global)) {
int byte = subaru_global ? 4 : 0;
bool gas_pressed = GET_BYTE(to_push, byte) != 0;
if (addr == 0x40) {
bool gas_pressed = GET_BYTE(to_push, 4) != 0;
if (!unsafe_allow_gas && gas_pressed && !gas_pressed_prev) {
controls_allowed = 0;
}
gas_pressed_prev = gas_pressed;
}
if ((safety_mode_cnt > RELAY_TRNS_TIMEOUT) &&
(((addr == 0x122) && subaru_global) || ((addr == 0x164) && !subaru_global))) {
if ((safety_mode_cnt > RELAY_TRNS_TIMEOUT) && (addr == 0x122)) {
relay_malfunction_set();
}
}
return valid;
}
static int subaru_legacy_rx_hook(CAN_FIFOMailBox_TypeDef *to_push) {
bool valid = addr_safety_check(to_push, subaru_l_rx_checks, SUBARU_L_RX_CHECK_LEN,
NULL, NULL, NULL);
bool unsafe_allow_gas = unsafe_mode & UNSAFE_DISABLE_DISENGAGE_ON_GAS;
if (valid && (GET_BUS(to_push) == 0)) {
int addr = GET_ADDR(to_push);
if (addr == 0x371) {
int torque_driver_new;
torque_driver_new = (GET_BYTE(to_push, 3) >> 5) + (GET_BYTE(to_push, 4) << 3);
torque_driver_new = to_signed(torque_driver_new, 11);
update_sample(&torque_driver, torque_driver_new);
}
// enter controls on rising edge of ACC, exit controls on ACC off
if (addr == 0x144) {
int cruise_engaged = ((GET_BYTES_48(to_push) >> 17) & 1);
if (cruise_engaged && !cruise_engaged_prev) {
controls_allowed = 1;
}
if (!cruise_engaged) {
controls_allowed = 0;
}
cruise_engaged_prev = cruise_engaged;
}
// sample subaru wheel speed, averaging opposite corners
if (addr == 0xD4) {
int subaru_speed = (GET_BYTES_04(to_push) >> 16) & 0xFFFF; // FR
subaru_speed += GET_BYTES_48(to_push) & 0xFFFF; // RL
subaru_speed /= 2;
vehicle_moving = subaru_speed > SUBARU_STANDSTILL_THRSLD;
}
// exit controls on rising edge of brake press
if (addr == 0xD1) {
bool brake_pressed = ((GET_BYTES_04(to_push) >> 16) & 0xFF) > 0;
if (brake_pressed && (!brake_pressed_prev || vehicle_moving)) {
controls_allowed = 0;
}
brake_pressed_prev = brake_pressed;
}
// exit controls on rising edge of gas press
if (addr == 0x140) {
bool gas_pressed = GET_BYTE(to_push, 0) != 0;
if (!unsafe_allow_gas && gas_pressed && !gas_pressed_prev) {
controls_allowed = 0;
}
gas_pressed_prev = gas_pressed;
}
if ((safety_mode_cnt > RELAY_TRNS_TIMEOUT) && (addr == 0x164)) {
relay_malfunction_set();
}
}
@ -132,8 +179,7 @@ static int subaru_tx_hook(CAN_FIFOMailBox_TypeDef *to_send) {
int tx = 1;
int addr = GET_ADDR(to_send);
if ((!msg_allowed(to_send, SUBARU_TX_MSGS, SUBARU_TX_MSGS_LEN) && subaru_global) ||
(!msg_allowed(to_send, SUBARU_L_TX_MSGS, SUBARU_L_TX_MSGS_LEN) && !subaru_global)) {
if (!msg_allowed(to_send, SUBARU_TX_MSGS, SUBARU_TX_MSGS_LEN)) {
tx = 0;
}
@ -142,12 +188,11 @@ static int subaru_tx_hook(CAN_FIFOMailBox_TypeDef *to_send) {
}
// steer cmd checks
if (((addr == 0x122) && subaru_global) ||
((addr == 0x164) && !subaru_global)) {
int bit_shift = subaru_global ? 16 : 8;
int desired_torque = ((GET_BYTES_04(to_send) >> bit_shift) & 0x1FFF);
if (addr == 0x122) {
int desired_torque = ((GET_BYTES_04(to_send) >> 16) & 0x1FFF);
bool violation = 0;
uint32_t ts = TIM2->CNT;
desired_torque = -1 * to_signed(desired_torque, 13);
if (controls_allowed) {
@ -194,6 +239,70 @@ static int subaru_tx_hook(CAN_FIFOMailBox_TypeDef *to_send) {
return tx;
}
static int subaru_legacy_tx_hook(CAN_FIFOMailBox_TypeDef *to_send) {
int tx = 1;
int addr = GET_ADDR(to_send);
if (!msg_allowed(to_send, SUBARU_L_TX_MSGS, SUBARU_L_TX_MSGS_LEN)) {
tx = 0;
}
if (relay_malfunction) {
tx = 0;
}
// steer cmd checks
if (addr == 0x164) {
int desired_torque = ((GET_BYTES_04(to_send) >> 8) & 0x1FFF);
bool violation = 0;
uint32_t ts = TIM2->CNT;
desired_torque = -1 * to_signed(desired_torque, 13);
if (controls_allowed) {
// *** global torque limit check ***
violation |= max_limit_check(desired_torque, SUBARU_MAX_STEER, -SUBARU_MAX_STEER);
// *** torque rate limit check ***
violation |= driver_limit_check(desired_torque, desired_torque_last, &torque_driver,
SUBARU_MAX_STEER, SUBARU_MAX_RATE_UP, SUBARU_MAX_RATE_DOWN,
SUBARU_L_DRIVER_TORQUE_ALLOWANCE, SUBARU_L_DRIVER_TORQUE_FACTOR);
// used next time
desired_torque_last = desired_torque;
// *** torque real time rate limit check ***
violation |= rt_rate_limit_check(desired_torque, rt_torque_last, SUBARU_MAX_RT_DELTA);
// every RT_INTERVAL set the new limits
uint32_t ts_elapsed = get_ts_elapsed(ts, ts_last);
if (ts_elapsed > SUBARU_RT_INTERVAL) {
rt_torque_last = desired_torque;
ts_last = ts;
}
}
// no torque if controls is not allowed
if (!controls_allowed && (desired_torque != 0)) {
violation = 1;
}
// reset to 0 if either controls is not allowed or there's a violation
if (violation || !controls_allowed) {
desired_torque_last = 0;
rt_torque_last = 0;
ts_last = ts;
}
if (violation) {
tx = 0;
}
}
return tx;
}
static int subaru_fwd_hook(int bus_num, CAN_FIFOMailBox_TypeDef *to_fwd) {
int bus_fwd = -1;
@ -202,14 +311,12 @@ static int subaru_fwd_hook(int bus_num, CAN_FIFOMailBox_TypeDef *to_fwd) {
bus_fwd = 2; // Camera CAN
}
if (bus_num == 2) {
// 290 is LKAS for Global Platform
// 356 is LKAS for outback 2015
// 545 is ES_Distance
// 802 is ES_LKAS
// Global platform
// 0x122 ES_LKAS
// 0x221 ES_Distance
// 0x322 ES_LKAS_State
int addr = GET_ADDR(to_fwd);
int block_msg = ((addr == 0x122) && subaru_global) ||
((addr == 0x164) && !subaru_global) ||
(addr == 0x221) || (addr == 0x322);
int block_msg = ((addr == 0x122) || (addr == 0x221) || (addr == 0x322));
if (!block_msg) {
bus_fwd = 0; // Main CAN
}
@ -219,22 +326,30 @@ static int subaru_fwd_hook(int bus_num, CAN_FIFOMailBox_TypeDef *to_fwd) {
return bus_fwd;
}
static void subaru_init(int16_t param) {
UNUSED(param);
controls_allowed = false;
relay_malfunction_reset();
subaru_global = true;
}
static int subaru_legacy_fwd_hook(int bus_num, CAN_FIFOMailBox_TypeDef *to_fwd) {
int bus_fwd = -1;
static void subaru_legacy_init(int16_t param) {
UNUSED(param);
controls_allowed = false;
relay_malfunction_reset();
subaru_global = false;
if (!relay_malfunction) {
if (bus_num == 0) {
bus_fwd = 2; // Camera CAN
}
if (bus_num == 2) {
// Preglobal platform
// 0x161 is ES_CruiseThrottle
// 0x164 is ES_LKAS
int addr = GET_ADDR(to_fwd);
int block_msg = ((addr == 0x161) || (addr == 0x164));
if (!block_msg) {
bus_fwd = 0; // Main CAN
}
}
}
// fallback to do not forward
return bus_fwd;
}
const safety_hooks subaru_hooks = {
.init = subaru_init,
.init = nooutput_init,
.rx = subaru_rx_hook,
.tx = subaru_tx_hook,
.tx_lin = nooutput_tx_lin_hook,
@ -244,11 +359,11 @@ const safety_hooks subaru_hooks = {
};
const safety_hooks subaru_legacy_hooks = {
.init = subaru_legacy_init,
.rx = subaru_rx_hook,
.tx = subaru_tx_hook,
.init = nooutput_init,
.rx = subaru_legacy_rx_hook,
.tx = subaru_legacy_tx_hook,
.tx_lin = nooutput_tx_lin_hook,
.fwd = subaru_fwd_hook,
.fwd = subaru_legacy_fwd_hook,
.addr_check = subaru_l_rx_checks,
.addr_check_len = sizeof(subaru_l_rx_checks) / sizeof(subaru_l_rx_checks[0]),
};

View File

@ -70,7 +70,6 @@ void set_honda_alt_brake_msg(bool);
void set_honda_bosch_long(bool c);
int get_honda_hw(void);
bool get_subaru_global(void);
""")
libpandasafety = ffi.dlopen(libpandasafety_fn)

View File

@ -138,10 +138,6 @@ int get_hw_type(void){
return hw_type;
}
bool get_subaru_global(void){
return subaru_global;
}
void set_timer(uint32_t t){
timer.CNT = t;
}

View File

@ -28,7 +28,7 @@ class TestSubaruSafety(common.PandaSafetyTest):
STANDSTILL_THRESHOLD = 20 # 1kph (see dbc file)
RELAY_MALFUNCTION_ADDR = 0x122
RELAY_MALFUNCTION_BUS = 0
FWD_BLACKLISTED_ADDRS = {2: [290, 545, 802]}
FWD_BLACKLISTED_ADDRS = {2: [0x122, 0x221, 0x322]}
FWD_BUS_LOOKUP = {0: 2, 2: 0}
def setUp(self):
@ -165,52 +165,5 @@ class TestSubaruSafety(common.PandaSafetyTest):
self.assertTrue(self._tx(self._torque_msg(sign * (MAX_RT_DELTA + 1))))
class TestSubaruLegacySafety(TestSubaruSafety):
TX_MSGS = [[0x164, 0], [0x221, 0], [0x322, 0]]
RELAY_MALFUNCTION_ADDR = 0x164
RELAY_MALFUNCTION_BUS = 0
FWD_BLACKLISTED_ADDRS = {2: [356, 545, 802]}
FWD_BUS_LOOKUP = {0: 2, 2: 0}
def setUp(self):
self.packer = CANPackerPanda("subaru_outback_2015_eyesight")
self.safety = libpandasafety_py.libpandasafety
self.safety.set_safety_hooks(Panda.SAFETY_SUBARU_LEGACY, 0)
self.safety.init_tests()
# subaru legacy safety doesn't have brake checks
def test_prev_brake(self):
pass
def test_not_allow_brake_when_moving(self):
pass
def test_allow_brake_at_zero_speed(self):
pass
# doesn't have speed checks either
def test_sample_speed(self):
pass
def _torque_driver_msg(self, torque):
# TODO: figure out if this scaling factor from the DBC is right.
# if it is, remove the scaling from here and put it in the safety code
values = {"Steer_Torque_Sensor": torque * 8}
return self.packer.make_can_msg_panda("Steering_Torque", 0, values)
def _torque_msg(self, torque):
values = {"LKAS_Command": torque}
return self.packer.make_can_msg_panda("ES_LKAS", 0, values)
def _gas_msg(self, gas):
values = {"Throttle_Pedal": gas}
return self.packer.make_can_msg_panda("Throttle", 0, values)
def _pcm_status_msg(self, enable):
values = {"Cruise_Activated": enable}
return self.packer.make_can_msg_panda("CruiseControl", 0, values)
if __name__ == "__main__":
unittest.main()

View File

@ -0,0 +1,160 @@
#!/usr/bin/env python3
import unittest
import numpy as np
from panda import Panda
from panda.tests.safety import libpandasafety_py
import panda.tests.safety.common as common
from panda.tests.safety.common import CANPackerPanda
MAX_RATE_UP = 50
MAX_RATE_DOWN = 70
MAX_STEER = 2047
MAX_RT_DELTA = 940
RT_INTERVAL = 250000
DRIVER_TORQUE_ALLOWANCE = 75
DRIVER_TORQUE_FACTOR = 10
class TestSubaruLegacySafety(common.PandaSafetyTest):
cnt_gas = 0
TX_MSGS = [[0x161, 0], [0x164, 0]]
STANDSTILL_THRESHOLD = 20 # 1kph (see dbc file)
RELAY_MALFUNCTION_ADDR = 0x164
RELAY_MALFUNCTION_BUS = 0
FWD_BLACKLISTED_ADDRS = {2: [0x161, 0x164]}
FWD_BUS_LOOKUP = {0: 2, 2: 0}
def setUp(self):
self.packer = CANPackerPanda("subaru_outback_2015_generated")
self.safety = libpandasafety_py.libpandasafety
self.safety.set_safety_hooks(Panda.SAFETY_SUBARU_LEGACY, 0)
self.safety.init_tests()
def _set_prev_torque(self, t):
self.safety.set_desired_torque_last(t)
self.safety.set_rt_torque_last(t)
def _torque_driver_msg(self, torque):
values = {"Steer_Torque_Sensor": torque}
return self.packer.make_can_msg_panda("Steering_Torque", 0, values)
def _speed_msg(self, speed):
# subaru safety doesn't use the scaled value, so undo the scaling
values = {s: speed*0.0592 for s in ["FR", "FL", "RR", "RL"]}
return self.packer.make_can_msg_panda("Wheel_Speeds", 0, values)
def _brake_msg(self, brake):
values = {"Brake_Pedal": brake}
return self.packer.make_can_msg_panda("Brake_Pedal", 0, values)
def _torque_msg(self, torque):
values = {"LKAS_Command": torque}
return self.packer.make_can_msg_panda("ES_LKAS", 0, values)
def _gas_msg(self, gas):
values = {"Throttle_Pedal": gas, "Counter": self.cnt_gas % 4}
self.__class__.cnt_gas += 1
return self.packer.make_can_msg_panda("Throttle", 0, values)
def _pcm_status_msg(self, enable):
values = {"Cruise_Activated": enable}
return self.packer.make_can_msg_panda("CruiseControl", 0, values)
def _set_torque_driver(self, min_t, max_t):
for _ in range(0, 5):
self._rx(self._torque_driver_msg(min_t))
self._rx(self._torque_driver_msg(max_t))
def test_steer_safety_check(self):
for enabled in [0, 1]:
for t in range(-3000, 3000):
self.safety.set_controls_allowed(enabled)
self._set_prev_torque(t)
block = abs(t) > MAX_STEER or (not enabled and abs(t) > 0)
self.assertEqual(not block, self._tx(self._torque_msg(t)))
def test_non_realtime_limit_up(self):
self._set_torque_driver(0, 0)
self.safety.set_controls_allowed(True)
self._set_prev_torque(0)
self.assertTrue(self._tx(self._torque_msg(MAX_RATE_UP)))
self._set_prev_torque(0)
self.assertTrue(self._tx(self._torque_msg(-MAX_RATE_UP)))
self._set_prev_torque(0)
self.assertFalse(self._tx(self._torque_msg(MAX_RATE_UP + 1)))
self.safety.set_controls_allowed(True)
self._set_prev_torque(0)
self.assertFalse(self._tx(self._torque_msg(-MAX_RATE_UP - 1)))
def test_non_realtime_limit_down(self):
self._set_torque_driver(0, 0)
self.safety.set_controls_allowed(True)
def test_against_torque_driver(self):
self.safety.set_controls_allowed(True)
for sign in [-1, 1]:
for t in np.arange(0, DRIVER_TORQUE_ALLOWANCE + 1, 1):
t *= -sign
self._set_torque_driver(t, t)
self._set_prev_torque(MAX_STEER * sign)
self.assertTrue(self._tx(self._torque_msg(MAX_STEER * sign)))
self._set_torque_driver(DRIVER_TORQUE_ALLOWANCE + 1, DRIVER_TORQUE_ALLOWANCE + 1)
self.assertFalse(self._tx(self._torque_msg(-MAX_STEER)))
# arbitrary high driver torque to ensure max steer torque is allowed
max_driver_torque = int(MAX_STEER / DRIVER_TORQUE_FACTOR + DRIVER_TORQUE_ALLOWANCE + 1)
# spot check some individual cases
for sign in [-1, 1]:
driver_torque = (DRIVER_TORQUE_ALLOWANCE + 10) * sign
torque_desired = (MAX_STEER - 10 * DRIVER_TORQUE_FACTOR) * sign
delta = 1 * sign
self._set_prev_torque(torque_desired)
self._set_torque_driver(-driver_torque, -driver_torque)
self.assertTrue(self._tx(self._torque_msg(torque_desired)))
self._set_prev_torque(torque_desired + delta)
self._set_torque_driver(-driver_torque, -driver_torque)
self.assertFalse(self._tx(self._torque_msg(torque_desired + delta)))
self._set_prev_torque(MAX_STEER * sign)
self._set_torque_driver(-max_driver_torque * sign, -max_driver_torque * sign)
self.assertTrue(self._tx(self._torque_msg((MAX_STEER - MAX_RATE_DOWN) * sign)))
self._set_prev_torque(MAX_STEER * sign)
self._set_torque_driver(-max_driver_torque * sign, -max_driver_torque * sign)
self.assertTrue(self._tx(self._torque_msg(0)))
self._set_prev_torque(MAX_STEER * sign)
self._set_torque_driver(-max_driver_torque * sign, -max_driver_torque * sign)
self.assertFalse(self._tx(self._torque_msg((MAX_STEER - MAX_RATE_DOWN + 1) * sign)))
def test_realtime_limits(self):
self.safety.set_controls_allowed(True)
for sign in [-1, 1]:
self.safety.init_tests()
self._set_prev_torque(0)
self._set_torque_driver(0, 0)
for t in np.arange(0, MAX_RT_DELTA, 1):
t *= sign
self.assertTrue(self._tx(self._torque_msg(t)))
self.assertFalse(self._tx(self._torque_msg(sign * (MAX_RT_DELTA + 1))))
self._set_prev_torque(0)
for t in np.arange(0, MAX_RT_DELTA, 1):
t *= sign
self.assertTrue(self._tx(self._torque_msg(t)))
# Increase timer to update rt_torque_last
self.safety.set_timer(RT_INTERVAL + 1)
self.assertTrue(self._tx(self._torque_msg(sign * (MAX_RT_DELTA - 1))))
self.assertTrue(self._tx(self._torque_msg(sign * (MAX_RT_DELTA + 1))))
if __name__ == "__main__":
unittest.main()