Safety model for Volkswagen PQ35/PQ46/NMS (#474)

* Align with VW Community port 0.7.4-devel

* Import of PQ35/PQ46/NMS safety work

* Add safety replay drive for PQ35/PQ46/NMS

* Unwind community port hax for upstream, bump Panda ver

* Update comment

* Reduce whitespace diff

* Reduce whitespace diff

* Don't test the drive until it's uploaded

* Propitiate MISRA

* Ixnay on the VERSION LF

* Remove VERSION LF, update safety_replay with PQ drive

* Freshen cereal ref to include fields used in VW PQ test drive

* Remove superfluous return

* Bypass Docker caching issue
master
Jason Young 2020-04-01 12:38:27 -07:00 committed by GitHub
parent 51e0a55d6d
commit 08ef92d585
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 530 additions and 7 deletions

View File

@ -1 +1 @@
v1.7.3
v1.7.4

View File

@ -37,6 +37,7 @@
#define SAFETY_GM_ASCM 18U
#define SAFETY_NOOUTPUT 19U
#define SAFETY_HONDA_BOSCH_HARNESS 20U
#define SAFETY_VOLKSWAGEN_PQ 21U
#define SAFETY_SUBARU_LEGACY 22U
uint16_t current_safety_mode = SAFETY_SILENT;
@ -203,6 +204,7 @@ const safety_hook_config safety_hook_registry[] = {
{SAFETY_SUBARU_LEGACY, &subaru_legacy_hooks},
{SAFETY_MAZDA, &mazda_hooks},
{SAFETY_VOLKSWAGEN_MQB, &volkswagen_mqb_hooks},
{SAFETY_VOLKSWAGEN_PQ, &volkswagen_pq_hooks},
{SAFETY_NOOUTPUT, &nooutput_hooks},
#ifdef ALLOW_DEBUG
{SAFETY_CADILLAC, &cadillac_hooks},

View File

@ -30,6 +30,26 @@ AddrCheckStruct volkswagen_mqb_rx_checks[] = {
};
const int VOLKSWAGEN_MQB_RX_CHECKS_LEN = sizeof(volkswagen_mqb_rx_checks) / sizeof(volkswagen_mqb_rx_checks[0]);
// Safety-relevant CAN messages for the Volkswagen PQ35/PQ46/NMS platforms
#define MSG_LENKHILFE_3 0x0D0 // RX from EPS, for steering angle and driver steering torque
#define MSG_HCA_1 0x0D2 // TX by OP, Heading Control Assist steering torque
#define MSG_MOTOR_2 0x288 // RX from ECU, for CC state and brake switch state
#define MSG_MOTOR_3 0x380 // RX from ECU, for driver throttle input
#define MSG_GRA_NEU 0x38A // TX by OP, ACC control buttons for cancel/resume
#define MSG_BREMSE_3 0x4A0 // RX from ABS, for wheel speeds
#define MSG_LDW_1 0x5BE // TX by OP, Lane line recognition and text alerts
// Transmit of GRA_Neu is allowed on bus 0 and 2 to keep compatibility with gateway and camera integration
const AddrBus VOLKSWAGEN_PQ_TX_MSGS[] = {{MSG_HCA_1, 0}, {MSG_GRA_NEU, 0}, {MSG_GRA_NEU, 2}, {MSG_LDW_1, 0}};
const int VOLKSWAGEN_PQ_TX_MSGS_LEN = sizeof(VOLKSWAGEN_PQ_TX_MSGS) / sizeof(VOLKSWAGEN_PQ_TX_MSGS[0]);
AddrCheckStruct volkswagen_pq_rx_checks[] = {
{.addr = {MSG_LENKHILFE_3}, .bus = 0, .check_checksum = true, .max_counter = 15U, .expected_timestep = 10000U},
{.addr = {MSG_MOTOR_2}, .bus = 0, .check_checksum = false, .max_counter = 0U, .expected_timestep = 20000U},
{.addr = {MSG_MOTOR_3}, .bus = 0, .check_checksum = false, .max_counter = 0U, .expected_timestep = 10000U},
{.addr = {MSG_BREMSE_3}, .bus = 0, .check_checksum = false, .max_counter = 0U, .expected_timestep = 10000U},
};
const int VOLKSWAGEN_PQ_RX_CHECKS_LEN = sizeof(volkswagen_pq_rx_checks) / sizeof(volkswagen_pq_rx_checks[0]);
struct sample_t volkswagen_torque_driver; // Last few driver torques measured
int volkswagen_rt_torque_last = 0;
@ -45,10 +65,17 @@ static uint8_t volkswagen_get_checksum(CAN_FIFOMailBox_TypeDef *to_push) {
return (uint8_t)GET_BYTE(to_push, 0);
}
static uint8_t volkswagen_get_counter(CAN_FIFOMailBox_TypeDef *to_push) {
static uint8_t volkswagen_mqb_get_counter(CAN_FIFOMailBox_TypeDef *to_push) {
// MQB message counters are consistently found at LSB 8.
return (uint8_t)GET_BYTE(to_push, 1) & 0xFU;
}
static uint8_t volkswagen_pq_get_counter(CAN_FIFOMailBox_TypeDef *to_push) {
// Few PQ messages have counters, and their offsets are inconsistent. This
// function works only for Lenkhilfe_3 at this time.
return (uint8_t)(GET_BYTE(to_push, 1) & 0xF0U) >> 4;
}
static uint8_t volkswagen_mqb_compute_crc(CAN_FIFOMailBox_TypeDef *to_push) {
int addr = GET_ADDR(to_push);
int len = GET_LEN(to_push);
@ -62,7 +89,7 @@ static uint8_t volkswagen_mqb_compute_crc(CAN_FIFOMailBox_TypeDef *to_push) {
crc = volkswagen_crc8_lut_8h2f[crc];
}
uint8_t counter = volkswagen_get_counter(to_push);
uint8_t counter = volkswagen_mqb_get_counter(to_push);
switch(addr) {
case MSG_EPS_01:
crc ^= (uint8_t[]){0xF5,0xF5,0xF5,0xF5,0xF5,0xF5,0xF5,0xF5,0xF5,0xF5,0xF5,0xF5,0xF5,0xF5,0xF5,0xF5}[counter];
@ -84,6 +111,17 @@ static uint8_t volkswagen_mqb_compute_crc(CAN_FIFOMailBox_TypeDef *to_push) {
return crc ^ 0xFFU;
}
static uint8_t volkswagen_pq_compute_checksum(CAN_FIFOMailBox_TypeDef *to_push) {
int len = GET_LEN(to_push);
uint8_t checksum = 0U;
for (int i = 1; i < len; i++) {
checksum ^= (uint8_t)GET_BYTE(to_push, i);
}
return checksum;
}
static void volkswagen_mqb_init(int16_t param) {
UNUSED(param);
@ -94,10 +132,19 @@ static void volkswagen_mqb_init(int16_t param) {
gen_crc_lookup_table(0x2F, volkswagen_crc8_lut_8h2f);
}
static void volkswagen_pq_init(int16_t param) {
UNUSED(param);
controls_allowed = false;
relay_malfunction = false;
volkswagen_torque_msg = MSG_HCA_1;
volkswagen_lane_msg = MSG_LDW_1;
}
static int volkswagen_mqb_rx_hook(CAN_FIFOMailBox_TypeDef *to_push) {
bool valid = addr_safety_check(to_push, volkswagen_mqb_rx_checks, VOLKSWAGEN_MQB_RX_CHECKS_LEN,
volkswagen_get_checksum, volkswagen_mqb_compute_crc, volkswagen_get_counter);
volkswagen_get_checksum, volkswagen_mqb_compute_crc, volkswagen_mqb_get_counter);
if (valid && (GET_BUS(to_push) == 0)) {
int addr = GET_ADDR(to_push);
@ -160,6 +207,73 @@ static int volkswagen_mqb_rx_hook(CAN_FIFOMailBox_TypeDef *to_push) {
return valid;
}
static int volkswagen_pq_rx_hook(CAN_FIFOMailBox_TypeDef *to_push) {
bool valid = addr_safety_check(to_push, volkswagen_pq_rx_checks, VOLKSWAGEN_PQ_RX_CHECKS_LEN,
volkswagen_get_checksum, volkswagen_pq_compute_checksum, volkswagen_pq_get_counter);
if (valid) {
int bus = GET_BUS(to_push);
int addr = GET_ADDR(to_push);
// Update in-motion state by sampling front wheel speeds
// Signal: Bremse_3.Radgeschw__VL_4_1 (front left)
// Signal: Bremse_3.Radgeschw__VR_4_1 (front right)
if ((bus == 0) && (addr == MSG_BREMSE_3)) {
int wheel_speed_fl = (GET_BYTE(to_push, 0) | (GET_BYTE(to_push, 1) << 8)) >> 1;
int wheel_speed_fr = (GET_BYTE(to_push, 2) | (GET_BYTE(to_push, 3) << 8)) >> 1;
// Check for average front speed in excess of 0.3m/s, 1.08km/h
// DBC speed scale 0.01: 0.3m/s = 108, sum both wheels to compare
volkswagen_moving = (wheel_speed_fl + wheel_speed_fr) > 216;
}
// Update driver input torque samples
// Signal: Lenkhilfe_3.LH3_LM (absolute torque)
// Signal: Lenkhilfe_3.LH3_LMSign (direction)
if ((bus == 0) && (addr == MSG_LENKHILFE_3)) {
int torque_driver_new = GET_BYTE(to_push, 2) | ((GET_BYTE(to_push, 3) & 0x3) << 8);
int sign = (GET_BYTE(to_push, 3) & 0x4) >> 2;
if (sign == 1) {
torque_driver_new *= -1;
}
update_sample(&volkswagen_torque_driver, torque_driver_new);
}
// Update ACC status from ECU for controls-allowed state
// Signal: Motor_2.GRA_Status
if ((bus == 0) && (addr == MSG_MOTOR_2)) {
int acc_status = (GET_BYTE(to_push, 2) & 0xC0) >> 6;
controls_allowed = ((acc_status == 1) || (acc_status == 2)) ? 1 : 0;
}
// Exit controls on rising edge of gas press
// Signal: Motor_3.Fahrpedal_Rohsignal
if ((bus == 0) && (addr == MSG_MOTOR_3)) {
int gas_pressed = (GET_BYTE(to_push, 2));
if (gas_pressed && !gas_pressed_prev) {
controls_allowed = 0;
}
gas_pressed_prev = gas_pressed;
}
// Exit controls on rising edge of brake press
// Signal: Motor_2.Bremslichtschalter
if ((bus == 0) && (addr == MSG_MOTOR_2)) {
bool brake_pressed = (GET_BYTE(to_push, 2) & 0x1);
if (brake_pressed && (!(brake_pressed_prev) || volkswagen_moving)) {
controls_allowed = 0;
}
brake_pressed_prev = brake_pressed;
}
// If there are HCA messages on bus 0 not sent by OP, there's a relay problem
if ((safety_mode_cnt > RELAY_TRNS_TIMEOUT) && (bus == 0) && (addr == MSG_HCA_1)) {
relay_malfunction = true;
}
}
return valid;
}
static bool volkswagen_steering_check(int desired_torque) {
bool violation = false;
uint32_t ts = TIM2->CNT;
@ -237,6 +351,44 @@ static int volkswagen_mqb_tx_hook(CAN_FIFOMailBox_TypeDef *to_send) {
return tx;
}
static int volkswagen_pq_tx_hook(CAN_FIFOMailBox_TypeDef *to_send) {
int addr = GET_ADDR(to_send);
int bus = GET_BUS(to_send);
int tx = 1;
if (!msg_allowed(addr, bus, VOLKSWAGEN_PQ_TX_MSGS, VOLKSWAGEN_PQ_TX_MSGS_LEN) || relay_malfunction) {
tx = 0;
}
// Safety check for HCA_1 Heading Control Assist torque
// Signal: HCA_1.LM_Offset (absolute torque)
// Signal: HCA_1.LM_Offsign (direction)
if (addr == MSG_HCA_1) {
int desired_torque = GET_BYTE(to_send, 2) | ((GET_BYTE(to_send, 3) & 0x7F) << 8);
desired_torque = desired_torque / 32; // DBC scale from PQ network to centi-Nm
int sign = (GET_BYTE(to_send, 3) & 0x80) >> 7;
if (sign == 1) {
desired_torque *= -1;
}
if (volkswagen_steering_check(desired_torque)) {
tx = 0;
}
}
// FORCE CANCEL: ensuring that only the cancel button press is sent when controls are off.
// This avoids unintended engagements while still allowing resume spam
if ((addr == MSG_GRA_NEU) && !controls_allowed) {
// disallow resume and set: bits 16 and 17
if ((GET_BYTE(to_send, 2) & 0x3) != 0) {
tx = 0;
}
}
// 1 allows the message through
return tx;
}
static int volkswagen_fwd_hook(int bus_num, CAN_FIFOMailBox_TypeDef *to_fwd) {
int addr = GET_ADDR(to_fwd);
int bus_fwd = -1;
@ -275,3 +427,14 @@ const safety_hooks volkswagen_mqb_hooks = {
.addr_check = volkswagen_mqb_rx_checks,
.addr_check_len = sizeof(volkswagen_mqb_rx_checks) / sizeof(volkswagen_mqb_rx_checks[0]),
};
// Volkswagen PQ35/PQ46/NMS platforms
const safety_hooks volkswagen_pq_hooks = {
.init = volkswagen_pq_init,
.rx = volkswagen_pq_rx_hook,
.tx = volkswagen_pq_tx_hook,
.tx_lin = nooutput_tx_lin_hook,
.fwd = volkswagen_fwd_hook,
.addr_check = volkswagen_pq_rx_checks,
.addr_check_len = sizeof(volkswagen_pq_rx_checks) / sizeof(volkswagen_pq_rx_checks[0]),
};

View File

@ -129,6 +129,7 @@ class Panda(object):
SAFETY_GM_ASCM = 18
SAFETY_NOOUTPUT = 19
SAFETY_HONDA_BOSCH_HARNESS = 20
SAFETY_VOLKSWAGEN_PQ = 21
SAFETY_SUBARU_LEGACY = 22
SERIAL_DEBUG = 0

View File

@ -0,0 +1,357 @@
#!/usr/bin/env python3
import unittest
import numpy as np
from panda import Panda
from panda.tests.safety import libpandasafety_py
from panda.tests.safety.common import StdTest, make_msg, MAX_WRONG_COUNTERS
MAX_RATE_UP = 4
MAX_RATE_DOWN = 10
MAX_STEER = 300
MAX_RT_DELTA = 75
RT_INTERVAL = 250000
DRIVER_TORQUE_ALLOWANCE = 80
DRIVER_TORQUE_FACTOR = 3
MSG_LENKHILFE_3 = 0x0D0 # RX from EPS, for steering angle and driver steering torque
MSG_HCA_1 = 0x0D2 # TX by OP, Heading Control Assist steering torque
MSG_MOTOR_2 = 0x288 # RX from ECU, for CC state and brake switch state
MSG_MOTOR_3 = 0x380 # RX from ECU, for driver throttle input
MSG_GRA_NEU = 0x38A # TX by OP, ACC control buttons for cancel/resume
MSG_BREMSE_3 = 0x4A0 # RX from ABS, for wheel speeds
MSG_LDW_1 = 0x5BE # TX by OP, Lane line recognition and text alerts
# Transmit of GRA_Neu is allowed on bus 0 and 2 to keep compatibility with gateway and camera integration
TX_MSGS = [[MSG_HCA_1, 0], [MSG_GRA_NEU, 0], [MSG_GRA_NEU, 2], [MSG_LDW_1, 0]]
def sign(a):
if a > 0:
return 1
else:
return -1
def volkswagen_pq_checksum(msg, addr, len_msg):
msg_bytes = msg.RDLR.to_bytes(4, 'little') + msg.RDHR.to_bytes(4, 'little')
msg_bytes = msg_bytes[1:len_msg]
checksum = 0
for i in msg_bytes:
checksum ^= i
return checksum
class TestVolkswagenPqSafety(unittest.TestCase):
cruise_engaged = False
brake_pressed = False
cnt_lenkhilfe_3 = 0
cnt_hca_1 = 0
@classmethod
def setUp(cls):
cls.safety = libpandasafety_py.libpandasafety
cls.safety.set_safety_hooks(Panda.SAFETY_VOLKSWAGEN_PQ, 0)
cls.safety.init_tests_volkswagen()
def _set_prev_torque(self, t):
self.safety.set_volkswagen_desired_torque_last(t)
self.safety.set_volkswagen_rt_torque_last(t)
# Wheel speeds (Bremse_3)
def _speed_msg(self, speed):
wheel_speed_scaled = int(speed / 0.01)
to_send = make_msg(0, MSG_BREMSE_3)
to_send[0].RDLR = (wheel_speed_scaled | (wheel_speed_scaled << 16)) << 1
to_send[0].RDHR = (wheel_speed_scaled | (wheel_speed_scaled << 16)) << 1
return to_send
# Brake light switch (shared message Motor_2)
def _brake_msg(self, brake):
self.__class__.brake_pressed = brake
return self._motor_2_msg()
# ACC engaged status (shared message Motor_2)
def _cruise_msg(self, cruise):
self.__class__.cruise_engaged = cruise
return self._motor_2_msg()
# Driver steering input torque
def _lenkhilfe_3_msg(self, torque):
to_send = make_msg(0, MSG_LENKHILFE_3)
t = abs(torque)
to_send[0].RDLR = ((t & 0x3FF) << 16)
if torque < 0:
to_send[0].RDLR |= 0x1 << 26
to_send[0].RDLR |= (self.cnt_lenkhilfe_3 % 16) << 12
to_send[0].RDLR |= volkswagen_pq_checksum(to_send[0], MSG_LENKHILFE_3, 8)
self.__class__.cnt_lenkhilfe_3 += 1
return to_send
# openpilot steering output torque
def _hca_1_msg(self, torque):
to_send = make_msg(0, MSG_HCA_1)
t = abs(torque) << 5 # DBC scale from centi-Nm to PQ network (approximated)
to_send[0].RDLR = (t & 0x7FFF) << 16
if torque < 0:
to_send[0].RDLR |= 0x1 << 31
to_send[0].RDLR |= (self.cnt_hca_1 % 16) << 8
to_send[0].RDLR |= volkswagen_pq_checksum(to_send[0], MSG_HCA_1, 8)
self.__class__.cnt_hca_1 += 1
return to_send
# ACC engagement and brake light switch status
# Called indirectly for compatibility with common.py tests
def _motor_2_msg(self):
to_send = make_msg(0, MSG_MOTOR_2)
to_send[0].RDLR = (0x1 << 16) if self.__class__.brake_pressed else 0
to_send[0].RDLR |= (self.__class__.cruise_engaged & 0x3) << 22
return to_send
# Driver throttle input
def _motor_3_msg(self, gas):
to_send = make_msg(0, MSG_MOTOR_3)
to_send[0].RDLR = (gas & 0xFF) << 16
return to_send
# Cruise control buttons
def _gra_neu_msg(self, bit):
to_send = make_msg(2, MSG_GRA_NEU)
to_send[0].RDLR = 1 << bit
to_send[0].RDLR |= volkswagen_pq_checksum(to_send[0], MSG_GRA_NEU, 8)
return to_send
def test_spam_can_buses(self):
StdTest.test_spam_can_buses(self, TX_MSGS)
def test_relay_malfunction(self):
StdTest.test_relay_malfunction(self, MSG_HCA_1)
def test_prev_gas(self):
for g in range(0, 256):
self.safety.safety_rx_hook(self._motor_3_msg(g))
self.assertEqual(True if g > 0 else False, self.safety.get_gas_pressed_prev())
def test_default_controls_not_allowed(self):
self.assertFalse(self.safety.get_controls_allowed())
def test_enable_control_allowed_from_cruise(self):
self.safety.safety_rx_hook(self._brake_msg(False))
self.safety.set_controls_allowed(0)
self.safety.safety_rx_hook(self._cruise_msg(True))
self.assertTrue(self.safety.get_controls_allowed())
def test_disable_control_allowed_from_cruise(self):
self.safety.safety_rx_hook(self._brake_msg(False))
self.safety.set_controls_allowed(1)
self.safety.safety_rx_hook(self._cruise_msg(False))
self.assertFalse(self.safety.get_controls_allowed())
def test_sample_speed(self):
# Stationary
self.safety.safety_rx_hook(self._speed_msg(0))
self.assertEqual(0, self.safety.get_volkswagen_moving())
# 1 km/h, just under 0.3 m/s safety grace threshold
self.safety.safety_rx_hook(self._speed_msg(1))
self.assertEqual(0, self.safety.get_volkswagen_moving())
# 2 km/h, just over 0.3 m/s safety grace threshold
self.safety.safety_rx_hook(self._speed_msg(2))
self.assertEqual(1, self.safety.get_volkswagen_moving())
# 144 km/h, openpilot V_CRUISE_MAX
self.safety.safety_rx_hook(self._speed_msg(144))
self.assertEqual(1, self.safety.get_volkswagen_moving())
def test_prev_brake(self):
self.assertFalse(self.safety.get_brake_pressed_prev())
self.safety.safety_rx_hook(self._brake_msg(True))
self.assertTrue(self.safety.get_brake_pressed_prev())
self.safety.safety_rx_hook(self._brake_msg(False))
def test_brake_disengage(self):
self.__class__.cruise_engaged = True # Hack due to brake and ACC signals being in the same message
StdTest.test_allow_brake_at_zero_speed(self)
StdTest.test_not_allow_brake_when_moving(self, 1)
def test_disengage_on_gas(self):
self.safety.safety_rx_hook(self._motor_3_msg(0))
self.safety.set_controls_allowed(True)
self.safety.safety_rx_hook(self._motor_3_msg(1))
self.assertFalse(self.safety.get_controls_allowed())
def test_allow_engage_with_gas_pressed(self):
self.safety.safety_rx_hook(self._motor_3_msg(1))
self.safety.set_controls_allowed(True)
self.safety.safety_rx_hook(self._motor_3_msg(1))
self.assertTrue(self.safety.get_controls_allowed())
self.safety.safety_rx_hook(self._motor_3_msg(1))
self.assertTrue(self.safety.get_controls_allowed())
def test_steer_safety_check(self):
for enabled in [0, 1]:
for t in range(-500, 500):
self.safety.set_controls_allowed(enabled)
self._set_prev_torque(t)
if abs(t) > MAX_STEER or (not enabled and abs(t) > 0):
self.assertFalse(self.safety.safety_tx_hook(self._hca_1_msg(t)))
else:
self.assertTrue(self.safety.safety_tx_hook(self._hca_1_msg(t)))
def test_manually_enable_controls_allowed(self):
StdTest.test_manually_enable_controls_allowed(self)
def test_spam_cancel_safety_check(self):
BIT_CANCEL = 9
BIT_SET = 16
BIT_RESUME = 17
self.safety.set_controls_allowed(0)
self.assertTrue(self.safety.safety_tx_hook(self._gra_neu_msg(BIT_CANCEL)))
self.assertFalse(self.safety.safety_tx_hook(self._gra_neu_msg(BIT_RESUME)))
self.assertFalse(self.safety.safety_tx_hook(self._gra_neu_msg(BIT_SET)))
# do not block resume if we are engaged already
self.safety.set_controls_allowed(1)
self.assertTrue(self.safety.safety_tx_hook(self._gra_neu_msg(BIT_RESUME)))
def test_non_realtime_limit_up(self):
self.safety.set_volkswagen_torque_driver(0, 0)
self.safety.set_controls_allowed(True)
self._set_prev_torque(0)
self.assertTrue(self.safety.safety_tx_hook(self._hca_1_msg(MAX_RATE_UP)))
self._set_prev_torque(0)
self.assertTrue(self.safety.safety_tx_hook(self._hca_1_msg(-MAX_RATE_UP)))
self._set_prev_torque(0)
self.assertFalse(self.safety.safety_tx_hook(self._hca_1_msg(MAX_RATE_UP + 1)))
self.safety.set_controls_allowed(True)
self._set_prev_torque(0)
self.assertFalse(self.safety.safety_tx_hook(self._hca_1_msg(-MAX_RATE_UP - 1)))
def test_non_realtime_limit_down(self):
self.safety.set_volkswagen_torque_driver(0, 0)
self.safety.set_controls_allowed(True)
def test_against_torque_driver(self):
self.safety.set_controls_allowed(True)
for sign in [-1, 1]:
for t in np.arange(0, DRIVER_TORQUE_ALLOWANCE + 1, 1):
t *= -sign
self.safety.set_volkswagen_torque_driver(t, t)
self._set_prev_torque(MAX_STEER * sign)
self.assertTrue(self.safety.safety_tx_hook(self._hca_1_msg(MAX_STEER * sign)))
self.safety.set_volkswagen_torque_driver(DRIVER_TORQUE_ALLOWANCE + 1, DRIVER_TORQUE_ALLOWANCE + 1)
self.assertFalse(self.safety.safety_tx_hook(self._hca_1_msg(-MAX_STEER)))
# spot check some individual cases
for sign in [-1, 1]:
driver_torque = (DRIVER_TORQUE_ALLOWANCE + 10) * sign
torque_desired = (MAX_STEER - 10 * DRIVER_TORQUE_FACTOR) * sign
delta = 1 * sign
self._set_prev_torque(torque_desired)
self.safety.set_volkswagen_torque_driver(-driver_torque, -driver_torque)
self.assertTrue(self.safety.safety_tx_hook(self._hca_1_msg(torque_desired)))
self._set_prev_torque(torque_desired + delta)
self.safety.set_volkswagen_torque_driver(-driver_torque, -driver_torque)
self.assertFalse(self.safety.safety_tx_hook(self._hca_1_msg(torque_desired + delta)))
self._set_prev_torque(MAX_STEER * sign)
self.safety.set_volkswagen_torque_driver(-MAX_STEER * sign, -MAX_STEER * sign)
self.assertTrue(self.safety.safety_tx_hook(self._hca_1_msg((MAX_STEER - MAX_RATE_DOWN) * sign)))
self._set_prev_torque(MAX_STEER * sign)
self.safety.set_volkswagen_torque_driver(-MAX_STEER * sign, -MAX_STEER * sign)
self.assertTrue(self.safety.safety_tx_hook(self._hca_1_msg(0)))
self._set_prev_torque(MAX_STEER * sign)
self.safety.set_volkswagen_torque_driver(-MAX_STEER * sign, -MAX_STEER * sign)
self.assertFalse(self.safety.safety_tx_hook(self._hca_1_msg((MAX_STEER - MAX_RATE_DOWN + 1) * sign)))
def test_realtime_limits(self):
self.safety.set_controls_allowed(True)
for sign in [-1, 1]:
self.safety.init_tests_volkswagen()
self._set_prev_torque(0)
self.safety.set_volkswagen_torque_driver(0, 0)
for t in np.arange(0, MAX_RT_DELTA, 1):
t *= sign
self.assertTrue(self.safety.safety_tx_hook(self._hca_1_msg(t)))
self.assertFalse(self.safety.safety_tx_hook(self._hca_1_msg(sign * (MAX_RT_DELTA + 1))))
self._set_prev_torque(0)
for t in np.arange(0, MAX_RT_DELTA, 1):
t *= sign
self.assertTrue(self.safety.safety_tx_hook(self._hca_1_msg(t)))
# Increase timer to update rt_torque_last
self.safety.set_timer(RT_INTERVAL + 1)
self.assertTrue(self.safety.safety_tx_hook(self._hca_1_msg(sign * (MAX_RT_DELTA - 1))))
self.assertTrue(self.safety.safety_tx_hook(self._hca_1_msg(sign * (MAX_RT_DELTA + 1))))
def test_torque_measurements(self):
self.safety.safety_rx_hook(self._lenkhilfe_3_msg(50))
self.safety.safety_rx_hook(self._lenkhilfe_3_msg(-50))
self.safety.safety_rx_hook(self._lenkhilfe_3_msg(0))
self.safety.safety_rx_hook(self._lenkhilfe_3_msg(0))
self.safety.safety_rx_hook(self._lenkhilfe_3_msg(0))
self.safety.safety_rx_hook(self._lenkhilfe_3_msg(0))
self.assertEqual(-50, self.safety.get_volkswagen_torque_driver_min())
self.assertEqual(50, self.safety.get_volkswagen_torque_driver_max())
self.safety.safety_rx_hook(self._lenkhilfe_3_msg(0))
self.assertEqual(0, self.safety.get_volkswagen_torque_driver_max())
self.assertEqual(-50, self.safety.get_volkswagen_torque_driver_min())
self.safety.safety_rx_hook(self._lenkhilfe_3_msg(0))
self.assertEqual(0, self.safety.get_volkswagen_torque_driver_max())
self.assertEqual(0, self.safety.get_volkswagen_torque_driver_min())
def test_rx_hook(self):
# checksum checks
# TODO: Would be ideal to check non-checksum non-counter messages as well,
# but I'm not sure if we can easily validate Panda's simple temporal
# reception-rate check here.
for msg in [MSG_LENKHILFE_3]:
self.safety.set_controls_allowed(1)
if msg == MSG_LENKHILFE_3:
to_push = self._lenkhilfe_3_msg(0)
self.assertTrue(self.safety.safety_rx_hook(to_push))
to_push[0].RDHR ^= 0xFF
self.assertFalse(self.safety.safety_rx_hook(to_push))
self.assertFalse(self.safety.get_controls_allowed())
# counter
# reset wrong_counters to zero by sending valid messages
for i in range(MAX_WRONG_COUNTERS + 1):
self.__class__.cnt_lenkhilfe_3 += 1
if i < MAX_WRONG_COUNTERS:
self.safety.set_controls_allowed(1)
self.safety.safety_rx_hook(self._lenkhilfe_3_msg(0))
else:
self.assertFalse(self.safety.safety_rx_hook(self._lenkhilfe_3_msg(0)))
self.assertFalse(self.safety.get_controls_allowed())
# restore counters for future tests with a couple of good messages
for i in range(2):
self.safety.set_controls_allowed(1)
self.safety.safety_rx_hook(self._lenkhilfe_3_msg(0))
self.assertTrue(self.safety.get_controls_allowed())
def test_fwd_hook(self):
buss = list(range(0x0, 0x3))
msgs = list(range(0x1, 0x800))
blocked_msgs_0to2 = []
blocked_msgs_2to0 = [MSG_HCA_1, MSG_LDW_1]
for b in buss:
for m in msgs:
if b == 0:
fwd_bus = -1 if m in blocked_msgs_0to2 else 2
elif b == 1:
fwd_bus = -1
elif b == 2:
fwd_bus = -1 if m in blocked_msgs_2to0 else 0
# assume len 8
self.assertEqual(fwd_bus, self.safety.safety_fwd_hook(b, make_msg(b, m, 8)))
if __name__ == "__main__":
unittest.main()

View File

@ -25,8 +25,7 @@ RUN mkdir /openpilot
WORKDIR /openpilot
RUN git clone https://github.com/commaai/cereal.git || true
WORKDIR /openpilot/cereal
RUN git pull
RUN git checkout bb2cc7572de99becce1bfbae63f3b38d5464e162
RUN git pull && git checkout 35040fe6bb9ebc31d38e98faa64d5ec4093ce3d5
COPY . /openpilot/panda
WORKDIR /openpilot/panda/tests/safety_replay

View File

@ -18,7 +18,8 @@ logs = [
("02ec6bea180a4d36|2019-04-17--11-21-35.bz2", Panda.SAFETY_HYUNDAI, 0), # HYUNDAI.SANTA_FE
("6fb4948a7ebe670e|2019-11-12--00-35-53.bz2", Panda.SAFETY_CHRYSLER, 0), # CHRYSLER.PACIFICA_2018_HYBRID
("791340bc01ed993d|2019-04-08--10-26-00.bz2", Panda.SAFETY_SUBARU, 0), # SUBARU.IMPREZA
("76b83eb0245de90e|2020-03-05--19-16-05.bz2", Panda.SAFETY_VOLKSWAGEN_MQB, 0), # VOLKSWAGEN.GOLF
("76b83eb0245de90e|2020-03-05--19-16-05.bz2", Panda.SAFETY_VOLKSWAGEN_MQB, 0), # VOLKSWAGEN.GOLF (MK7)
("d12cd943127f267b|2020-03-27--15-57-18.bz2", Panda.SAFETY_VOLKSWAGEN_PQ, 0), # 2009 VW Passat R36 (B6), supporting OP port not yet upstreamed
("fbbfa6af821552b9|2020-03-03--08-09-43.bz2", Panda.SAFETY_NISSAN, 0), # NISSAN.XTRAIL
]