Simplify longitudinal tests (#21337)

* first pass

* passes!

* little more cleanup

* little more

* fix sim

* remove more plot stuff

* fix crash check

* fcw

Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com>
pull/21403/head
HaraldSchafer 2021-06-24 23:03:18 -05:00 committed by GitHub
parent fb2eec03e1
commit 234971203b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 136 additions and 902 deletions

View File

@ -1,2 +0,0 @@
#!/bin/sh
docker run -v $(pwd)/docker_out:/tmp/openpilot/selfdrive/test/out openpilot_ci /bin/bash -c "rm -Rf /tmp/openpilot/selfdrive/test/out/longitudinal"

View File

@ -1,7 +1,5 @@
from collections import defaultdict
from selfdrive.test.longitudinal_maneuvers.maneuverplots import ManeuverPlot
from selfdrive.test.longitudinal_maneuvers.plant import Plant
import numpy as np
from selfdrive.test.longitudinal_maneuvers.plant import Plant
class Maneuver():
@ -11,75 +9,32 @@ class Maneuver():
self.speed = kwargs.get("initial_speed", 0.0)
self.lead_relevancy = kwargs.get("lead_relevancy", 0)
self.grade_values = kwargs.get("grade_values", [0.0, 0.0])
self.grade_breakpoints = kwargs.get("grade_breakpoints", [0.0, duration])
self.speed_lead_values = kwargs.get("speed_lead_values", [0.0, 0.0])
self.speed_lead_breakpoints = kwargs.get("speed_lead_breakpoints", [0.0, duration])
self.cruise_button_presses = kwargs.get("cruise_button_presses", [])
self.checks = kwargs.get("checks", [])
self.duration = duration
self.title = title
def evaluate(self):
"""runs the plant sim and returns (score, run_data)"""
plant = Plant(
lead_relevancy=self.lead_relevancy,
speed=self.speed,
distance_lead=self.distance_lead
)
logs = defaultdict(list)
last_controls_state = None
plot = ManeuverPlot(self.title)
buttons_sorted = sorted(self.cruise_button_presses, key=lambda a: a[1])
current_button = 0
valid = True
while plant.current_time() < self.duration:
while buttons_sorted and plant.current_time() >= buttons_sorted[0][1]:
current_button = buttons_sorted[0][0]
buttons_sorted = buttons_sorted[1:]
print("current button changed to {0}".format(current_button))
grade = np.interp(plant.current_time(), self.grade_breakpoints, self.grade_values)
speed_lead = np.interp(plant.current_time(), self.speed_lead_breakpoints, self.speed_lead_values)
# distance, speed, acceleration, distance_lead, brake, gas, steer_torque, fcw, controls_state= plant.step(speed_lead, current_button, grade)
log = plant.step(speed_lead, current_button, grade)
if log['controls_state_msgs']:
last_controls_state = log['controls_state_msgs'][-1]
log = plant.step(speed_lead)
d_rel = log['distance_lead'] - log['distance'] if self.lead_relevancy else 200.
v_rel = speed_lead - log['speed'] if self.lead_relevancy else 0.
log['d_rel'] = d_rel
log['v_rel'] = v_rel
if last_controls_state:
# print(last_controls_state)
#develop plots
plot.add_data(
time=plant.current_time(),
gas=log['gas'], brake=log['brake'], steer_torque=log['steer_torque'],
distance=log['distance'], speed=log['speed'], acceleration=log['acceleration'],
up_accel_cmd=last_controls_state.upAccelCmd, ui_accel_cmd=last_controls_state.uiAccelCmd,
uf_accel_cmd=last_controls_state.ufAccelCmd,
d_rel=d_rel, v_rel=v_rel, v_lead=speed_lead,
v_target_lead=last_controls_state.vTargetLead, pid_speed=last_controls_state.vPid,
cruise_speed=last_controls_state.vCruise,
a_target=last_controls_state.aTarget,
fcw=log['fcw'])
for k, v in log.items():
logs[k].append(v)
valid = True
for check in self.checks:
c = check(logs)
if not c:
print(check.__name__ + " not valid!")
valid = valid and c
if d_rel < 1.0:
print("Crashed!!!!")
valid = False
print("maneuver end", valid)
return (plot, valid)
return valid

View File

@ -1,139 +0,0 @@
import os
import numpy as np
import matplotlib.pyplot as plt
import pylab
from selfdrive.config import Conversions as CV
class ManeuverPlot():
def __init__(self, title=None):
self.time_array = []
self.gas_array = []
self.brake_array = []
self.steer_torque_array = []
self.distance_array = []
self.speed_array = []
self.acceleration_array = []
self.up_accel_cmd_array = []
self.ui_accel_cmd_array = []
self.uf_accel_cmd_array = []
self.d_rel_array = []
self.v_rel_array = []
self.v_lead_array = []
self.v_target_lead_array = []
self.pid_speed_array = []
self.cruise_speed_array = []
self.a_target_array = []
self.v_target_array = []
self.fcw_array = []
self.title = title
def add_data(self, time, gas, brake, steer_torque, distance, speed,
acceleration, up_accel_cmd, ui_accel_cmd, uf_accel_cmd, d_rel, v_rel,
v_lead, v_target_lead, pid_speed, cruise_speed, a_target, fcw):
self.time_array.append(time)
self.gas_array.append(gas)
self.brake_array.append(brake)
self.steer_torque_array.append(steer_torque)
self.distance_array.append(distance)
self.speed_array.append(speed)
self.acceleration_array.append(acceleration)
self.up_accel_cmd_array.append(up_accel_cmd)
self.ui_accel_cmd_array.append(ui_accel_cmd)
self.uf_accel_cmd_array.append(uf_accel_cmd)
self.d_rel_array.append(d_rel)
self.v_rel_array.append(v_rel)
self.v_lead_array.append(v_lead)
self.v_target_lead_array.append(v_target_lead)
self.pid_speed_array.append(pid_speed)
self.cruise_speed_array.append(cruise_speed)
self.a_target_array.append(a_target)
self.fcw_array.append(fcw)
def write_plot(self, path, maneuver_name):
# title = self.title or maneuver_name
# TODO: Missing plots from the old one:
# long_control_state
# proportional_gb, intergral_gb
if not os.path.exists(path + "/" + maneuver_name):
os.makedirs(path + "/" + maneuver_name)
plt_num = 0
# speed chart ===================
plt_num += 1
plt.figure(plt_num)
plt.plot(
np.array(self.time_array), np.array(self.speed_array) * CV.MS_TO_MPH, 'r',
np.array(self.time_array), np.array(self.pid_speed_array) * CV.MS_TO_MPH, 'y--',
np.array(self.time_array), np.array(self.v_target_lead_array) * CV.MS_TO_MPH, 'b',
np.array(self.time_array), np.array(self.cruise_speed_array) * CV.KPH_TO_MPH, 'k',
np.array(self.time_array), np.array(self.v_lead_array) * CV.MS_TO_MPH, 'm'
)
plt.xlabel('Time [s]')
plt.ylabel('Speed [mph]')
plt.legend(['speed', 'pid speed', 'Target (lead) speed', 'Cruise speed', 'Lead speed'], loc=0)
plt.grid()
pylab.savefig("/".join([path, maneuver_name, 'speeds.svg']), dpi=1000)
# acceleration chart ============
plt_num += 1
plt.figure(plt_num)
plt.plot(
np.array(self.time_array), np.array(self.acceleration_array), 'g',
np.array(self.time_array), np.array(self.a_target_array), 'k--',
np.array(self.time_array), np.array(self.fcw_array), 'ro',
)
plt.xlabel('Time [s]')
plt.ylabel('Acceleration [m/s^2]')
plt.legend(['ego-plant', 'target', 'fcw'], loc=0)
plt.grid()
pylab.savefig("/".join([path, maneuver_name, 'acceleration.svg']), dpi=1000)
# pedal chart ===================
plt_num += 1
plt.figure(plt_num)
plt.plot(
np.array(self.time_array), np.array(self.gas_array), 'g',
np.array(self.time_array), np.array(self.brake_array), 'r',
)
plt.xlabel('Time [s]')
plt.ylabel('Pedal []')
plt.legend(['Gas pedal', 'Brake pedal'], loc=0)
plt.grid()
pylab.savefig("/".join([path, maneuver_name, 'pedals.svg']), dpi=1000)
# pid chart ======================
plt_num += 1
plt.figure(plt_num)
plt.plot(
np.array(self.time_array), np.array(self.up_accel_cmd_array), 'g',
np.array(self.time_array), np.array(self.ui_accel_cmd_array), 'b',
np.array(self.time_array), np.array(self.uf_accel_cmd_array), 'r'
)
plt.xlabel("Time, [s]")
plt.ylabel("Accel Cmd [m/s^2]")
plt.grid()
plt.legend(["Proportional", "Integral", "feedforward"], loc=0)
pylab.savefig("/".join([path, maneuver_name, "pid.svg"]), dpi=1000)
# relative distances chart =======
plt_num += 1
plt.figure(plt_num)
plt.plot(
np.array(self.time_array), np.array(self.d_rel_array), 'g',
)
plt.xlabel('Time [s]')
plt.ylabel('Relative Distance [m]')
plt.grid()
pylab.savefig("/".join([path, maneuver_name, 'distance.svg']), dpi=1000)
plt.close("all")

View File

@ -1,467 +1,137 @@
#!/usr/bin/env python3
import binascii
import os
import struct
import time
from collections import namedtuple
import numpy as np
from opendbc import DBC_PATH
from cereal import car, log
from common.realtime import Ratekeeper
from selfdrive.config import Conversions as CV
from cereal import log
import cereal.messaging as messaging
from selfdrive.car import crc8_pedal
from selfdrive.car.honda.values import CAR
from selfdrive.car.honda.carstate import get_can_signals
from selfdrive.boardd.boardd import can_list_to_can_capnp
from common.realtime import Ratekeeper, DT_MDL
from selfdrive.controls.lib.longcontrol import LongCtrlState
from opendbc.can.parser import CANParser
from selfdrive.car.honda.interface import CarInterface
from opendbc.can.dbc import dbc
honda = dbc(os.path.join(DBC_PATH, "honda_civic_touring_2016_can_generated.dbc"))
# Trick: set 0x201 (interceptor) in fingerprints for gas is controlled like if there was an interceptor
CP = CarInterface.get_params(CAR.CIVIC, {0: {0x201: 6}, 1: {}, 2: {}, 3: {}})
# Honda checksum
def can_cksum(mm):
s = 0
for c in mm:
s += (c >> 4)
s += c & 0xF
s = 8-s
s %= 0x10
return s
def fix(msg, addr):
msg2 = msg[0:-1] + (msg[-1] | can_cksum(struct.pack("I", addr)+msg)).to_bytes(1, 'little')
return msg2
def car_plant(pos, speed, grade, gas, brake):
# vehicle parameters
mass = 1700
aero_cd = 0.3
force_peak = mass*3.
force_brake_peak = -mass*10. # 1g
power_peak = 100000 # 100kW
speed_base = power_peak/force_peak
rolling_res = 0.01
gravity = 9.81
frontal_area = 2.2
air_density = 1.225
gas_to_peak_linear_slope = 3.33
brake_to_peak_linear_slope = 0.3
creep_accel_v = [1., 0.]
creep_accel_bp = [0., 1.5]
#*** longitudinal model ***
# find speed where peak torque meets peak power
force_brake = brake * force_brake_peak * brake_to_peak_linear_slope
if speed < speed_base: # torque control
force_gas = gas * force_peak * gas_to_peak_linear_slope
else: # power control
force_gas = gas * power_peak / speed * gas_to_peak_linear_slope
force_grade = - np.sin(np.arctan(grade)) * mass * gravity
creep_accel = np.interp(speed, creep_accel_bp, creep_accel_v)
force_creep = creep_accel * mass
force_resistance = -(rolling_res * mass * gravity + 0.5 * speed**2 * aero_cd * air_density * frontal_area)
force = force_gas + force_brake + force_resistance + force_grade + force_creep
acceleration = force / mass
# TODO: lateral model
return speed, acceleration
def get_car_can_parser():
dbc_f = 'honda_civic_touring_2016_can_generated'
signals = [
("STEER_TORQUE", 0xe4, 0),
("STEER_TORQUE_REQUEST", 0xe4, 0),
("COMPUTER_BRAKE", 0x1fa, 0),
("COMPUTER_BRAKE_REQUEST", 0x1fa, 0),
("GAS_COMMAND", 0x200, 0),
]
checks = [
(0xe4, 100),
(0x1fa, 50),
(0x200, 50),
]
return CANParser(dbc_f, signals, checks, 0)
def to_3_byte(x):
# Convert into 12 bit value
s = struct.pack("!H", int(x))
return binascii.hexlify(s)[1:]
def to_3s_byte(x):
s = struct.pack("!h", int(x))
return binascii.hexlify(s)[1:]
class Plant():
messaging_initialized = False
def __init__(self, lead_relevancy=False, rate=100, speed=0.0, distance_lead=2.0):
self.rate = rate
def __init__(self, lead_relevancy=False, speed=0.0, distance_lead=2.0):
self.rate = 1. / DT_MDL
if not Plant.messaging_initialized:
Plant.pm = messaging.PubMaster(['roadCameraState', 'driverCameraState', 'ubloxRaw'])
Plant.logcan = messaging.pub_sock('can')
Plant.sendcan = messaging.sub_sock('sendcan')
Plant.model = messaging.pub_sock('modelV2')
Plant.live_params = messaging.pub_sock('liveParameters')
Plant.live_location_kalman = messaging.pub_sock('liveLocationKalman')
Plant.pandaState = messaging.pub_sock('pandaState')
Plant.deviceState = messaging.pub_sock('deviceState')
Plant.driverMonitoringState = messaging.pub_sock('driverMonitoringState')
Plant.cal = messaging.pub_sock('liveCalibration')
Plant.controls_state = messaging.sub_sock('controlsState')
Plant.radar = messaging.pub_sock('radarState')
Plant.controls_state = messaging.pub_sock('controlsState')
Plant.car_state = messaging.pub_sock('carState')
Plant.plan = messaging.sub_sock('longitudinalPlan')
Plant.messaging_initialized = True
self.frame = 0
self.angle_steer = 0.
self.gear_choice = 0
self.speed, self.speed_prev = 0., 0.
self.v_lead_prev = 0.0
self.esp_disabled = 0
self.main_on = 1
self.user_gas = 0
self.computer_brake, self.user_brake = 0, 0
self.brake_pressed = 0
self.angle_steer_rate = 0
self.distance, self.distance_prev = 0., 0.
self.speed, self.speed_prev = speed, speed
self.steer_error, self.brake_error, self.steer_not_allowed = 0, 0, 0
self.gear_shifter = 8 # D gear
self.pedal_gas = 0
self.cruise_setting = 0
self.seatbelt, self.door_all_closed = True, True
self.steer_torque, self.v_cruise, self.acc_status = 0, 0, 0 # v_cruise is reported from can, not the one used for controls
self.lead_relevancy = lead_relevancy
self.distance = 0.
self.speed = speed
self.acceleration = 0.0
# lead car
self.distance_lead, self.distance_lead_prev = distance_lead , distance_lead
self.rk = Ratekeeper(rate, print_delay_threshold=100.0)
self.ts = 1./rate
self.cp = get_car_can_parser()
self.response_seen = False
self.distance_lead = distance_lead
self.lead_relevancy = lead_relevancy
self.rk = Ratekeeper(self.rate, print_delay_threshold=100.0)
self.ts = 1. / self.rate
time.sleep(1)
messaging.drain_sock(Plant.sendcan)
messaging.drain_sock(Plant.controls_state)
def close(self):
Plant.logcan.close()
Plant.model.close()
Plant.live_params.close()
Plant.live_location_kalman.close()
def speed_sensor(self, speed):
if speed < 0.3:
return 0
else:
return speed * CV.MS_TO_KPH
self.sm = messaging.SubMaster(['longitudinalPlan'])
def current_time(self):
return float(self.rk.frame) / self.rate
def step(self, v_lead=0.0, cruise_buttons=None, grade=0.0, publish_model=True):
gen_signals, _ = get_can_signals(CP)
sgs = [s[0] for s in gen_signals]
msgs = [s[1] for s in gen_signals]
def step(self, v_lead=0.0):
# ******** publish a fake model going straight and fake calibration ********
# note that this is worst case for MPC, since model will delay long mpc by one time step
radar = messaging.new_message('radarState')
control = messaging.new_message('controlsState')
car_state = messaging.new_message('carState')
a_lead = (v_lead - self.v_lead_prev)/self.ts
self.v_lead_prev = v_lead
# ******** get messages sent to the car ********
can_strings = messaging.drain_sock_raw(Plant.sendcan, wait_for_one=self.response_seen)
# After the first response the car is done fingerprinting, so we can run in lockstep with controlsd
if can_strings:
self.response_seen = True
self.cp.update_strings(can_strings, sendcan=True)
# ******** get controlsState messages for plotting ***
controls_state_msgs = []
for a in messaging.drain_sock(Plant.controls_state, wait_for_one=self.response_seen):
controls_state_msgs.append(a.controlsState)
fcw = None
for a in messaging.drain_sock(Plant.plan):
if a.longitudinalPlan.fcw:
fcw = True
if self.cp.vl[0x1fa]['COMPUTER_BRAKE_REQUEST']:
brake = self.cp.vl[0x1fa]['COMPUTER_BRAKE'] * 0.003906248
else:
brake = 0.0
if self.cp.vl[0x200]['GAS_COMMAND'] > 0:
gas = self.cp.vl[0x200]['GAS_COMMAND'] / 256.0
else:
gas = 0.0
if self.cp.vl[0xe4]['STEER_TORQUE_REQUEST']:
steer_torque = self.cp.vl[0xe4]['STEER_TORQUE']*1.0/0xf00
else:
steer_torque = 0.0
distance_lead = self.distance_lead_prev + v_lead * self.ts
# ******** run the car ********
speed, acceleration = car_plant(self.distance_prev, self.speed_prev, grade, gas, brake)
distance = self.distance_prev + speed * self.ts
speed = self.speed_prev + self.ts * acceleration
if speed <= 0:
speed = 0
acceleration = 0
# ******** lateral ********
self.angle_steer -= (steer_torque/10.0) * self.ts
# *** radar model ***
if self.lead_relevancy:
d_rel = np.maximum(0., distance_lead - distance)
v_rel = v_lead - speed
d_rel = np.maximum(0., self.distance_lead - self.distance)
v_rel = v_lead - self.speed
prob = 1.0
else:
d_rel = 200.
v_rel = 0.
prob = 0.0
lead = log.RadarState.LeadData.new_message()
lead.dRel = float(d_rel)
lead.yRel = float(0.0)
lead.vRel = float(v_rel)
lead.aRel = float(a_lead - self.acceleration)
lead.vLead = float(v_lead)
lead.vLeadK = float(v_lead)
lead.aLeadK = float(a_lead)
lead.aLeadTau = float(1.5)
lead.status = True
lead.modelProb = prob
radar.radarState.leadOne = lead
radar.radarState.leadTwo = lead
control.controlsState.longControlState = LongCtrlState.pid
control.controlsState.vCruise = 130
car_state.carState.vEgo = self.speed
Plant.radar.send(radar.to_bytes())
Plant.controls_state.send(control.to_bytes())
Plant.car_state.send(car_state.to_bytes())
# ******** get controlsState messages for plotting ***
self.sm.update()
while True:
time.sleep(0.01)
if self.sm.updated['longitudinalPlan']:
plan = self.sm['longitudinalPlan']
self.acceleration = plan.aTarget
fcw = plan.fcw
break
self.speed += self.ts*self.acceleration
self.distance_lead = self.distance_lead + v_lead * self.ts
# ******** run the car ********
#print(self.distance, speed)
if self.speed <= 0:
self.speed = 0
self.acceleration = 0
self.distance = self.distance + self.speed * self.ts
# *** radar model ***
if self.lead_relevancy:
d_rel = np.maximum(0., self.distance_lead - self.distance)
v_rel = v_lead - self.speed
else:
d_rel = 200.
v_rel = 0.
lateral_pos_rel = 0.
# print at 5hz
if (self.frame % (self.rate//5)) == 0:
print("%6.2f m %6.2f m/s %6.2f m/s2 %.2f ang gas: %.2f brake: %.2f steer: %5.2f lead_rel: %6.2f m %6.2f m/s"
% (distance, speed, acceleration, self.angle_steer, gas, brake, steer_torque, d_rel, v_rel))
if (self.rk.frame % (self.rate // 5)) == 0:
print("%2.2f sec %6.2f m %6.2f m/s %6.2f m/s2 lead_rel: %6.2f m %6.2f m/s"
% (self.current_time(), self.distance, self.speed, self.acceleration, d_rel, v_rel))
# ******** publish the car ********
vls_tuple = namedtuple('vls', [
'XMISSION_SPEED',
'WHEEL_SPEED_FL', 'WHEEL_SPEED_FR', 'WHEEL_SPEED_RL', 'WHEEL_SPEED_RR',
'STEER_ANGLE', 'STEER_ANGLE_RATE', 'STEER_TORQUE_SENSOR', 'STEER_TORQUE_MOTOR',
'LEFT_BLINKER', 'RIGHT_BLINKER',
'GEAR',
'WHEELS_MOVING',
'BRAKE_ERROR_1', 'BRAKE_ERROR_2',
'SEATBELT_DRIVER_LAMP', 'SEATBELT_DRIVER_LATCHED',
'BRAKE_PRESSED', 'BRAKE_SWITCH',
'CRUISE_BUTTONS',
'ESP_DISABLED',
'HUD_LEAD',
'USER_BRAKE',
'STEER_STATUS',
'GEAR_SHIFTER',
'PEDAL_GAS',
'CRUISE_SETTING',
'ACC_STATUS',
'CRUISE_SPEED_PCM',
'CRUISE_SPEED_OFFSET',
'DOOR_OPEN_FL', 'DOOR_OPEN_FR', 'DOOR_OPEN_RL', 'DOOR_OPEN_RR',
'CAR_GAS',
'MAIN_ON',
'EPB_STATE',
'BRAKE_HOLD_ACTIVE',
'INTERCEPTOR_GAS',
'INTERCEPTOR_GAS2',
'IMPERIAL_UNIT',
'MOTOR_TORQUE',
])
vls = vls_tuple(
self.speed_sensor(speed),
self.speed_sensor(speed), self.speed_sensor(speed), self.speed_sensor(speed), self.speed_sensor(speed),
self.angle_steer, self.angle_steer_rate, 0, 0, # Steer torque sensor
0, 0, # Blinkers
self.gear_choice,
speed != 0,
self.brake_error, self.brake_error,
not self.seatbelt, self.seatbelt, # Seatbelt
self.brake_pressed, 0., # Brake pressed, Brake switch
cruise_buttons,
self.esp_disabled,
0, # HUD lead
self.user_brake,
self.steer_error,
self.gear_shifter,
self.pedal_gas,
self.cruise_setting,
self.acc_status,
self.v_cruise,
0, # Cruise speed offset
0, 0, 0, 0, # Doors
self.user_gas,
self.main_on,
0, # EPB State
0, # Brake hold
0, # Interceptor feedback
0, # Interceptor 2 feedback
False,
0,
)
# TODO: publish each message at proper frequency
can_msgs = []
for msg in set(msgs):
msg_struct = {}
indxs = [i for i, x in enumerate(msgs) if msg == msgs[i]]
for i in indxs:
msg_struct[sgs[i]] = getattr(vls, sgs[i])
if "COUNTER" in honda.get_signals(msg):
msg_struct["COUNTER"] = self.frame % 4
if "COUNTER_PEDAL" in honda.get_signals(msg):
msg_struct["COUNTER_PEDAL"] = self.frame % 0xf
msg = honda.lookup_msg_id(msg)
msg_data = honda.encode(msg, msg_struct)
if "CHECKSUM" in honda.get_signals(msg):
msg_data = fix(msg_data, msg)
if "CHECKSUM_PEDAL" in honda.get_signals(msg):
msg_struct["CHECKSUM_PEDAL"] = crc8_pedal(msg_data[:-1])
msg_data = honda.encode(msg, msg_struct)
can_msgs.append([msg, 0, msg_data, 0])
# add the radar message
# TODO: use the DBC
if self.frame % 5 == 0:
radar_state_msg = b'\x79\x00\x00\x00\x00\x00\x00\x00'
radar_msg = to_3_byte(d_rel * 16.0) + \
to_3_byte(int(lateral_pos_rel * 16.0) & 0x3ff) + \
to_3s_byte(int(v_rel * 32.0)) + \
b"0f00000"
radar_msg = binascii.unhexlify(radar_msg)
can_msgs.append([0x400, 0, radar_state_msg, 1])
can_msgs.append([0x445, 0, radar_msg, 1])
radar_messages = list(range(0x430, 0x43A)) + list(range(0x440, 0x445))
for m in radar_messages:
can_msgs.append([m, 0, b'\x00'*8, 1])
msg_struct["COUNTER"] = self.frame % 4
camera_messages = [0x1FA, 0x30C, 0xE4]
for m in camera_messages:
msg = honda.lookup_msg_id(m)
msg_data = honda.encode(msg, msg_struct)
msg_data = fix(msg_data, m)
can_msgs.append([m, 0, msg_data, 2])
# Fake sockets that controlsd subscribes to
live_parameters = messaging.new_message('liveParameters')
live_parameters.liveParameters.valid = True
live_parameters.liveParameters.sensorValid = True
live_parameters.liveParameters.posenetValid = True
live_parameters.liveParameters.steerRatio = CP.steerRatio
live_parameters.liveParameters.stiffnessFactor = 1.0
Plant.live_params.send(live_parameters.to_bytes())
dmon_state = messaging.new_message('driverMonitoringState')
dmon_state.driverMonitoringState.isDistracted = False
Plant.driverMonitoringState.send(dmon_state.to_bytes())
pandaState = messaging.new_message('pandaState')
pandaState.pandaState.safetyModel = car.CarParams.SafetyModel.hondaNidec
pandaState.pandaState.controlsAllowed = True
Plant.pandaState.send(pandaState.to_bytes())
deviceState = messaging.new_message('deviceState')
deviceState.deviceState.freeSpacePercent = 100
deviceState.deviceState.batteryPercent = 100
Plant.deviceState.send(deviceState.to_bytes())
live_location_kalman = messaging.new_message('liveLocationKalman')
live_location_kalman.liveLocationKalman.inputsOK = True
live_location_kalman.liveLocationKalman.gpsOK = True
Plant.live_location_kalman.send(live_location_kalman.to_bytes())
# ******** publish a fake model going straight and fake calibration ********
# note that this is worst case for MPC, since model will delay long mpc by one time step
if publish_model and self.frame % 5 == 0:
md = messaging.new_message('modelV2')
cal = messaging.new_message('liveCalibration')
md.modelV2.frameId = 0
if self.lead_relevancy:
d_rel = np.maximum(0., distance_lead - distance)
v_rel = v_lead - speed
prob = 1.0
else:
d_rel = 200.
v_rel = 0.
prob = 0.0
lead = log.ModelDataV2.LeadDataV2.new_message()
lead.xyva = [float(d_rel), 0.0, float(v_rel), 0.0]
lead.xyvaStd = [1.0, 1.0, 1.0, 1.0]
lead.prob = prob
md.modelV2.leads = [lead, lead]
cal.liveCalibration.calStatus = 1
cal.liveCalibration.calPerc = 100
cal.liveCalibration.rpyCalib = [0.] * 3
# fake values?
Plant.model.send(md.to_bytes())
Plant.cal.send(cal.to_bytes())
for s in Plant.pm.sock.keys():
try:
Plant.pm.send(s, messaging.new_message(s))
except Exception:
Plant.pm.send(s, messaging.new_message(s, 1))
Plant.logcan.send(can_list_to_can_capnp(can_msgs))
# ******** update prevs ********
self.frame += 1
if self.response_seen:
self.rk.monitor_time()
self.speed = speed
self.distance = distance
self.distance_lead = distance_lead
self.speed_prev = speed
self.distance_prev = distance
self.distance_lead_prev = distance_lead
else:
# Don't advance time when controlsd is not yet ready
self.rk.keep_time()
self.rk._frame = 0
self.rk.monitor_time()
return {
"distance": distance,
"speed": speed,
"acceleration": acceleration,
"distance_lead": distance_lead,
"brake": brake,
"gas": gas,
"steer_torque": steer_torque,
"distance": self.distance,
"speed": self.speed,
"acceleration": self.acceleration,
"distance_lead": self.distance_lead,
"fcw": fcw,
"controls_state_msgs": controls_state_msgs,
}
# simple engage in standalone mode
def plant_thread(rate=100):
plant = Plant(rate)
def plant_thread():
plant = Plant()
while 1:
plant.step()
if __name__ == "__main__":
plant_thread()

View File

@ -1,90 +1,21 @@
#!/usr/bin/env python3
import os
os.environ['NOCRASH'] = '1'
import unittest
import matplotlib
matplotlib.use('svg')
from selfdrive.config import Conversions as CV
from selfdrive.car.honda.values import CruiseButtons as CB
from common.params import Params
from selfdrive.test.longitudinal_maneuvers.maneuver import Maneuver
from selfdrive.manager.process_config import managed_processes
from common.file_helpers import mkdirs_exists_ok
from common.params import Params
def check_no_collision(log):
return min(log['d_rel']) > 0
def check_fcw(log):
return any(log['fcw'])
def check_engaged(log):
return log['controls_state_msgs'][-1][-1].active
def put_default_car_params():
from selfdrive.car.honda.values import CAR
from selfdrive.car.honda.interface import CarInterface
cp = CarInterface.get_params(CAR.CIVIC)
Params().put("CarParams", cp.to_bytes())
# TODO: make new FCW tests
maneuvers = [
Maneuver(
'while cruising at 40 mph, change cruise speed to 50mph',
duration=30.,
initial_speed=40. * CV.MPH_TO_MS,
cruise_button_presses=[(CB.DECEL_SET, 2.), (0, 2.3),
(CB.RES_ACCEL, 10.), (0, 10.1),
(CB.RES_ACCEL, 10.2), (0, 10.3)],
checks=[check_engaged],
),
Maneuver(
'while cruising at 60 mph, change cruise speed to 50mph',
duration=30.,
initial_speed=60. * CV.MPH_TO_MS,
cruise_button_presses=[(CB.DECEL_SET, 2.), (0, 2.3),
(CB.DECEL_SET, 10.), (0, 10.1),
(CB.DECEL_SET, 10.2), (0, 10.3)],
checks=[check_engaged],
),
Maneuver(
'while cruising at 20mph, uphill grade of 10%',
duration=25.,
initial_speed=20. * CV.MPH_TO_MS,
cruise_button_presses=[(CB.DECEL_SET, 1.2), (0, 1.3)],
grade_values=[0., 0., .1],
grade_breakpoints=[0., 10., 11.],
checks=[check_engaged],
),
Maneuver(
'while cruising at 20mph, downhill grade of -10%',
duration=25.,
initial_speed=20. * CV.MPH_TO_MS,
cruise_button_presses=[(CB.DECEL_SET, 1.2), (0, 1.3)],
grade_values=[0., 0., -.1],
grade_breakpoints=[0., 10., 11.],
checks=[check_engaged],
),
Maneuver(
'approaching a 40mph car while cruising at 60mph from 100m away',
duration=30.,
initial_speed=60. * CV.MPH_TO_MS,
lead_relevancy=True,
initial_distance_lead=100.,
speed_lead_values=[40. * CV.MPH_TO_MS, 40. * CV.MPH_TO_MS],
speed_lead_breakpoints=[0., 100.],
cruise_button_presses=[(CB.DECEL_SET, 1.2), (0, 1.3)],
checks=[check_engaged, check_no_collision],
),
Maneuver(
'approaching a 0mph car while cruising at 40mph from 150m away',
duration=30.,
initial_speed=40. * CV.MPH_TO_MS,
lead_relevancy=True,
initial_distance_lead=150.,
speed_lead_values=[0. * CV.MPH_TO_MS, 0. * CV.MPH_TO_MS],
speed_lead_breakpoints=[0., 100.],
cruise_button_presses=[(CB.DECEL_SET, 1.2), (0, 1.3)],
checks=[check_engaged, check_no_collision],
),
Maneuver(
'steady state following a car at 20m/s, then lead decel to 0mph at 1m/s^2',
duration=50.,
@ -93,8 +24,6 @@ maneuvers = [
initial_distance_lead=35.,
speed_lead_values=[20., 20., 0.],
speed_lead_breakpoints=[0., 15., 35.0],
cruise_button_presses=[(CB.DECEL_SET, 1.2), (0, 1.3)],
checks=[check_engaged, check_no_collision],
),
Maneuver(
'steady state following a car at 20m/s, then lead decel to 0mph at 2m/s^2',
@ -104,8 +33,6 @@ maneuvers = [
initial_distance_lead=35.,
speed_lead_values=[20., 20., 0.],
speed_lead_breakpoints=[0., 15., 25.0],
cruise_button_presses=[(CB.DECEL_SET, 1.2), (0, 1.3)],
checks=[check_engaged, check_no_collision],
),
Maneuver(
'steady state following a car at 20m/s, then lead decel to 0mph at 3m/s^2',
@ -115,8 +42,6 @@ maneuvers = [
initial_distance_lead=35.,
speed_lead_values=[20., 20., 0.],
speed_lead_breakpoints=[0., 15., 21.66],
cruise_button_presses=[(CB.DECEL_SET, 1.2), (0, 1.3)],
checks=[check_engaged, check_fcw],
),
Maneuver(
'steady state following a car at 20m/s, then lead decel to 0mph at 5m/s^2',
@ -126,195 +51,19 @@ maneuvers = [
initial_distance_lead=35.,
speed_lead_values=[20., 20., 0.],
speed_lead_breakpoints=[0., 15., 19.],
cruise_button_presses=[(CB.DECEL_SET, 1.2), (0, 1.3)],
checks=[check_engaged, check_fcw],
),
Maneuver(
'starting at 0mph, approaching a stopped car 100m away',
"approach stopped car at 20m/s",
duration=30.,
initial_speed=0.,
initial_speed=20.,
lead_relevancy=True,
initial_distance_lead=100.,
cruise_button_presses=[(CB.DECEL_SET, 1.2), (0, 1.3),
(CB.RES_ACCEL, 1.4), (0.0, 1.5),
(CB.RES_ACCEL, 1.6), (0.0, 1.7),
(CB.RES_ACCEL, 1.8), (0.0, 1.9)],
checks=[check_engaged, check_no_collision],
),
Maneuver(
"following a car at 60mph, lead accel and decel at 0.5m/s^2 every 2s",
duration=25.,
initial_speed=30.,
lead_relevancy=True,
initial_distance_lead=49.,
speed_lead_values=[30., 30., 29., 31., 29., 31., 29.],
speed_lead_breakpoints=[0., 6., 8., 12., 16., 20., 24.],
cruise_button_presses=[(CB.DECEL_SET, 1.2), (0, 1.3),
(CB.RES_ACCEL, 1.4), (0.0, 1.5),
(CB.RES_ACCEL, 1.6), (0.0, 1.7)],
checks=[check_engaged, check_no_collision],
),
Maneuver(
"following a car at 10mph, stop and go at 1m/s2 lead dece1 and accel",
duration=70.,
initial_speed=10.,
lead_relevancy=True,
initial_distance_lead=20.,
speed_lead_values=[10., 0., 0., 10., 0., 10.],
speed_lead_breakpoints=[10., 20., 30., 40., 50., 60.],
cruise_button_presses=[(CB.DECEL_SET, 1.2), (0, 1.3),
(CB.RES_ACCEL, 1.4), (0.0, 1.5),
(CB.RES_ACCEL, 1.6), (0.0, 1.7)],
checks=[check_engaged, check_no_collision],
),
Maneuver(
"green light: stopped behind lead car, lead car accelerates at 1.5 m/s",
duration=30.,
initial_speed=0.,
lead_relevancy=True,
initial_distance_lead=4.,
speed_lead_values=[0, 0, 45],
speed_lead_breakpoints=[0, 10., 40.],
cruise_button_presses=[(CB.DECEL_SET, 1.2), (0, 1.3),
(CB.RES_ACCEL, 1.4), (0.0, 1.5),
(CB.RES_ACCEL, 1.6), (0.0, 1.7),
(CB.RES_ACCEL, 1.8), (0.0, 1.9),
(CB.RES_ACCEL, 2.0), (0.0, 2.1),
(CB.RES_ACCEL, 2.2), (0.0, 2.3)],
checks=[check_engaged, check_no_collision],
),
Maneuver(
"stop and go with 1m/s2 lead decel and accel, with full stops",
duration=70.,
initial_speed=0.,
lead_relevancy=True,
initial_distance_lead=20.,
speed_lead_values=[10., 0., 0., 10., 0., 0.],
speed_lead_breakpoints=[10., 20., 30., 40., 50., 60.],
cruise_button_presses=[(CB.DECEL_SET, 1.2), (0, 1.3),
(CB.RES_ACCEL, 1.4), (0.0, 1.5),
(CB.RES_ACCEL, 1.6), (0.0, 1.7)],
checks=[check_engaged, check_no_collision],
),
Maneuver(
"stop and go with 1.5m/s2 lead accel and 3.3m/s^2 lead decel, with full stops",
duration=45.,
initial_speed=0.,
lead_relevancy=True,
initial_distance_lead=20.,
speed_lead_values=[10., 0., 0., 10., 0., 0.],
speed_lead_breakpoints=[10., 13., 26., 33., 36., 45.],
cruise_button_presses=[(CB.DECEL_SET, 1.2), (0, 1.3),
(CB.RES_ACCEL, 1.4), (0.0, 1.5),
(CB.RES_ACCEL, 1.6), (0.0, 1.7)],
checks=[check_engaged, check_no_collision],
),
Maneuver(
"accelerate from 20 while lead vehicle decelerates from 40 to 20 at 1m/s2",
duration=30.,
initial_speed=10.,
lead_relevancy=True,
initial_distance_lead=10.,
speed_lead_values=[20., 10.],
initial_distance_lead=50.,
speed_lead_values=[0., 0.],
speed_lead_breakpoints=[1., 11.],
cruise_button_presses=[(CB.DECEL_SET, 1.2), (0, 1.3),
(CB.RES_ACCEL, 1.4), (0.0, 1.5),
(CB.RES_ACCEL, 1.6), (0.0, 1.7),
(CB.RES_ACCEL, 1.8), (0.0, 1.9),
(CB.RES_ACCEL, 2.0), (0.0, 2.1),
(CB.RES_ACCEL, 2.2), (0.0, 2.3)],
checks=[check_engaged, check_no_collision],
),
Maneuver(
"accelerate from 20 while lead vehicle decelerates from 40 to 0 at 2m/s2",
duration=30.,
initial_speed=10.,
lead_relevancy=True,
initial_distance_lead=10.,
speed_lead_values=[20., 0.],
speed_lead_breakpoints=[1., 11.],
cruise_button_presses=[(CB.DECEL_SET, 1.2), (0, 1.3),
(CB.RES_ACCEL, 1.4), (0.0, 1.5),
(CB.RES_ACCEL, 1.6), (0.0, 1.7),
(CB.RES_ACCEL, 1.8), (0.0, 1.9),
(CB.RES_ACCEL, 2.0), (0.0, 2.1),
(CB.RES_ACCEL, 2.2), (0.0, 2.3)],
checks=[check_engaged, check_no_collision],
),
Maneuver(
"fcw: traveling at 30 m/s and approaching lead traveling at 20m/s",
duration=15.,
initial_speed=30.,
lead_relevancy=True,
initial_distance_lead=100.,
speed_lead_values=[20.],
speed_lead_breakpoints=[1.],
cruise_button_presses=[],
checks=[check_fcw],
),
Maneuver(
"fcw: traveling at 20 m/s following a lead that decels from 20m/s to 0 at 1m/s2",
duration=18.,
initial_speed=20.,
lead_relevancy=True,
initial_distance_lead=35.,
speed_lead_values=[20., 0.],
speed_lead_breakpoints=[3., 23.],
cruise_button_presses=[],
checks=[check_fcw],
),
Maneuver(
"fcw: traveling at 20 m/s following a lead that decels from 20m/s to 0 at 3m/s2",
duration=13.,
initial_speed=20.,
lead_relevancy=True,
initial_distance_lead=35.,
speed_lead_values=[20., 0.],
speed_lead_breakpoints=[3., 9.6],
cruise_button_presses=[],
checks=[check_fcw],
),
Maneuver(
"fcw: traveling at 20 m/s following a lead that decels from 20m/s to 0 at 5m/s2",
duration=8.,
initial_speed=20.,
lead_relevancy=True,
initial_distance_lead=35.,
speed_lead_values=[20., 0.],
speed_lead_breakpoints=[3., 7.],
cruise_button_presses=[],
checks=[check_fcw],
)
]
def setup_output():
output_dir = os.path.join(os.getcwd(), 'out/longitudinal')
if not os.path.exists(os.path.join(output_dir, "index.html")):
# write test output header
css_style = """
.maneuver_title {
font-size: 24px;
text-align: center;
}
.maneuver_graph {
width: 100%;
}
"""
view_html = "<html><head><style>%s</style></head><body><table>" % (css_style,)
for i, man in enumerate(maneuvers):
view_html += "<tr><td class='maneuver_title' colspan=5><div>%s</div></td></tr><tr>" % (man.title,)
for c in ['distance.svg', 'speeds.svg', 'acceleration.svg', 'pedals.svg', 'pid.svg']:
view_html += "<td><img class='maneuver_graph' src='%s'/></td>" % (os.path.join("maneuver" + str(i + 1).zfill(2), c), )
view_html += "</tr>"
mkdirs_exists_ok(output_dir)
with open(os.path.join(output_dir, "index.html"), "w") as f:
f.write(view_html)
class LongitudinalControl(unittest.TestCase):
@classmethod
def setUpClass(cls):
@ -322,8 +71,6 @@ class LongitudinalControl(unittest.TestCase):
os.environ['SKIP_FW_QUERY'] = "1"
os.environ['NO_CAN_TIMEOUT'] = "1"
setup_output()
params = Params()
params.clear_all()
params.put_bool("Passive", bool(os.getenv("PASSIVE")))
@ -336,35 +83,22 @@ class LongitudinalControl(unittest.TestCase):
def run_maneuver_worker(k):
man = maneuvers[k]
output_dir = os.path.join(os.getcwd(), 'out/longitudinal')
def run(self):
man = maneuvers[k]
print(man.title)
valid = False
put_default_car_params()
managed_processes['plannerd'].start()
for _ in range(3):
managed_processes['radard'].start()
managed_processes['controlsd'].start()
managed_processes['plannerd'].start()
plot, valid = man.evaluate()
plot.write_plot(output_dir, "maneuver" + str(k + 1).zfill(2))
managed_processes['radard'].stop()
managed_processes['controlsd'].stop()
managed_processes['plannerd'].stop()
if valid:
break
valid = man.evaluate()
managed_processes['plannerd'].stop()
self.assertTrue(valid)
return run
for k in range(len(maneuvers)):
setattr(LongitudinalControl, "test_longitudinal_maneuvers_%d" % (k + 1), run_maneuver_worker(k))
setattr(LongitudinalControl, f"test_longitudinal_maneuvers_{k+1}",
run_maneuver_worker(k))
if __name__ == "__main__":
unittest.main(failfast=True)

View File

@ -1,16 +1,32 @@
#!/usr/bin/env python3
import cereal.messaging as messaging
from opendbc.can.packer import CANPacker
from opendbc.can.parser import CANParser
from selfdrive.boardd.boardd_api_impl import can_list_to_can_capnp # pylint: disable=no-name-in-module,import-error
from selfdrive.car.honda.values import FINGERPRINTS, CAR
from selfdrive.car import crc8_pedal
from selfdrive.test.longitudinal_maneuvers.plant import get_car_can_parser
cp = get_car_can_parser()
packer = CANPacker("honda_civic_touring_2016_can_generated")
rpacker = CANPacker("acura_ilx_2016_nidec")
def get_car_can_parser():
dbc_f = 'honda_civic_touring_2016_can_generated'
signals = [
("STEER_TORQUE", 0xe4, 0),
("STEER_TORQUE_REQUEST", 0xe4, 0),
("COMPUTER_BRAKE", 0x1fa, 0),
("COMPUTER_BRAKE_REQUEST", 0x1fa, 0),
("GAS_COMMAND", 0x200, 0),
]
checks = [
(0xe4, 100),
(0x1fa, 50),
(0x200, 50),
]
return CANParser(dbc_f, signals, checks, 0)
cp = get_car_can_parser()
def can_function(pm, speed, angle, idx, cruise_button, is_engaged):
msg = []