from cereal import car from common.numpy_fast import clip from selfdrive.car import apply_toyota_steer_torque_limits from selfdrive.car.ocelot.ocelotcan import create_steer_command, create_gas_command, create_brake_cmd from selfdrive.car.ocelot.values import SteerLimitParams from opendbc.can.packer import CANPacker VisualAlert = car.CarControl.HUDControl.VisualAlert # Accel limits ACCEL_HYST_GAP = 0.02 # don't change accel command for small oscilalitons within this value ACCEL_MAX = 1.5 # 1.5 m/s2 ACCEL_MIN = -3.0 # 3 m/s2 ACCEL_SCALE = max(ACCEL_MAX, -ACCEL_MIN) def accel_hysteresis(accel, accel_steady, enabled): # for small accel oscillations within ACCEL_HYST_GAP, don't change the accel command if not enabled: # send 0 when disabled, otherwise acc faults accel_steady = 0. elif accel > accel_steady + ACCEL_HYST_GAP: accel_steady = accel - ACCEL_HYST_GAP elif accel < accel_steady - ACCEL_HYST_GAP: accel_steady = accel + ACCEL_HYST_GAP accel = accel_steady return accel, accel_steady class CarController(): def __init__(self, dbc_name, CP, VM): self.last_steer = 0 self.accel_steady = 0. self.alert_active = False self.last_standstill = False self.standstill_req = False self.last_fault_frame = -200 self.steer_rate_limited = False self.last_brake = 0. self.packer = CANPacker(dbc_name) def update(self, enabled, CS, frame, actuators, pcm_cancel_cmd, hud_alert, left_line, right_line, lead, left_lane_depart, right_lane_depart): # *** compute control surfaces *** # gas and brake apply_gas = clip(actuators.gas, 0., 1.) if CS.CP.enableGasInterceptor: # send only negative accel if interceptor is detected. otherwise, send the regular value # +0.06 offset to reduce ABS pump usage when OP is engaged apply_accel = 0.06 - actuators.brake else: apply_accel = actuators.gas - actuators.brake apply_accel, self.accel_steady = accel_hysteresis(apply_accel, self.accel_steady, enabled) apply_accel = clip(apply_accel * ACCEL_SCALE, ACCEL_MIN, ACCEL_MAX) # steer torque new_steer = int(round(actuators.steer * SteerLimitParams.STEER_MAX)) apply_steer = apply_toyota_steer_torque_limits(new_steer, self.last_steer, CS.out.steeringTorqueEps, SteerLimitParams) self.steer_rate_limited = new_steer != apply_steer # # only cut torque when steer state is a known fault # if CS.steer_state in [9, 25]: # self.last_fault_frame = frame # Cut steering for 2s after fault if not enabled: #or (frame - self.last_fault_frame < 200): apply_steer = 0 apply_steer_req = 0 else: apply_steer_req = 1 # on entering standstill, send standstill request if CS.out.standstill and not self.last_standstill: self.standstill_req = True self.last_steer = apply_steer self.last_accel = apply_accel self.last_standstill = CS.out.standstill can_sends = [] #*** control msgs *** #print("steer {0} {1} {2} {3}".format(apply_steer, min_lim, max_lim, CS.steer_torque_motor) # toyota can trace shows this message at 42Hz, with counter adding alternatively 1 and 2; # sending it at 100Hz seem to allow a higher rate limit, as the rate limit seems imposed # on consecutive messages can_sends.append(create_steer_command(self.packer, apply_steer, apply_steer_req, frame)) if (frame % 2 == 0): # send exactly zero if apply_gas is zero. Interceptor will send the max between read value and apply_gas. # This prevents unexpected pedal range rescaling can_sends.append(create_gas_command(self.packer, apply_gas, frame//2)) can_sends.append(create_brake_cmd(self.packer, enabled, actuators.brake, frame)) # ui mesg is at 100Hz but we send asap if: # - there is something to display # - there is something to stop displaying fcw_alert = hud_alert == VisualAlert.fcw steer_alert = hud_alert == VisualAlert.steerRequired if ((fcw_alert or steer_alert) and not self.alert_active) or \ (not (fcw_alert or steer_alert) and self.alert_active): self.alert_active = not self.alert_active # #*** static msgs *** # for (addr, ecu, cars, bus, fr_step, vl) in STATIC_MSGS: # if frame % fr_step == 0 and ecu in self.fake_ecus and CS.CP.carFingerprint in cars: # can_sends.append(make_can_msg(addr, vl, bus)) return can_sends