From 6be70a063dfc96b9e9f097f439bdc2e0be54d6d9 Mon Sep 17 00:00:00 2001 From: Shane Smiskol Date: Tue, 25 Jan 2022 04:40:03 -0800 Subject: [PATCH] LateralPlanner: Move lane change logic to another class (#23476) * should work * Revert "should work" This reverts commit dabc2ea17a072a6c835e4bd1c75a2c1e2d10623d. * move lane change logic to DesireHelper class * clean up desires clean up desires * comments * Revert "clean up desires" This reverts commit 7301c921e305fcbd4746b19040631d935d50dfd9. * Update selfdrive/controls/lib/desire_helper.py Co-authored-by: Adeeb Shihadeh * add to files_common Co-authored-by: Adeeb Shihadeh Co-authored-by: Willem Melching --- release/files_common | 13 ++- selfdrive/controls/lib/desire_helper.py | 113 ++++++++++++++++++ selfdrive/controls/lib/lateral_planner.py | 132 +++------------------- 3 files changed, 135 insertions(+), 123 deletions(-) create mode 100644 selfdrive/controls/lib/desire_helper.py diff --git a/release/files_common b/release/files_common index 126bb3b37..c789fc010 100644 --- a/release/files_common +++ b/release/files_common @@ -230,17 +230,18 @@ selfdrive/controls/radard.py selfdrive/controls/lib/__init__.py selfdrive/controls/lib/alertmanager.py selfdrive/controls/lib/alerts_offroad.json -selfdrive/controls/lib/events.py +selfdrive/controls/lib/desire_helper.py selfdrive/controls/lib/drive_helpers.py -selfdrive/controls/lib/latcontrol_pid.py +selfdrive/controls/lib/events.py +selfdrive/controls/lib/lane_planner.py +selfdrive/controls/lib/latcontrol_angle.py selfdrive/controls/lib/latcontrol_indi.py selfdrive/controls/lib/latcontrol_lqr.py -selfdrive/controls/lib/latcontrol_angle.py -selfdrive/controls/lib/longcontrol.py +selfdrive/controls/lib/latcontrol_pid.py selfdrive/controls/lib/lateral_planner.py -selfdrive/controls/lib/lane_planner.py -selfdrive/controls/lib/pid.py +selfdrive/controls/lib/longcontrol.py selfdrive/controls/lib/longitudinal_planner.py +selfdrive/controls/lib/pid.py selfdrive/controls/lib/radar_helpers.py selfdrive/controls/lib/vehicle_model.py diff --git a/selfdrive/controls/lib/desire_helper.py b/selfdrive/controls/lib/desire_helper.py new file mode 100644 index 000000000..c34d143a5 --- /dev/null +++ b/selfdrive/controls/lib/desire_helper.py @@ -0,0 +1,113 @@ +from cereal import log +from common.realtime import DT_MDL +from selfdrive.config import Conversions as CV + +LaneChangeState = log.LateralPlan.LaneChangeState +LaneChangeDirection = log.LateralPlan.LaneChangeDirection + +LANE_CHANGE_SPEED_MIN = 30 * CV.MPH_TO_MS +LANE_CHANGE_TIME_MAX = 10. + +DESIRES = { + LaneChangeDirection.none: { + LaneChangeState.off: log.LateralPlan.Desire.none, + LaneChangeState.preLaneChange: log.LateralPlan.Desire.none, + LaneChangeState.laneChangeStarting: log.LateralPlan.Desire.none, + LaneChangeState.laneChangeFinishing: log.LateralPlan.Desire.none, + }, + LaneChangeDirection.left: { + LaneChangeState.off: log.LateralPlan.Desire.none, + LaneChangeState.preLaneChange: log.LateralPlan.Desire.none, + LaneChangeState.laneChangeStarting: log.LateralPlan.Desire.laneChangeLeft, + LaneChangeState.laneChangeFinishing: log.LateralPlan.Desire.laneChangeLeft, + }, + LaneChangeDirection.right: { + LaneChangeState.off: log.LateralPlan.Desire.none, + LaneChangeState.preLaneChange: log.LateralPlan.Desire.none, + LaneChangeState.laneChangeStarting: log.LateralPlan.Desire.laneChangeRight, + LaneChangeState.laneChangeFinishing: log.LateralPlan.Desire.laneChangeRight, + }, +} + + +class DesireHelper: + def __init__(self): + self.lane_change_state = LaneChangeState.off + self.lane_change_direction = LaneChangeDirection.none + self.lane_change_timer = 0.0 + self.lane_change_ll_prob = 1.0 + self.keep_pulse_timer = 0.0 + self.prev_one_blinker = False + self.desire = log.LateralPlan.Desire.none + + def update(self, carstate, active, lane_change_prob): + v_ego = carstate.vEgo + one_blinker = carstate.leftBlinker != carstate.rightBlinker + below_lane_change_speed = v_ego < LANE_CHANGE_SPEED_MIN + + if not active or self.lane_change_timer > LANE_CHANGE_TIME_MAX: + self.lane_change_state = LaneChangeState.off + self.lane_change_direction = LaneChangeDirection.none + else: + # LaneChangeState.off + if self.lane_change_state == LaneChangeState.off and one_blinker and not self.prev_one_blinker and not below_lane_change_speed: + self.lane_change_state = LaneChangeState.preLaneChange + self.lane_change_ll_prob = 1.0 + + # LaneChangeState.preLaneChange + elif self.lane_change_state == LaneChangeState.preLaneChange: + # Set lane change direction + self.lane_change_direction = LaneChangeDirection.left if \ + carstate.leftBlinker else LaneChangeDirection.right + + torque_applied = carstate.steeringPressed and \ + ((carstate.steeringTorque > 0 and self.lane_change_direction == LaneChangeDirection.left) or + (carstate.steeringTorque < 0 and self.lane_change_direction == LaneChangeDirection.right)) + + blindspot_detected = ((carstate.leftBlindspot and self.lane_change_direction == LaneChangeDirection.left) or + (carstate.rightBlindspot and self.lane_change_direction == LaneChangeDirection.right)) + + if not one_blinker or below_lane_change_speed: + self.lane_change_state = LaneChangeState.off + elif torque_applied and not blindspot_detected: + self.lane_change_state = LaneChangeState.laneChangeStarting + + # LaneChangeState.laneChangeStarting + elif self.lane_change_state == LaneChangeState.laneChangeStarting: + # fade out over .5s + self.lane_change_ll_prob = max(self.lane_change_ll_prob - 2 * DT_MDL, 0.0) + + # 98% certainty + if lane_change_prob < 0.02 and self.lane_change_ll_prob < 0.01: + self.lane_change_state = LaneChangeState.laneChangeFinishing + + # LaneChangeState.laneChangeFinishing + elif self.lane_change_state == LaneChangeState.laneChangeFinishing: + # fade in laneline over 1s + self.lane_change_ll_prob = min(self.lane_change_ll_prob + DT_MDL, 1.0) + + if self.lane_change_ll_prob > 0.99: + self.lane_change_direction = LaneChangeDirection.none + if one_blinker: + self.lane_change_state = LaneChangeState.preLaneChange + else: + self.lane_change_state = LaneChangeState.off + + if self.lane_change_state in (LaneChangeState.off, LaneChangeState.preLaneChange): + self.lane_change_timer = 0.0 + else: + self.lane_change_timer += DT_MDL + + self.prev_one_blinker = one_blinker + + self.desire = DESIRES[self.lane_change_direction][self.lane_change_state] + + # Send keep pulse once per second during LaneChangeStart.preLaneChange + if self.lane_change_state in (LaneChangeState.off, LaneChangeState.laneChangeStarting): + self.keep_pulse_timer = 0.0 + elif self.lane_change_state == LaneChangeState.preLaneChange: + self.keep_pulse_timer += DT_MDL + if self.keep_pulse_timer > 1.0: + self.keep_pulse_timer = 0.0 + elif self.desire in (log.LateralPlan.Desire.keepLeft, log.LateralPlan.Desire.keepRight): + self.desire = log.LateralPlan.Desire.none diff --git a/selfdrive/controls/lib/lateral_planner.py b/selfdrive/controls/lib/lateral_planner.py index 31bd42a84..a9c641139 100644 --- a/selfdrive/controls/lib/lateral_planner.py +++ b/selfdrive/controls/lib/lateral_planner.py @@ -5,54 +5,20 @@ from selfdrive.swaglog import cloudlog from selfdrive.controls.lib.lateral_mpc_lib.lat_mpc import LateralMpc from selfdrive.controls.lib.drive_helpers import CONTROL_N, MPC_COST_LAT, LAT_MPC_N, CAR_ROTATION_RADIUS from selfdrive.controls.lib.lane_planner import LanePlanner, TRAJECTORY_SIZE -from selfdrive.config import Conversions as CV +from selfdrive.controls.lib.desire_helper import DesireHelper import cereal.messaging as messaging from cereal import log -LaneChangeState = log.LateralPlan.LaneChangeState -LaneChangeDirection = log.LateralPlan.LaneChangeDirection - -LANE_CHANGE_SPEED_MIN = 30 * CV.MPH_TO_MS -LANE_CHANGE_TIME_MAX = 10. - -DESIRES = { - LaneChangeDirection.none: { - LaneChangeState.off: log.LateralPlan.Desire.none, - LaneChangeState.preLaneChange: log.LateralPlan.Desire.none, - LaneChangeState.laneChangeStarting: log.LateralPlan.Desire.none, - LaneChangeState.laneChangeFinishing: log.LateralPlan.Desire.none, - }, - LaneChangeDirection.left: { - LaneChangeState.off: log.LateralPlan.Desire.none, - LaneChangeState.preLaneChange: log.LateralPlan.Desire.none, - LaneChangeState.laneChangeStarting: log.LateralPlan.Desire.laneChangeLeft, - LaneChangeState.laneChangeFinishing: log.LateralPlan.Desire.laneChangeLeft, - }, - LaneChangeDirection.right: { - LaneChangeState.off: log.LateralPlan.Desire.none, - LaneChangeState.preLaneChange: log.LateralPlan.Desire.none, - LaneChangeState.laneChangeStarting: log.LateralPlan.Desire.laneChangeRight, - LaneChangeState.laneChangeFinishing: log.LateralPlan.Desire.laneChangeRight, - }, -} - class LateralPlanner: def __init__(self, CP, use_lanelines=True, wide_camera=False): self.use_lanelines = use_lanelines self.LP = LanePlanner(wide_camera) + self.DH = DesireHelper() self.last_cloudlog_t = 0 self.steer_rate_cost = CP.steerRateCost - self.solution_invalid_cnt = 0 - self.lane_change_state = LaneChangeState.off - self.lane_change_direction = LaneChangeDirection.none - self.lane_change_timer = 0.0 - self.lane_change_ll_prob = 1.0 - self.keep_pulse_timer = 0.0 - self.prev_one_blinker = False - self.desire = log.LateralPlan.Desire.none self.path_xyz = np.zeros((TRAJECTORY_SIZE, 3)) self.path_xyz_stds = np.ones((TRAJECTORY_SIZE, 3)) @@ -69,11 +35,11 @@ class LateralPlanner: def update(self, sm): v_ego = sm['carState'].vEgo - active = sm['controlsState'].active measured_curvature = sm['controlsState'].curvature + # Parse model predictions md = sm['modelV2'] - self.LP.parse_model(sm['modelV2']) + self.LP.parse_model(md) if len(md.position.x) == TRAJECTORY_SIZE and len(md.orientation.x) == TRAJECTORY_SIZE: self.path_xyz = np.column_stack([md.position.x, md.position.y, md.position.z]) self.t_idxs = np.array(md.position.t) @@ -82,84 +48,15 @@ class LateralPlanner: self.path_xyz_stds = np.column_stack([md.position.xStd, md.position.yStd, md.position.zStd]) # Lane change logic - one_blinker = sm['carState'].leftBlinker != sm['carState'].rightBlinker - below_lane_change_speed = v_ego < LANE_CHANGE_SPEED_MIN - - if (not active) or (self.lane_change_timer > LANE_CHANGE_TIME_MAX): - self.lane_change_state = LaneChangeState.off - self.lane_change_direction = LaneChangeDirection.none - else: - # LaneChangeState.off - if self.lane_change_state == LaneChangeState.off and one_blinker and not self.prev_one_blinker and not below_lane_change_speed: - self.lane_change_state = LaneChangeState.preLaneChange - self.lane_change_ll_prob = 1.0 - - # LaneChangeState.preLaneChange - elif self.lane_change_state == LaneChangeState.preLaneChange: - # Set lane change direction - if sm['carState'].leftBlinker: - self.lane_change_direction = LaneChangeDirection.left - elif sm['carState'].rightBlinker: - self.lane_change_direction = LaneChangeDirection.right - else: # If there are no blinkers we will go back to LaneChangeState.off - self.lane_change_direction = LaneChangeDirection.none - - torque_applied = sm['carState'].steeringPressed and \ - ((sm['carState'].steeringTorque > 0 and self.lane_change_direction == LaneChangeDirection.left) or - (sm['carState'].steeringTorque < 0 and self.lane_change_direction == LaneChangeDirection.right)) - - blindspot_detected = ((sm['carState'].leftBlindspot and self.lane_change_direction == LaneChangeDirection.left) or - (sm['carState'].rightBlindspot and self.lane_change_direction == LaneChangeDirection.right)) - - if not one_blinker or below_lane_change_speed: - self.lane_change_state = LaneChangeState.off - elif torque_applied and not blindspot_detected: - self.lane_change_state = LaneChangeState.laneChangeStarting - - # LaneChangeState.laneChangeStarting - elif self.lane_change_state == LaneChangeState.laneChangeStarting: - # fade out over .5s - self.lane_change_ll_prob = max(self.lane_change_ll_prob - 2 * DT_MDL, 0.0) - - # 98% certainty - lane_change_prob = self.LP.l_lane_change_prob + self.LP.r_lane_change_prob - if lane_change_prob < 0.02 and self.lane_change_ll_prob < 0.01: - self.lane_change_state = LaneChangeState.laneChangeFinishing - - # LaneChangeState.laneChangeFinishing - elif self.lane_change_state == LaneChangeState.laneChangeFinishing: - # fade in laneline over 1s - self.lane_change_ll_prob = min(self.lane_change_ll_prob + DT_MDL, 1.0) - if self.lane_change_ll_prob > 0.99: - self.lane_change_direction = LaneChangeDirection.none - if one_blinker: - self.lane_change_state = LaneChangeState.preLaneChange - else: - self.lane_change_state = LaneChangeState.off - - if self.lane_change_state in (LaneChangeState.off, LaneChangeState.preLaneChange): - self.lane_change_timer = 0.0 - else: - self.lane_change_timer += DT_MDL - - self.prev_one_blinker = one_blinker - - self.desire = DESIRES[self.lane_change_direction][self.lane_change_state] - - # Send keep pulse once per second during LaneChangeStart.preLaneChange - if self.lane_change_state in (LaneChangeState.off, LaneChangeState.laneChangeStarting): - self.keep_pulse_timer = 0.0 - elif self.lane_change_state == LaneChangeState.preLaneChange: - self.keep_pulse_timer += DT_MDL - if self.keep_pulse_timer > 1.0: - self.keep_pulse_timer = 0.0 - elif self.desire in (log.LateralPlan.Desire.keepLeft, log.LateralPlan.Desire.keepRight): - self.desire = log.LateralPlan.Desire.none + lane_change_prob = self.LP.l_lane_change_prob + self.LP.r_lane_change_prob + self.DH.update(sm['carState'], sm['controlsState'].active, lane_change_prob) # Turn off lanes during lane change - if self.desire == log.LateralPlan.Desire.laneChangeRight or self.desire == log.LateralPlan.Desire.laneChangeLeft: - self.LP.lll_prob *= self.lane_change_ll_prob - self.LP.rll_prob *= self.lane_change_ll_prob + if self.DH.desire == log.LateralPlan.Desire.laneChangeRight or self.DH.desire == log.LateralPlan.Desire.laneChangeLeft: + self.LP.lll_prob *= self.DH.lane_change_ll_prob + self.LP.rll_prob *= self.DH.lane_change_ll_prob + + # Calculate final driving path and set MPC costs if self.use_lanelines: d_path_xyz = self.LP.get_d_path(v_ego, self.t_idxs, self.path_xyz) self.lat_mpc.set_weights(MPC_COST_LAT.PATH, MPC_COST_LAT.HEADING, self.steer_rate_cost) @@ -169,6 +66,7 @@ class LateralPlanner: # Heading cost is useful at low speed, otherwise end of plan can be off-heading heading_cost = interp(v_ego, [5.0, 10.0], [MPC_COST_LAT.HEADING, 0.0]) self.lat_mpc.set_weights(path_cost, heading_cost, self.steer_rate_cost) + y_pts = np.interp(v_ego * self.t_idxs[:LAT_MPC_N + 1], np.linalg.norm(d_path_xyz, axis=1), d_path_xyz[:, 1]) heading_pts = np.interp(v_ego * self.t_idxs[:LAT_MPC_N + 1], np.linalg.norm(self.path_xyz, axis=1), self.plan_yaw) self.y_pts = y_pts @@ -217,9 +115,9 @@ class LateralPlanner: lateralPlan.mpcSolutionValid = bool(plan_solution_valid) lateralPlan.solverExecutionTime = self.lat_mpc.solve_time - lateralPlan.desire = self.desire + lateralPlan.desire = self.DH.desire lateralPlan.useLaneLines = self.use_lanelines - lateralPlan.laneChangeState = self.lane_change_state - lateralPlan.laneChangeDirection = self.lane_change_direction + lateralPlan.laneChangeState = self.DH.lane_change_state + lateralPlan.laneChangeDirection = self.DH.lane_change_direction pm.send('lateralPlan', plan_send)