openpilot/selfdrive/car/ocelot/carcontroller.py

130 lines
4.5 KiB
Python

from cereal import car
from common.numpy_fast import clip
from selfdrive.car import apply_toyota_steer_torque_limits, make_can_msg
from selfdrive.car.ocelot.ocelotcan import create_steer_command, create_gas_command, create_brake_cmd
from selfdrive.car.ocelot.values import CAR, SteerLimitParams
from opendbc.can.packer import CANPacker
VisualAlert = car.CarControl.HUDControl.VisualAlert
# Accel limits
ACCEL_HYST_GAP = 0.02 # don't change accel command for small oscilalitons within this value
ACCEL_MAX = 1.5 # 1.5 m/s2
ACCEL_MIN = -3.0 # 3 m/s2
ACCEL_SCALE = max(ACCEL_MAX, -ACCEL_MIN)
def accel_hysteresis(accel, accel_steady, enabled):
# for small accel oscillations within ACCEL_HYST_GAP, don't change the accel command
if not enabled:
# send 0 when disabled, otherwise acc faults
accel_steady = 0.
elif accel > accel_steady + ACCEL_HYST_GAP:
accel_steady = accel - ACCEL_HYST_GAP
elif accel < accel_steady - ACCEL_HYST_GAP:
accel_steady = accel + ACCEL_HYST_GAP
accel = accel_steady
return accel, accel_steady
class CarController():
def __init__(self, dbc_name, CP, VM):
self.last_steer = 0
self.accel_steady = 0.
self.alert_active = False
self.last_standstill = False
self.standstill_req = False
self.last_fault_frame = -200
self.steer_rate_limited = False
self.last_brake = 0.
self.packer = CANPacker(dbc_name)
def update(self, enabled, CS, frame, actuators, pcm_cancel_cmd, hud_alert,
left_line, right_line, lead, left_lane_depart, right_lane_depart):
# *** compute control surfaces ***
# gas and brake
apply_gas = clip(actuators.gas, 0., 1.)
if CS.CP.enableGasInterceptor:
# send only negative accel if interceptor is detected. otherwise, send the regular value
# +0.06 offset to reduce ABS pump usage when OP is engaged
apply_accel = 0.06 - actuators.brake
else:
apply_accel = actuators.gas - actuators.brake
apply_accel, self.accel_steady = accel_hysteresis(apply_accel, self.accel_steady, enabled)
apply_accel = clip(apply_accel * ACCEL_SCALE, ACCEL_MIN, ACCEL_MAX)
# steer torque
new_steer = int(round(actuators.steer * SteerLimitParams.STEER_MAX))
apply_steer = apply_toyota_steer_torque_limits(new_steer, self.last_steer, CS.out.steeringTorqueEps, SteerLimitParams)
self.steer_rate_limited = new_steer != apply_steer
# # only cut torque when steer state is a known fault
# if CS.steer_state in [9, 25]:
# self.last_fault_frame = frame
# Cut steering for 2s after fault
if not enabled: #or (frame - self.last_fault_frame < 200):
apply_steer = 0
apply_steer_req = 0
else:
apply_steer_req = 1
# on entering standstill, send standstill request
if CS.out.standstill and not self.last_standstill:
self.standstill_req = True
self.last_steer = apply_steer
self.last_accel = apply_accel
self.last_standstill = CS.out.standstill
can_sends = []
#*** control msgs ***
#print("steer {0} {1} {2} {3}".format(apply_steer, min_lim, max_lim, CS.steer_torque_motor)
# toyota can trace shows this message at 42Hz, with counter adding alternatively 1 and 2;
# sending it at 100Hz seem to allow a higher rate limit, as the rate limit seems imposed
# on consecutive messages
can_sends.append(create_steer_command(self.packer, apply_steer, apply_steer_req, frame))
if (frame % 2 == 0):
# send exactly zero if apply_gas is zero. Interceptor will send the max between read value and apply_gas.
# This prevents unexpected pedal range rescaling
can_sends.append(create_gas_command(self.packer, apply_gas, frame//2))
can_sends.append(create_brake_cmd(self.packer, enabled, actuators.brake, frame//2))
# ui mesg is at 100Hz but we send asap if:
# - there is something to display
# - there is something to stop displaying
fcw_alert = hud_alert == VisualAlert.fcw
steer_alert = hud_alert == VisualAlert.steerRequired
send_ui = False
if ((fcw_alert or steer_alert) and not self.alert_active) or \
(not (fcw_alert or steer_alert) and self.alert_active):
send_ui = True
self.alert_active = not self.alert_active
elif pcm_cancel_cmd:
# forcing the pcm to disengage causes a bad fault sound so play a good sound instead
send_ui = True
# #*** static msgs ***
# for (addr, ecu, cars, bus, fr_step, vl) in STATIC_MSGS:
# if frame % fr_step == 0 and ecu in self.fake_ecus and CS.CP.carFingerprint in cars:
# can_sends.append(make_can_msg(addr, vl, bus))
return can_sends