Clean up planner files (#23031)
* clean up planner files clean up planner files * fix plant instancepull/23053/head
parent
1d323e0fd6
commit
93fd662adf
|
@ -38,7 +38,7 @@ DESIRES = {
|
|||
}
|
||||
|
||||
|
||||
class LateralPlanner():
|
||||
class LateralPlanner:
|
||||
def __init__(self, CP, use_lanelines=True, wide_camera=False):
|
||||
self.use_lanelines = use_lanelines
|
||||
self.LP = LanePlanner(wide_camera)
|
||||
|
@ -55,8 +55,8 @@ class LateralPlanner():
|
|||
self.prev_one_blinker = False
|
||||
self.desire = log.LateralPlan.Desire.none
|
||||
|
||||
self.path_xyz = np.zeros((TRAJECTORY_SIZE,3))
|
||||
self.path_xyz_stds = np.ones((TRAJECTORY_SIZE,3))
|
||||
self.path_xyz = np.zeros((TRAJECTORY_SIZE, 3))
|
||||
self.path_xyz_stds = np.ones((TRAJECTORY_SIZE, 3))
|
||||
self.plan_yaw = np.zeros((TRAJECTORY_SIZE,))
|
||||
self.t_idxs = np.arange(TRAJECTORY_SIZE)
|
||||
self.y_pts = np.zeros(TRAJECTORY_SIZE)
|
||||
|
@ -67,12 +67,8 @@ class LateralPlanner():
|
|||
def reset_mpc(self, x0=np.zeros(6)):
|
||||
self.x0 = x0
|
||||
self.lat_mpc.reset(x0=self.x0)
|
||||
self.desired_curvature = 0.0
|
||||
self.safe_desired_curvature = 0.0
|
||||
self.desired_curvature_rate = 0.0
|
||||
self.safe_desired_curvature_rate = 0.0
|
||||
|
||||
def update(self, sm, CP):
|
||||
def update(self, sm):
|
||||
v_ego = sm['carState'].vEgo
|
||||
active = sm['controlsState'].active
|
||||
measured_curvature = sm['controlsState'].curvature
|
||||
|
@ -110,7 +106,7 @@ class LateralPlanner():
|
|||
self.lane_change_direction = LaneChangeDirection.none
|
||||
|
||||
torque_applied = sm['carState'].steeringPressed and \
|
||||
((sm['carState'].steeringTorque > 0 and self.lane_change_direction == LaneChangeDirection.left) or
|
||||
((sm['carState'].steeringTorque > 0 and self.lane_change_direction == LaneChangeDirection.left) or
|
||||
(sm['carState'].steeringTorque < 0 and self.lane_change_direction == LaneChangeDirection.right))
|
||||
|
||||
blindspot_detected = ((sm['carState'].leftBlindspot and self.lane_change_direction == LaneChangeDirection.left) or
|
||||
|
@ -124,7 +120,7 @@ class LateralPlanner():
|
|||
# LaneChangeState.laneChangeStarting
|
||||
elif self.lane_change_state == LaneChangeState.laneChangeStarting:
|
||||
# fade out over .5s
|
||||
self.lane_change_ll_prob = max(self.lane_change_ll_prob - 2*DT_MDL, 0.0)
|
||||
self.lane_change_ll_prob = max(self.lane_change_ll_prob - 2 * DT_MDL, 0.0)
|
||||
|
||||
# 98% certainty
|
||||
lane_change_prob = self.LP.l_lane_change_prob + self.LP.r_lane_change_prob
|
||||
|
@ -167,14 +163,14 @@ class LateralPlanner():
|
|||
self.LP.rll_prob *= self.lane_change_ll_prob
|
||||
if self.use_lanelines:
|
||||
d_path_xyz = self.LP.get_d_path(v_ego, self.t_idxs, self.path_xyz)
|
||||
self.lat_mpc.set_weights(MPC_COST_LAT.PATH, MPC_COST_LAT.HEADING, CP.steerRateCost)
|
||||
self.lat_mpc.set_weights(MPC_COST_LAT.PATH, MPC_COST_LAT.HEADING, self.steer_rate_cost)
|
||||
else:
|
||||
d_path_xyz = self.path_xyz
|
||||
path_cost = np.clip(abs(self.path_xyz[0,1]/self.path_xyz_stds[0,1]), 0.5, 1.5) * MPC_COST_LAT.PATH
|
||||
path_cost = np.clip(abs(self.path_xyz[0, 1] / self.path_xyz_stds[0, 1]), 0.5, 1.5) * MPC_COST_LAT.PATH
|
||||
# Heading cost is useful at low speed, otherwise end of plan can be off-heading
|
||||
heading_cost = interp(v_ego, [5.0, 10.0], [MPC_COST_LAT.HEADING, 0.0])
|
||||
self.lat_mpc.set_weights(path_cost, heading_cost, CP.steerRateCost)
|
||||
y_pts = np.interp(v_ego * self.t_idxs[:LAT_MPC_N + 1], np.linalg.norm(d_path_xyz, axis=1), d_path_xyz[:,1])
|
||||
self.lat_mpc.set_weights(path_cost, heading_cost, self.steer_rate_cost)
|
||||
y_pts = np.interp(v_ego * self.t_idxs[:LAT_MPC_N + 1], np.linalg.norm(d_path_xyz, axis=1), d_path_xyz[:, 1])
|
||||
heading_pts = np.interp(v_ego * self.t_idxs[:LAT_MPC_N + 1], np.linalg.norm(self.path_xyz, axis=1), self.plan_yaw)
|
||||
self.y_pts = y_pts
|
||||
|
||||
|
@ -187,11 +183,10 @@ class LateralPlanner():
|
|||
y_pts,
|
||||
heading_pts)
|
||||
# init state for next
|
||||
self.x0[3] = interp(DT_MDL, self.t_idxs[:LAT_MPC_N + 1], self.lat_mpc.x_sol[:,3])
|
||||
self.x0[3] = interp(DT_MDL, self.t_idxs[:LAT_MPC_N + 1], self.lat_mpc.x_sol[:, 3])
|
||||
|
||||
|
||||
# Check for infeasable MPC solution
|
||||
mpc_nans = any(math.isnan(x) for x in self.lat_mpc.x_sol[:,3])
|
||||
# Check for infeasible MPC solution
|
||||
mpc_nans = any(math.isnan(x) for x in self.lat_mpc.x_sol[:, 3])
|
||||
t = sec_since_boot()
|
||||
if mpc_nans or self.lat_mpc.solution_status != 0:
|
||||
self.reset_mpc()
|
||||
|
@ -212,8 +207,8 @@ class LateralPlanner():
|
|||
plan_send.lateralPlan.laneWidth = float(self.LP.lane_width)
|
||||
plan_send.lateralPlan.dPathPoints = [float(x) for x in self.y_pts]
|
||||
plan_send.lateralPlan.psis = [float(x) for x in self.lat_mpc.x_sol[0:CONTROL_N, 2]]
|
||||
plan_send.lateralPlan.curvatures = [float(x) for x in self.lat_mpc.x_sol[0:CONTROL_N,3]]
|
||||
plan_send.lateralPlan.curvatureRates = [float(x) for x in self.lat_mpc.u_sol[0:CONTROL_N-1]] +[0.0]
|
||||
plan_send.lateralPlan.curvatures = [float(x) for x in self.lat_mpc.x_sol[0:CONTROL_N, 3]]
|
||||
plan_send.lateralPlan.curvatureRates = [float(x) for x in self.lat_mpc.u_sol[0:CONTROL_N - 1]] + [0.0]
|
||||
plan_send.lateralPlan.lProb = float(self.LP.lll_prob)
|
||||
plan_send.lateralPlan.rProb = float(self.LP.rll_prob)
|
||||
plan_send.lateralPlan.dProb = float(self.LP.d_prob)
|
||||
|
|
|
@ -14,7 +14,7 @@ from selfdrive.controls.lib.drive_helpers import V_CRUISE_MAX, CONTROL_N
|
|||
from selfdrive.swaglog import cloudlog
|
||||
|
||||
LON_MPC_STEP = 0.2 # first step is 0.2s
|
||||
AWARENESS_DECEL = -0.2 # car smoothly decel at .2m/s^2 when user is distracted
|
||||
AWARENESS_DECEL = -0.2 # car smoothly decel at .2m/s^2 when user is distracted
|
||||
A_CRUISE_MIN = -1.2
|
||||
A_CRUISE_MAX_VALS = [1.2, 1.2, 0.8, 0.6]
|
||||
A_CRUISE_MAX_BP = [0., 15., 25., 40.]
|
||||
|
@ -35,13 +35,13 @@ def limit_accel_in_turns(v_ego, angle_steers, a_target, CP):
|
|||
"""
|
||||
|
||||
a_total_max = interp(v_ego, _A_TOTAL_MAX_BP, _A_TOTAL_MAX_V)
|
||||
a_y = v_ego**2 * angle_steers * CV.DEG_TO_RAD / (CP.steerRatio * CP.wheelbase)
|
||||
a_x_allowed = math.sqrt(max(a_total_max**2 - a_y**2, 0.))
|
||||
a_y = v_ego ** 2 * angle_steers * CV.DEG_TO_RAD / (CP.steerRatio * CP.wheelbase)
|
||||
a_x_allowed = math.sqrt(max(a_total_max ** 2 - a_y ** 2, 0.))
|
||||
|
||||
return [a_target[0], min(a_target[1], a_x_allowed)]
|
||||
|
||||
|
||||
class Planner():
|
||||
class Planner:
|
||||
def __init__(self, CP, init_v=0.0, init_a=0.0):
|
||||
self.CP = CP
|
||||
self.mpc = LongitudinalMpc()
|
||||
|
@ -50,14 +50,13 @@ class Planner():
|
|||
|
||||
self.v_desired = init_v
|
||||
self.a_desired = init_a
|
||||
self.alpha = np.exp(-DT_MDL/2.0)
|
||||
self.alpha = np.exp(-DT_MDL / 2.0)
|
||||
|
||||
self.v_desired_trajectory = np.zeros(CONTROL_N)
|
||||
self.a_desired_trajectory = np.zeros(CONTROL_N)
|
||||
self.j_desired_trajectory = np.zeros(CONTROL_N)
|
||||
|
||||
|
||||
def update(self, sm, CP):
|
||||
def update(self, sm):
|
||||
v_ego = sm['carState'].vEgo
|
||||
a_ego = sm['carState'].aEgo
|
||||
|
||||
|
@ -93,7 +92,7 @@ class Planner():
|
|||
self.a_desired_trajectory = np.interp(T_IDXS[:CONTROL_N], T_IDXS_MPC, self.mpc.a_solution)
|
||||
self.j_desired_trajectory = np.interp(T_IDXS[:CONTROL_N], T_IDXS_MPC[:-1], self.mpc.j_solution)
|
||||
|
||||
#TODO counter is only needed because radar is glitchy, remove once radar is gone
|
||||
# TODO counter is only needed because radar is glitchy, remove once radar is gone
|
||||
self.fcw = self.mpc.crash_cnt > 5
|
||||
if self.fcw:
|
||||
cloudlog.info("FCW triggered")
|
||||
|
@ -101,7 +100,7 @@ class Planner():
|
|||
# Interpolate 0.05 seconds and save as starting point for next iteration
|
||||
a_prev = self.a_desired
|
||||
self.a_desired = float(interp(DT_MDL, T_IDXS[:CONTROL_N], self.a_desired_trajectory))
|
||||
self.v_desired = self.v_desired + DT_MDL * (self.a_desired + a_prev)/2.0
|
||||
self.v_desired = self.v_desired + DT_MDL * (self.a_desired + a_prev) / 2.0
|
||||
|
||||
def publish(self, sm, pm):
|
||||
plan_send = messaging.new_message('longitudinalPlan')
|
||||
|
|
|
@ -36,9 +36,9 @@ def plannerd_thread(sm=None, pm=None):
|
|||
sm.update()
|
||||
|
||||
if sm.updated['modelV2']:
|
||||
lateral_planner.update(sm, CP)
|
||||
lateral_planner.update(sm)
|
||||
lateral_planner.publish(sm, pm)
|
||||
longitudinal_planner.update(sm, CP)
|
||||
longitudinal_planner.update(sm)
|
||||
longitudinal_planner.publish(sm, pm)
|
||||
|
||||
|
||||
|
|
|
@ -42,8 +42,7 @@ class Plant():
|
|||
|
||||
from selfdrive.car.honda.values import CAR
|
||||
from selfdrive.car.honda.interface import CarInterface
|
||||
self.CP = CarInterface.get_params(CAR.CIVIC)
|
||||
self.planner = Planner(self.CP, init_v=self.speed)
|
||||
self.planner = Planner(CarInterface.get_params(CAR.CIVIC), init_v=self.speed)
|
||||
|
||||
def current_time(self):
|
||||
return float(self.rk.frame) / self.rate
|
||||
|
@ -97,7 +96,7 @@ class Plant():
|
|||
sm = {'radarState': radar.radarState,
|
||||
'carState': car_state.carState,
|
||||
'controlsState': control.controlsState}
|
||||
self.planner.update(sm, self.CP)
|
||||
self.planner.update(sm)
|
||||
self.speed = self.planner.v_desired
|
||||
self.acceleration = self.planner.a_desired
|
||||
fcw = self.planner.fcw
|
||||
|
|
Loading…
Reference in New Issue