Volkswagen safety updates: Phase 2 (#445)
* CRC and counter checks, standstill/brake checks * Clean up a tsk_06 that snuck through * Be consistent about how we call _msg_esp_05 * Reduce scope: haunted by the ghost of MISRA future * Improved check/test for in-motion braking * MISRA styling fixmaster
parent
b2ffaae60e
commit
598074c192
|
@ -57,6 +57,21 @@ int safety_fwd_hook(int bus_num, CAN_FIFOMailBox_TypeDef *to_fwd) {
|
|||
return current_hooks->fwd(bus_num, to_fwd);
|
||||
}
|
||||
|
||||
// Given a CRC-8 poly, generate a static lookup table to use with a fast CRC-8
|
||||
// algorithm. Called at init time for safety modes using CRC-8.
|
||||
void gen_crc_lookup_table(uint8_t poly, uint8_t crc_lut[]) {
|
||||
for (int i = 0; i < 256; i++) {
|
||||
uint8_t crc = i;
|
||||
for (int j = 0; j < 8; j++) {
|
||||
if ((crc & 0x80U) != 0U)
|
||||
crc = (uint8_t)((crc << 1) ^ poly);
|
||||
else
|
||||
crc <<= 1;
|
||||
}
|
||||
crc_lut[i] = crc;
|
||||
}
|
||||
}
|
||||
|
||||
bool msg_allowed(int addr, int bus, const AddrBus addr_list[], int len) {
|
||||
bool allowed = false;
|
||||
for (int i = 0; i < len; i++) {
|
||||
|
|
|
@ -8,7 +8,9 @@ const int VOLKSWAGEN_DRIVER_TORQUE_ALLOWANCE = 80;
|
|||
const int VOLKSWAGEN_DRIVER_TORQUE_FACTOR = 3;
|
||||
|
||||
// Safety-relevant CAN messages for the Volkswagen MQB platform
|
||||
#define MSG_ESP_19 0x0B2 // RX from ABS, for wheel speeds
|
||||
#define MSG_EPS_01 0x09F // RX from EPS, for driver steering torque
|
||||
#define MSG_ESP_05 0x106 // RX from ABS, for brake switch state
|
||||
#define MSG_MOTOR_20 0x121 // RX from ECU, for driver throttle input
|
||||
#define MSG_ACC_06 0x122 // RX from ACC radar, for status and engagement
|
||||
#define MSG_HCA_01 0x126 // TX by OP, Heading Control Assist steering torque
|
||||
|
@ -19,11 +21,12 @@ const int VOLKSWAGEN_DRIVER_TORQUE_FACTOR = 3;
|
|||
const AddrBus VOLKSWAGEN_MQB_TX_MSGS[] = {{MSG_HCA_01, 0}, {MSG_GRA_ACC_01, 0}, {MSG_GRA_ACC_01, 2}, {MSG_LDW_02, 0}};
|
||||
const int VOLKSWAGEN_MQB_TX_MSGS_LEN = sizeof(VOLKSWAGEN_MQB_TX_MSGS) / sizeof(VOLKSWAGEN_MQB_TX_MSGS[0]);
|
||||
|
||||
// TODO: do checksum and counter checks
|
||||
AddrCheckStruct volkswagen_mqb_rx_checks[] = {
|
||||
{.addr = {MSG_EPS_01}, .bus = 0, .expected_timestep = 10000U},
|
||||
{.addr = {MSG_ACC_06}, .bus = 0, .expected_timestep = 20000U},
|
||||
{.addr = {MSG_MOTOR_20}, .bus = 0, .expected_timestep = 20000U},
|
||||
{.addr = {MSG_ESP_19}, .bus = 0, .check_checksum = false, .max_counter = 0U, .expected_timestep = 10000U},
|
||||
{.addr = {MSG_EPS_01}, .bus = 0, .check_checksum = true, .max_counter = 15U, .expected_timestep = 10000U},
|
||||
{.addr = {MSG_ESP_05}, .bus = 0, .check_checksum = true, .max_counter = 15U, .expected_timestep = 20000U},
|
||||
{.addr = {MSG_MOTOR_20}, .bus = 0, .check_checksum = true, .max_counter = 15U, .expected_timestep = 20000U},
|
||||
{.addr = {MSG_ACC_06}, .bus = 0, .check_checksum = true, .max_counter = 15U, .expected_timestep = 20000U},
|
||||
};
|
||||
const int VOLKSWAGEN_MQB_RX_CHECKS_LEN = sizeof(volkswagen_mqb_rx_checks) / sizeof(volkswagen_mqb_rx_checks[0]);
|
||||
|
||||
|
@ -32,9 +35,56 @@ struct sample_t volkswagen_torque_driver; // Last few driver torques measured
|
|||
int volkswagen_rt_torque_last = 0;
|
||||
int volkswagen_desired_torque_last = 0;
|
||||
uint32_t volkswagen_ts_last = 0;
|
||||
bool volkswagen_moving = false;
|
||||
bool volkswagen_brake_pressed_prev = false;
|
||||
int volkswagen_gas_prev = 0;
|
||||
int volkswagen_torque_msg = 0;
|
||||
int volkswagen_lane_msg = 0;
|
||||
uint8_t volkswagen_crc8_lut_8h2f[256]; // Static lookup table for CRC8 poly 0x2F, aka 8H2F/AUTOSAR
|
||||
|
||||
|
||||
static uint8_t volkswagen_get_checksum(CAN_FIFOMailBox_TypeDef *to_push) {
|
||||
return (uint8_t)GET_BYTE(to_push, 0);
|
||||
}
|
||||
|
||||
static uint8_t volkswagen_get_counter(CAN_FIFOMailBox_TypeDef *to_push) {
|
||||
return (uint8_t)GET_BYTE(to_push, 1) & 0xFU;
|
||||
}
|
||||
|
||||
static uint8_t volkswagen_mqb_compute_crc(CAN_FIFOMailBox_TypeDef *to_push) {
|
||||
int addr = GET_ADDR(to_push);
|
||||
int len = GET_LEN(to_push);
|
||||
|
||||
// This is CRC-8H2F/AUTOSAR with a twist. See the OpenDBC implementation
|
||||
// of this algorithm for a version with explanatory comments.
|
||||
|
||||
uint8_t crc = 0xFFU;
|
||||
for (int i = 1; i < len; i++) {
|
||||
crc ^= (uint8_t)GET_BYTE(to_push, i);
|
||||
crc = volkswagen_crc8_lut_8h2f[crc];
|
||||
}
|
||||
|
||||
uint8_t counter = volkswagen_get_counter(to_push);
|
||||
switch(addr) {
|
||||
case MSG_EPS_01:
|
||||
crc ^= (uint8_t[]){0xF5,0xF5,0xF5,0xF5,0xF5,0xF5,0xF5,0xF5,0xF5,0xF5,0xF5,0xF5,0xF5,0xF5,0xF5,0xF5}[counter];
|
||||
break;
|
||||
case MSG_ESP_05:
|
||||
crc ^= (uint8_t[]){0x07,0x07,0x07,0x07,0x07,0x07,0x07,0x07,0x07,0x07,0x07,0x07,0x07,0x07,0x07,0x07}[counter];
|
||||
break;
|
||||
case MSG_MOTOR_20:
|
||||
crc ^= (uint8_t[]){0xE9,0x65,0xAE,0x6B,0x7B,0x35,0xE5,0x5F,0x4E,0xC7,0x86,0xA2,0xBB,0xDD,0xEB,0xB4}[counter];
|
||||
break;
|
||||
case MSG_ACC_06:
|
||||
crc ^= (uint8_t[]){0x37,0x7D,0xF3,0xA9,0x18,0x46,0x6D,0x4D,0x3D,0x71,0x92,0x9C,0xE5,0x32,0x10,0xB9}[counter];
|
||||
break;
|
||||
default: // Undefined CAN message, CRC check expected to fail
|
||||
break;
|
||||
}
|
||||
crc = volkswagen_crc8_lut_8h2f[crc];
|
||||
|
||||
return crc ^ 0xFFU;
|
||||
}
|
||||
|
||||
static void volkswagen_mqb_init(int16_t param) {
|
||||
UNUSED(param);
|
||||
|
@ -43,17 +93,29 @@ static void volkswagen_mqb_init(int16_t param) {
|
|||
relay_malfunction = false;
|
||||
volkswagen_torque_msg = MSG_HCA_01;
|
||||
volkswagen_lane_msg = MSG_LDW_02;
|
||||
gen_crc_lookup_table(0x2F, volkswagen_crc8_lut_8h2f);
|
||||
}
|
||||
|
||||
static int volkswagen_mqb_rx_hook(CAN_FIFOMailBox_TypeDef *to_push) {
|
||||
|
||||
bool valid = addr_safety_check(to_push, volkswagen_mqb_rx_checks, VOLKSWAGEN_MQB_RX_CHECKS_LEN,
|
||||
NULL, NULL, NULL);
|
||||
volkswagen_get_checksum, volkswagen_mqb_compute_crc, volkswagen_get_counter);
|
||||
|
||||
if (valid) {
|
||||
int bus = GET_BUS(to_push);
|
||||
int addr = GET_ADDR(to_push);
|
||||
|
||||
// Update in-motion state by sampling front wheel speeds
|
||||
// Signal: ESP_19.ESP_VL_Radgeschw_02 (front left) in scaled km/h
|
||||
// Signal: ESP_19.ESP_VR_Radgeschw_02 (front right) in scaled km/h
|
||||
if ((bus == 0) && (addr == MSG_ESP_19)) {
|
||||
int wheel_speed_fl = GET_BYTE(to_push, 4) | (GET_BYTE(to_push, 5) << 8);
|
||||
int wheel_speed_fr = GET_BYTE(to_push, 6) | (GET_BYTE(to_push, 7) << 8);
|
||||
// Check for average front speed in excess of 0.3m/s, 1.08km/h
|
||||
// DBC speed scale 0.0075: 0.3m/s = 144, sum both wheels to compare
|
||||
volkswagen_moving = (wheel_speed_fl + wheel_speed_fr) > 288;
|
||||
}
|
||||
|
||||
// Update driver input torque samples
|
||||
// Signal: EPS_01.Driver_Strain (absolute torque)
|
||||
// Signal: EPS_01.Driver_Strain_VZ (direction)
|
||||
|
@ -83,6 +145,16 @@ static int volkswagen_mqb_rx_hook(CAN_FIFOMailBox_TypeDef *to_push) {
|
|||
volkswagen_gas_prev = gas;
|
||||
}
|
||||
|
||||
// Exit controls on rising edge of brake press
|
||||
// Signal: ESP_05.ESP_Fahrer_bremst
|
||||
if ((bus == 0) && (addr == MSG_ESP_05)) {
|
||||
bool brake_pressed = (GET_BYTE(to_push, 3) & 0x4) >> 2;
|
||||
if (brake_pressed && (!(volkswagen_brake_pressed_prev) || volkswagen_moving)) {
|
||||
controls_allowed = 0;
|
||||
}
|
||||
volkswagen_brake_pressed_prev = brake_pressed;
|
||||
}
|
||||
|
||||
// If there are HCA messages on bus 0 not sent by OP, there's a relay problem
|
||||
if ((safety_mode_cnt > RELAY_TRNS_TIMEOUT) && (bus == 0) && (addr == MSG_HCA_01)) {
|
||||
relay_malfunction = true;
|
||||
|
|
|
@ -49,6 +49,7 @@ bool driver_limit_check(int val, int val_last, struct sample_t *val_driver,
|
|||
const int MAX_ALLOWANCE, const int DRIVER_FACTOR);
|
||||
bool rt_rate_limit_check(int val, int val_last, const int MAX_RT_DELTA);
|
||||
float interpolate(struct lookup_t xy, float x);
|
||||
void gen_crc_lookup_table(uint8_t poly, uint8_t crc_lut[]);
|
||||
bool msg_allowed(int addr, int bus, const AddrBus addr_list[], int len);
|
||||
int get_addr_check_index(CAN_FIFOMailBox_TypeDef *to_push, AddrCheckStruct addr_list[], const int len);
|
||||
void update_counter(AddrCheckStruct addr_list[], int index, uint8_t counter);
|
||||
|
|
|
@ -9,3 +9,4 @@ requests
|
|||
flake8==3.7.9
|
||||
pylint==2.4.3
|
||||
cffi==1.11.4
|
||||
crcmod
|
||||
|
|
|
@ -94,6 +94,8 @@ void init_tests_volkswagen(void);
|
|||
int get_volkswagen_gas_prev(void);
|
||||
int get_volkswagen_torque_driver_min(void);
|
||||
int get_volkswagen_torque_driver_max(void);
|
||||
bool get_volkswagen_moving(void);
|
||||
bool get_volkswagen_brake_pressed_prev(void);
|
||||
void set_volkswagen_desired_torque_last(int t);
|
||||
void set_volkswagen_rt_torque_last(int t);
|
||||
void set_volkswagen_torque_driver(int min, int max);
|
||||
|
|
|
@ -232,6 +232,14 @@ void set_volkswagen_desired_torque_last(int t){
|
|||
volkswagen_desired_torque_last = t;
|
||||
}
|
||||
|
||||
int get_volkswagen_moving(void){
|
||||
return volkswagen_moving;
|
||||
}
|
||||
|
||||
int get_volkswagen_brake_pressed_prev(void){
|
||||
return volkswagen_brake_pressed_prev;
|
||||
}
|
||||
|
||||
int get_volkswagen_gas_prev(void){
|
||||
return volkswagen_gas_prev;
|
||||
}
|
||||
|
@ -334,6 +342,9 @@ void init_tests_subaru(void){
|
|||
|
||||
void init_tests_volkswagen(void){
|
||||
init_tests();
|
||||
volkswagen_moving = false;
|
||||
volkswagen_brake_pressed_prev = false;
|
||||
volkswagen_gas_prev = 0;
|
||||
volkswagen_torque_driver.min = 0;
|
||||
volkswagen_torque_driver.max = 0;
|
||||
volkswagen_desired_torque_last = 0;
|
||||
|
|
|
@ -1,10 +1,11 @@
|
|||
#!/usr/bin/env python3
|
||||
import unittest
|
||||
import numpy as np
|
||||
import crcmod
|
||||
from panda import Panda
|
||||
from panda.tests.safety import libpandasafety_py
|
||||
from panda.tests.safety.common import test_relay_malfunction, make_msg, \
|
||||
test_manually_enable_controls_allowed, test_spam_can_buses
|
||||
test_manually_enable_controls_allowed, test_spam_can_buses, MAX_WRONG_COUNTERS
|
||||
|
||||
MAX_RATE_UP = 4
|
||||
MAX_RATE_DOWN = 10
|
||||
|
@ -15,9 +16,11 @@ RT_INTERVAL = 250000
|
|||
DRIVER_TORQUE_ALLOWANCE = 80
|
||||
DRIVER_TORQUE_FACTOR = 3
|
||||
|
||||
MSG_ESP_19 = 0xB2 # RX from ABS, for wheel speeds
|
||||
MSG_EPS_01 = 0x9F # RX from EPS, for driver steering torque
|
||||
MSG_ACC_06 = 0x122 # RX from ACC radar, for status and engagement
|
||||
MSG_ESP_05 = 0x106 # RX from ABS, for brake light state
|
||||
MSG_MOTOR_20 = 0x121 # RX from ECU, for driver throttle input
|
||||
MSG_ACC_06 = 0x122 # RX from ACC radar, for status and engagement
|
||||
MSG_HCA_01 = 0x126 # TX by OP, Heading Control Assist steering torque
|
||||
MSG_GRA_ACC_01 = 0x12B # TX by OP, ACC control buttons for cancel/resume
|
||||
MSG_LDW_02 = 0x397 # TX by OP, Lane line recognition and text alerts
|
||||
|
@ -31,17 +34,66 @@ def sign(a):
|
|||
else:
|
||||
return -1
|
||||
|
||||
# Python crcmod works differently somehow from every other CRC calculator. The
|
||||
# implied leading 1 on the polynomial isn't a problem, but to get the right
|
||||
# result for CRC-8H2F/AUTOSAR, we have to feed it initCrc 0x00 instead of 0xFF.
|
||||
volkswagen_crc_8h2f = crcmod.mkCrcFun(0x12F, initCrc=0x00, rev=False, xorOut=0xFF)
|
||||
|
||||
def volkswagen_mqb_crc(msg, addr, len_msg):
|
||||
# This is CRC-8H2F/AUTOSAR with a twist. See the OpenDBC implementation of
|
||||
# this algorithm for a version with explanatory comments.
|
||||
msg_bytes = msg.RDLR.to_bytes(4, 'little') + msg.RDHR.to_bytes(4, 'little')
|
||||
counter = (msg.RDLR & 0xF00) >> 8
|
||||
if addr == MSG_EPS_01:
|
||||
magic_pad = b'\xF5\xF5\xF5\xF5\xF5\xF5\xF5\xF5\xF5\xF5\xF5\xF5\xF5\xF5\xF5\xF5'[counter]
|
||||
elif addr == MSG_ESP_05:
|
||||
magic_pad = b'\x07\x07\x07\x07\x07\x07\x07\x07\x07\x07\x07\x07\x07\x07\x07\x07'[counter]
|
||||
elif addr == MSG_MOTOR_20:
|
||||
magic_pad = b'\xE9\x65\xAE\x6B\x7B\x35\xE5\x5F\x4E\xC7\x86\xA2\xBB\xDD\xEB\xB4'[counter]
|
||||
elif addr == MSG_ACC_06:
|
||||
magic_pad = b'\x37\x7D\xF3\xA9\x18\x46\x6D\x4D\x3D\x71\x92\x9C\xE5\x32\x10\xB9'[counter]
|
||||
elif addr == MSG_HCA_01:
|
||||
magic_pad = b'\xDA\xDA\xDA\xDA\xDA\xDA\xDA\xDA\xDA\xDA\xDA\xDA\xDA\xDA\xDA\xDA'[counter]
|
||||
elif addr == MSG_GRA_ACC_01:
|
||||
magic_pad = b'\x6A\x38\xB4\x27\x22\xEF\xE1\xBB\xF8\x80\x84\x49\xC7\x9E\x1E\x2B'[counter]
|
||||
else:
|
||||
magic_pad = None
|
||||
return volkswagen_crc_8h2f(msg_bytes[1:len_msg] + magic_pad.to_bytes(1, 'little'))
|
||||
|
||||
class TestVolkswagenMqbSafety(unittest.TestCase):
|
||||
@classmethod
|
||||
def setUp(cls):
|
||||
cls.safety = libpandasafety_py.libpandasafety
|
||||
cls.safety.set_safety_hooks(Panda.SAFETY_VOLKSWAGEN_MQB, 0)
|
||||
cls.safety.init_tests_volkswagen()
|
||||
cls.cnt_eps_01 = 0
|
||||
cls.cnt_esp_05 = 0
|
||||
cls.cnt_motor_20 = 0
|
||||
cls.cnt_acc_06 = 0
|
||||
cls.cnt_hca_01 = 0
|
||||
cls.cnt_gra_acc_01 = 0
|
||||
|
||||
def _set_prev_torque(self, t):
|
||||
self.safety.set_volkswagen_desired_torque_last(t)
|
||||
self.safety.set_volkswagen_rt_torque_last(t)
|
||||
|
||||
# Wheel speeds
|
||||
def _esp_19_msg(self, speed):
|
||||
wheel_speed_scaled = int(speed / 0.0075)
|
||||
to_send = make_msg(0, MSG_ESP_19)
|
||||
to_send[0].RDLR = wheel_speed_scaled | (wheel_speed_scaled << 16)
|
||||
to_send[0].RDHR = wheel_speed_scaled | (wheel_speed_scaled << 16)
|
||||
return to_send
|
||||
|
||||
# Brake light switch
|
||||
def _esp_05_msg(self, brake):
|
||||
to_send = make_msg(0, MSG_ESP_05)
|
||||
to_send[0].RDLR = (0x1 << 26) if brake else 0
|
||||
to_send[0].RDLR |= (self.cnt_esp_05 % 16) << 8
|
||||
to_send[0].RDLR |= volkswagen_mqb_crc(to_send[0], MSG_ESP_05, 8)
|
||||
self.cnt_esp_05 += 1
|
||||
return to_send
|
||||
|
||||
# Driver steering input torque
|
||||
def _eps_01_msg(self, torque):
|
||||
to_send = make_msg(0, MSG_EPS_01)
|
||||
|
@ -49,6 +101,9 @@ class TestVolkswagenMqbSafety(unittest.TestCase):
|
|||
to_send[0].RDHR = ((t & 0x1FFF) << 8)
|
||||
if torque < 0:
|
||||
to_send[0].RDHR |= 0x1 << 23
|
||||
to_send[0].RDLR |= (self.cnt_eps_01 % 16) << 8
|
||||
to_send[0].RDLR |= volkswagen_mqb_crc(to_send[0], MSG_EPS_01, 8)
|
||||
self.cnt_eps_01 += 1
|
||||
return to_send
|
||||
|
||||
# openpilot steering output torque
|
||||
|
@ -58,24 +113,36 @@ class TestVolkswagenMqbSafety(unittest.TestCase):
|
|||
to_send[0].RDLR = (t & 0xFFF) << 16
|
||||
if torque < 0:
|
||||
to_send[0].RDLR |= 0x1 << 31
|
||||
to_send[0].RDLR |= (self.cnt_hca_01 % 16) << 8
|
||||
to_send[0].RDLR |= volkswagen_mqb_crc(to_send[0], MSG_HCA_01, 8)
|
||||
self.cnt_hca_01 += 1
|
||||
return to_send
|
||||
|
||||
# ACC engagement status
|
||||
def _acc_06_msg(self, status):
|
||||
to_send = make_msg(0, MSG_ACC_06)
|
||||
to_send[0].RDHR = (status & 0x7) << 28
|
||||
to_send[0].RDLR |= (self.cnt_acc_06 % 16) << 8
|
||||
to_send[0].RDLR |= volkswagen_mqb_crc(to_send[0], MSG_ACC_06, 8)
|
||||
self.cnt_acc_06 += 1
|
||||
return to_send
|
||||
|
||||
# Driver throttle input
|
||||
def _motor_20_msg(self, gas):
|
||||
to_send = make_msg(0, MSG_MOTOR_20)
|
||||
to_send[0].RDLR = (gas & 0xFF) << 12
|
||||
to_send[0].RDLR |= (self.cnt_motor_20 % 16) << 8
|
||||
to_send[0].RDLR |= volkswagen_mqb_crc(to_send[0], MSG_MOTOR_20, 8)
|
||||
self.cnt_motor_20 += 1
|
||||
return to_send
|
||||
|
||||
# Cruise control buttons
|
||||
def _gra_acc_01_msg(self, bit):
|
||||
to_send = make_msg(2, MSG_GRA_ACC_01)
|
||||
to_send[0].RDLR = 1 << bit
|
||||
to_send[0].RDLR |= (self.cnt_gra_acc_01 % 16) << 8
|
||||
to_send[0].RDLR |= volkswagen_mqb_crc(to_send[0], MSG_GRA_ACC_01, 8)
|
||||
self.cnt_gra_acc_01 += 1
|
||||
return to_send
|
||||
|
||||
def test_spam_can_buses(self):
|
||||
|
@ -102,6 +169,48 @@ class TestVolkswagenMqbSafety(unittest.TestCase):
|
|||
self.safety.safety_rx_hook(self._acc_06_msg(1))
|
||||
self.assertFalse(self.safety.get_controls_allowed())
|
||||
|
||||
def test_sample_speed(self):
|
||||
# Stationary
|
||||
self.safety.safety_rx_hook(self._esp_19_msg(0))
|
||||
self.assertEqual(0, self.safety.get_volkswagen_moving())
|
||||
# 1 km/h, just under 0.3 m/s safety grace threshold
|
||||
self.safety.safety_rx_hook(self._esp_19_msg(1))
|
||||
self.assertEqual(0, self.safety.get_volkswagen_moving())
|
||||
# 2 km/h, just over 0.3 m/s safety grace threshold
|
||||
self.safety.safety_rx_hook(self._esp_19_msg(2))
|
||||
self.assertEqual(1, self.safety.get_volkswagen_moving())
|
||||
# 144 km/h, openpilot V_CRUISE_MAX
|
||||
self.safety.safety_rx_hook(self._esp_19_msg(144))
|
||||
self.assertEqual(1, self.safety.get_volkswagen_moving())
|
||||
|
||||
def test_prev_brake(self):
|
||||
self.assertFalse(self.safety.get_volkswagen_brake_pressed_prev())
|
||||
self.safety.safety_rx_hook(self._esp_05_msg(True))
|
||||
self.assertTrue(self.safety.get_volkswagen_brake_pressed_prev())
|
||||
|
||||
def test_disengage_on_brake(self):
|
||||
self.safety.set_controls_allowed(1)
|
||||
self.safety.safety_rx_hook(self._esp_05_msg(True))
|
||||
self.assertFalse(self.safety.get_controls_allowed())
|
||||
|
||||
def test_allow_brake_at_zero_speed(self):
|
||||
# Brake was already pressed
|
||||
self.safety.safety_rx_hook(self._esp_05_msg(True))
|
||||
self.safety.set_controls_allowed(1)
|
||||
|
||||
self.safety.safety_rx_hook(self._esp_05_msg(True))
|
||||
self.assertTrue(self.safety.get_controls_allowed())
|
||||
self.safety.safety_rx_hook(self._esp_05_msg(False)) # reset no brakes
|
||||
|
||||
def test_not_allow_brake_when_moving(self):
|
||||
# Brake was already pressed
|
||||
self.safety.safety_rx_hook(self._esp_05_msg(True))
|
||||
self.safety.safety_rx_hook(self._esp_19_msg(100))
|
||||
self.safety.set_controls_allowed(1)
|
||||
|
||||
self.safety.safety_rx_hook(self._esp_05_msg(True))
|
||||
self.assertFalse(self.safety.get_controls_allowed())
|
||||
|
||||
def test_disengage_on_gas(self):
|
||||
self.safety.safety_rx_hook(self._motor_20_msg(0))
|
||||
self.safety.set_controls_allowed(True)
|
||||
|
@ -236,6 +345,54 @@ class TestVolkswagenMqbSafety(unittest.TestCase):
|
|||
self.assertEqual(0, self.safety.get_volkswagen_torque_driver_max())
|
||||
self.assertEqual(0, self.safety.get_volkswagen_torque_driver_min())
|
||||
|
||||
def test_rx_hook(self):
|
||||
# checksum checks
|
||||
# TODO: Would be ideal to check ESP_19 as well, but it has no checksum
|
||||
# or counter, and I'm not sure if we can easily validate Panda's simple
|
||||
# temporal reception-rate check here.
|
||||
for msg in [MSG_EPS_01, MSG_ESP_05, MSG_MOTOR_20, MSG_ACC_06]:
|
||||
self.safety.set_controls_allowed(1)
|
||||
if msg == MSG_EPS_01:
|
||||
to_push = self._eps_01_msg(0)
|
||||
if msg == MSG_ESP_05:
|
||||
to_push = self._esp_05_msg(False)
|
||||
if msg == MSG_MOTOR_20:
|
||||
to_push = self._motor_20_msg(0)
|
||||
if msg == MSG_ACC_06:
|
||||
to_push = self._acc_06_msg(3)
|
||||
self.assertTrue(self.safety.safety_rx_hook(to_push))
|
||||
to_push[0].RDHR ^= 0xFF
|
||||
self.assertFalse(self.safety.safety_rx_hook(to_push))
|
||||
self.assertFalse(self.safety.get_controls_allowed())
|
||||
|
||||
# counter
|
||||
# reset wrong_counters to zero by sending valid messages
|
||||
for i in range(MAX_WRONG_COUNTERS + 1):
|
||||
self.cnt_eps_01 = 0
|
||||
self.cnt_esp_05 = 0
|
||||
self.cnt_motor_20 = 0
|
||||
self.cnt_acc_06 = 0
|
||||
if i < MAX_WRONG_COUNTERS:
|
||||
self.safety.set_controls_allowed(1)
|
||||
self.safety.safety_rx_hook(self._eps_01_msg(0))
|
||||
self.safety.safety_rx_hook(self._esp_05_msg(False))
|
||||
self.safety.safety_rx_hook(self._motor_20_msg(0))
|
||||
self.safety.safety_rx_hook(self._acc_06_msg(3))
|
||||
else:
|
||||
self.assertFalse(self.safety.safety_rx_hook(self._eps_01_msg(0)))
|
||||
self.assertFalse(self.safety.safety_rx_hook(self._esp_05_msg(False)))
|
||||
self.assertFalse(self.safety.safety_rx_hook(self._motor_20_msg(0)))
|
||||
self.assertFalse(self.safety.safety_rx_hook(self._acc_06_msg(3)))
|
||||
self.assertFalse(self.safety.get_controls_allowed())
|
||||
|
||||
# restore counters for future tests with a couple of good messages
|
||||
for i in range(2):
|
||||
self.safety.set_controls_allowed(1)
|
||||
self.safety.safety_rx_hook(self._eps_01_msg(0))
|
||||
self.safety.safety_rx_hook(self._esp_05_msg(False))
|
||||
self.safety.safety_rx_hook(self._motor_20_msg(0))
|
||||
self.safety.safety_rx_hook(self._acc_06_msg(3))
|
||||
self.assertTrue(self.safety.get_controls_allowed())
|
||||
|
||||
def test_fwd_hook(self):
|
||||
buss = list(range(0x0, 0x3))
|
||||
|
|
Loading…
Reference in New Issue