LateralPlanner: Move lane change logic to another class (#23476)
* should work * Revert "should work" This reverts commit dabc2ea17a072a6c835e4bd1c75a2c1e2d10623d. * move lane change logic to DesireHelper class * clean up desires clean up desires * comments * Revert "clean up desires" This reverts commit 7301c921e305fcbd4746b19040631d935d50dfd9. * Update selfdrive/controls/lib/desire_helper.py Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com> * add to files_common Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com> Co-authored-by: Willem Melching <willem.melching@gmail.com>pull/23622/head
parent
31d147dca2
commit
6be70a063d
|
@ -230,17 +230,18 @@ selfdrive/controls/radard.py
|
|||
selfdrive/controls/lib/__init__.py
|
||||
selfdrive/controls/lib/alertmanager.py
|
||||
selfdrive/controls/lib/alerts_offroad.json
|
||||
selfdrive/controls/lib/events.py
|
||||
selfdrive/controls/lib/desire_helper.py
|
||||
selfdrive/controls/lib/drive_helpers.py
|
||||
selfdrive/controls/lib/latcontrol_pid.py
|
||||
selfdrive/controls/lib/events.py
|
||||
selfdrive/controls/lib/lane_planner.py
|
||||
selfdrive/controls/lib/latcontrol_angle.py
|
||||
selfdrive/controls/lib/latcontrol_indi.py
|
||||
selfdrive/controls/lib/latcontrol_lqr.py
|
||||
selfdrive/controls/lib/latcontrol_angle.py
|
||||
selfdrive/controls/lib/longcontrol.py
|
||||
selfdrive/controls/lib/latcontrol_pid.py
|
||||
selfdrive/controls/lib/lateral_planner.py
|
||||
selfdrive/controls/lib/lane_planner.py
|
||||
selfdrive/controls/lib/pid.py
|
||||
selfdrive/controls/lib/longcontrol.py
|
||||
selfdrive/controls/lib/longitudinal_planner.py
|
||||
selfdrive/controls/lib/pid.py
|
||||
selfdrive/controls/lib/radar_helpers.py
|
||||
selfdrive/controls/lib/vehicle_model.py
|
||||
|
||||
|
|
|
@ -0,0 +1,113 @@
|
|||
from cereal import log
|
||||
from common.realtime import DT_MDL
|
||||
from selfdrive.config import Conversions as CV
|
||||
|
||||
LaneChangeState = log.LateralPlan.LaneChangeState
|
||||
LaneChangeDirection = log.LateralPlan.LaneChangeDirection
|
||||
|
||||
LANE_CHANGE_SPEED_MIN = 30 * CV.MPH_TO_MS
|
||||
LANE_CHANGE_TIME_MAX = 10.
|
||||
|
||||
DESIRES = {
|
||||
LaneChangeDirection.none: {
|
||||
LaneChangeState.off: log.LateralPlan.Desire.none,
|
||||
LaneChangeState.preLaneChange: log.LateralPlan.Desire.none,
|
||||
LaneChangeState.laneChangeStarting: log.LateralPlan.Desire.none,
|
||||
LaneChangeState.laneChangeFinishing: log.LateralPlan.Desire.none,
|
||||
},
|
||||
LaneChangeDirection.left: {
|
||||
LaneChangeState.off: log.LateralPlan.Desire.none,
|
||||
LaneChangeState.preLaneChange: log.LateralPlan.Desire.none,
|
||||
LaneChangeState.laneChangeStarting: log.LateralPlan.Desire.laneChangeLeft,
|
||||
LaneChangeState.laneChangeFinishing: log.LateralPlan.Desire.laneChangeLeft,
|
||||
},
|
||||
LaneChangeDirection.right: {
|
||||
LaneChangeState.off: log.LateralPlan.Desire.none,
|
||||
LaneChangeState.preLaneChange: log.LateralPlan.Desire.none,
|
||||
LaneChangeState.laneChangeStarting: log.LateralPlan.Desire.laneChangeRight,
|
||||
LaneChangeState.laneChangeFinishing: log.LateralPlan.Desire.laneChangeRight,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
class DesireHelper:
|
||||
def __init__(self):
|
||||
self.lane_change_state = LaneChangeState.off
|
||||
self.lane_change_direction = LaneChangeDirection.none
|
||||
self.lane_change_timer = 0.0
|
||||
self.lane_change_ll_prob = 1.0
|
||||
self.keep_pulse_timer = 0.0
|
||||
self.prev_one_blinker = False
|
||||
self.desire = log.LateralPlan.Desire.none
|
||||
|
||||
def update(self, carstate, active, lane_change_prob):
|
||||
v_ego = carstate.vEgo
|
||||
one_blinker = carstate.leftBlinker != carstate.rightBlinker
|
||||
below_lane_change_speed = v_ego < LANE_CHANGE_SPEED_MIN
|
||||
|
||||
if not active or self.lane_change_timer > LANE_CHANGE_TIME_MAX:
|
||||
self.lane_change_state = LaneChangeState.off
|
||||
self.lane_change_direction = LaneChangeDirection.none
|
||||
else:
|
||||
# LaneChangeState.off
|
||||
if self.lane_change_state == LaneChangeState.off and one_blinker and not self.prev_one_blinker and not below_lane_change_speed:
|
||||
self.lane_change_state = LaneChangeState.preLaneChange
|
||||
self.lane_change_ll_prob = 1.0
|
||||
|
||||
# LaneChangeState.preLaneChange
|
||||
elif self.lane_change_state == LaneChangeState.preLaneChange:
|
||||
# Set lane change direction
|
||||
self.lane_change_direction = LaneChangeDirection.left if \
|
||||
carstate.leftBlinker else LaneChangeDirection.right
|
||||
|
||||
torque_applied = carstate.steeringPressed and \
|
||||
((carstate.steeringTorque > 0 and self.lane_change_direction == LaneChangeDirection.left) or
|
||||
(carstate.steeringTorque < 0 and self.lane_change_direction == LaneChangeDirection.right))
|
||||
|
||||
blindspot_detected = ((carstate.leftBlindspot and self.lane_change_direction == LaneChangeDirection.left) or
|
||||
(carstate.rightBlindspot and self.lane_change_direction == LaneChangeDirection.right))
|
||||
|
||||
if not one_blinker or below_lane_change_speed:
|
||||
self.lane_change_state = LaneChangeState.off
|
||||
elif torque_applied and not blindspot_detected:
|
||||
self.lane_change_state = LaneChangeState.laneChangeStarting
|
||||
|
||||
# LaneChangeState.laneChangeStarting
|
||||
elif self.lane_change_state == LaneChangeState.laneChangeStarting:
|
||||
# fade out over .5s
|
||||
self.lane_change_ll_prob = max(self.lane_change_ll_prob - 2 * DT_MDL, 0.0)
|
||||
|
||||
# 98% certainty
|
||||
if lane_change_prob < 0.02 and self.lane_change_ll_prob < 0.01:
|
||||
self.lane_change_state = LaneChangeState.laneChangeFinishing
|
||||
|
||||
# LaneChangeState.laneChangeFinishing
|
||||
elif self.lane_change_state == LaneChangeState.laneChangeFinishing:
|
||||
# fade in laneline over 1s
|
||||
self.lane_change_ll_prob = min(self.lane_change_ll_prob + DT_MDL, 1.0)
|
||||
|
||||
if self.lane_change_ll_prob > 0.99:
|
||||
self.lane_change_direction = LaneChangeDirection.none
|
||||
if one_blinker:
|
||||
self.lane_change_state = LaneChangeState.preLaneChange
|
||||
else:
|
||||
self.lane_change_state = LaneChangeState.off
|
||||
|
||||
if self.lane_change_state in (LaneChangeState.off, LaneChangeState.preLaneChange):
|
||||
self.lane_change_timer = 0.0
|
||||
else:
|
||||
self.lane_change_timer += DT_MDL
|
||||
|
||||
self.prev_one_blinker = one_blinker
|
||||
|
||||
self.desire = DESIRES[self.lane_change_direction][self.lane_change_state]
|
||||
|
||||
# Send keep pulse once per second during LaneChangeStart.preLaneChange
|
||||
if self.lane_change_state in (LaneChangeState.off, LaneChangeState.laneChangeStarting):
|
||||
self.keep_pulse_timer = 0.0
|
||||
elif self.lane_change_state == LaneChangeState.preLaneChange:
|
||||
self.keep_pulse_timer += DT_MDL
|
||||
if self.keep_pulse_timer > 1.0:
|
||||
self.keep_pulse_timer = 0.0
|
||||
elif self.desire in (log.LateralPlan.Desire.keepLeft, log.LateralPlan.Desire.keepRight):
|
||||
self.desire = log.LateralPlan.Desire.none
|
|
@ -5,54 +5,20 @@ from selfdrive.swaglog import cloudlog
|
|||
from selfdrive.controls.lib.lateral_mpc_lib.lat_mpc import LateralMpc
|
||||
from selfdrive.controls.lib.drive_helpers import CONTROL_N, MPC_COST_LAT, LAT_MPC_N, CAR_ROTATION_RADIUS
|
||||
from selfdrive.controls.lib.lane_planner import LanePlanner, TRAJECTORY_SIZE
|
||||
from selfdrive.config import Conversions as CV
|
||||
from selfdrive.controls.lib.desire_helper import DesireHelper
|
||||
import cereal.messaging as messaging
|
||||
from cereal import log
|
||||
|
||||
LaneChangeState = log.LateralPlan.LaneChangeState
|
||||
LaneChangeDirection = log.LateralPlan.LaneChangeDirection
|
||||
|
||||
LANE_CHANGE_SPEED_MIN = 30 * CV.MPH_TO_MS
|
||||
LANE_CHANGE_TIME_MAX = 10.
|
||||
|
||||
DESIRES = {
|
||||
LaneChangeDirection.none: {
|
||||
LaneChangeState.off: log.LateralPlan.Desire.none,
|
||||
LaneChangeState.preLaneChange: log.LateralPlan.Desire.none,
|
||||
LaneChangeState.laneChangeStarting: log.LateralPlan.Desire.none,
|
||||
LaneChangeState.laneChangeFinishing: log.LateralPlan.Desire.none,
|
||||
},
|
||||
LaneChangeDirection.left: {
|
||||
LaneChangeState.off: log.LateralPlan.Desire.none,
|
||||
LaneChangeState.preLaneChange: log.LateralPlan.Desire.none,
|
||||
LaneChangeState.laneChangeStarting: log.LateralPlan.Desire.laneChangeLeft,
|
||||
LaneChangeState.laneChangeFinishing: log.LateralPlan.Desire.laneChangeLeft,
|
||||
},
|
||||
LaneChangeDirection.right: {
|
||||
LaneChangeState.off: log.LateralPlan.Desire.none,
|
||||
LaneChangeState.preLaneChange: log.LateralPlan.Desire.none,
|
||||
LaneChangeState.laneChangeStarting: log.LateralPlan.Desire.laneChangeRight,
|
||||
LaneChangeState.laneChangeFinishing: log.LateralPlan.Desire.laneChangeRight,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
class LateralPlanner:
|
||||
def __init__(self, CP, use_lanelines=True, wide_camera=False):
|
||||
self.use_lanelines = use_lanelines
|
||||
self.LP = LanePlanner(wide_camera)
|
||||
self.DH = DesireHelper()
|
||||
|
||||
self.last_cloudlog_t = 0
|
||||
self.steer_rate_cost = CP.steerRateCost
|
||||
|
||||
self.solution_invalid_cnt = 0
|
||||
self.lane_change_state = LaneChangeState.off
|
||||
self.lane_change_direction = LaneChangeDirection.none
|
||||
self.lane_change_timer = 0.0
|
||||
self.lane_change_ll_prob = 1.0
|
||||
self.keep_pulse_timer = 0.0
|
||||
self.prev_one_blinker = False
|
||||
self.desire = log.LateralPlan.Desire.none
|
||||
|
||||
self.path_xyz = np.zeros((TRAJECTORY_SIZE, 3))
|
||||
self.path_xyz_stds = np.ones((TRAJECTORY_SIZE, 3))
|
||||
|
@ -69,11 +35,11 @@ class LateralPlanner:
|
|||
|
||||
def update(self, sm):
|
||||
v_ego = sm['carState'].vEgo
|
||||
active = sm['controlsState'].active
|
||||
measured_curvature = sm['controlsState'].curvature
|
||||
|
||||
# Parse model predictions
|
||||
md = sm['modelV2']
|
||||
self.LP.parse_model(sm['modelV2'])
|
||||
self.LP.parse_model(md)
|
||||
if len(md.position.x) == TRAJECTORY_SIZE and len(md.orientation.x) == TRAJECTORY_SIZE:
|
||||
self.path_xyz = np.column_stack([md.position.x, md.position.y, md.position.z])
|
||||
self.t_idxs = np.array(md.position.t)
|
||||
|
@ -82,84 +48,15 @@ class LateralPlanner:
|
|||
self.path_xyz_stds = np.column_stack([md.position.xStd, md.position.yStd, md.position.zStd])
|
||||
|
||||
# Lane change logic
|
||||
one_blinker = sm['carState'].leftBlinker != sm['carState'].rightBlinker
|
||||
below_lane_change_speed = v_ego < LANE_CHANGE_SPEED_MIN
|
||||
|
||||
if (not active) or (self.lane_change_timer > LANE_CHANGE_TIME_MAX):
|
||||
self.lane_change_state = LaneChangeState.off
|
||||
self.lane_change_direction = LaneChangeDirection.none
|
||||
else:
|
||||
# LaneChangeState.off
|
||||
if self.lane_change_state == LaneChangeState.off and one_blinker and not self.prev_one_blinker and not below_lane_change_speed:
|
||||
self.lane_change_state = LaneChangeState.preLaneChange
|
||||
self.lane_change_ll_prob = 1.0
|
||||
|
||||
# LaneChangeState.preLaneChange
|
||||
elif self.lane_change_state == LaneChangeState.preLaneChange:
|
||||
# Set lane change direction
|
||||
if sm['carState'].leftBlinker:
|
||||
self.lane_change_direction = LaneChangeDirection.left
|
||||
elif sm['carState'].rightBlinker:
|
||||
self.lane_change_direction = LaneChangeDirection.right
|
||||
else: # If there are no blinkers we will go back to LaneChangeState.off
|
||||
self.lane_change_direction = LaneChangeDirection.none
|
||||
|
||||
torque_applied = sm['carState'].steeringPressed and \
|
||||
((sm['carState'].steeringTorque > 0 and self.lane_change_direction == LaneChangeDirection.left) or
|
||||
(sm['carState'].steeringTorque < 0 and self.lane_change_direction == LaneChangeDirection.right))
|
||||
|
||||
blindspot_detected = ((sm['carState'].leftBlindspot and self.lane_change_direction == LaneChangeDirection.left) or
|
||||
(sm['carState'].rightBlindspot and self.lane_change_direction == LaneChangeDirection.right))
|
||||
|
||||
if not one_blinker or below_lane_change_speed:
|
||||
self.lane_change_state = LaneChangeState.off
|
||||
elif torque_applied and not blindspot_detected:
|
||||
self.lane_change_state = LaneChangeState.laneChangeStarting
|
||||
|
||||
# LaneChangeState.laneChangeStarting
|
||||
elif self.lane_change_state == LaneChangeState.laneChangeStarting:
|
||||
# fade out over .5s
|
||||
self.lane_change_ll_prob = max(self.lane_change_ll_prob - 2 * DT_MDL, 0.0)
|
||||
|
||||
# 98% certainty
|
||||
lane_change_prob = self.LP.l_lane_change_prob + self.LP.r_lane_change_prob
|
||||
if lane_change_prob < 0.02 and self.lane_change_ll_prob < 0.01:
|
||||
self.lane_change_state = LaneChangeState.laneChangeFinishing
|
||||
|
||||
# LaneChangeState.laneChangeFinishing
|
||||
elif self.lane_change_state == LaneChangeState.laneChangeFinishing:
|
||||
# fade in laneline over 1s
|
||||
self.lane_change_ll_prob = min(self.lane_change_ll_prob + DT_MDL, 1.0)
|
||||
if self.lane_change_ll_prob > 0.99:
|
||||
self.lane_change_direction = LaneChangeDirection.none
|
||||
if one_blinker:
|
||||
self.lane_change_state = LaneChangeState.preLaneChange
|
||||
else:
|
||||
self.lane_change_state = LaneChangeState.off
|
||||
|
||||
if self.lane_change_state in (LaneChangeState.off, LaneChangeState.preLaneChange):
|
||||
self.lane_change_timer = 0.0
|
||||
else:
|
||||
self.lane_change_timer += DT_MDL
|
||||
|
||||
self.prev_one_blinker = one_blinker
|
||||
|
||||
self.desire = DESIRES[self.lane_change_direction][self.lane_change_state]
|
||||
|
||||
# Send keep pulse once per second during LaneChangeStart.preLaneChange
|
||||
if self.lane_change_state in (LaneChangeState.off, LaneChangeState.laneChangeStarting):
|
||||
self.keep_pulse_timer = 0.0
|
||||
elif self.lane_change_state == LaneChangeState.preLaneChange:
|
||||
self.keep_pulse_timer += DT_MDL
|
||||
if self.keep_pulse_timer > 1.0:
|
||||
self.keep_pulse_timer = 0.0
|
||||
elif self.desire in (log.LateralPlan.Desire.keepLeft, log.LateralPlan.Desire.keepRight):
|
||||
self.desire = log.LateralPlan.Desire.none
|
||||
lane_change_prob = self.LP.l_lane_change_prob + self.LP.r_lane_change_prob
|
||||
self.DH.update(sm['carState'], sm['controlsState'].active, lane_change_prob)
|
||||
|
||||
# Turn off lanes during lane change
|
||||
if self.desire == log.LateralPlan.Desire.laneChangeRight or self.desire == log.LateralPlan.Desire.laneChangeLeft:
|
||||
self.LP.lll_prob *= self.lane_change_ll_prob
|
||||
self.LP.rll_prob *= self.lane_change_ll_prob
|
||||
if self.DH.desire == log.LateralPlan.Desire.laneChangeRight or self.DH.desire == log.LateralPlan.Desire.laneChangeLeft:
|
||||
self.LP.lll_prob *= self.DH.lane_change_ll_prob
|
||||
self.LP.rll_prob *= self.DH.lane_change_ll_prob
|
||||
|
||||
# Calculate final driving path and set MPC costs
|
||||
if self.use_lanelines:
|
||||
d_path_xyz = self.LP.get_d_path(v_ego, self.t_idxs, self.path_xyz)
|
||||
self.lat_mpc.set_weights(MPC_COST_LAT.PATH, MPC_COST_LAT.HEADING, self.steer_rate_cost)
|
||||
|
@ -169,6 +66,7 @@ class LateralPlanner:
|
|||
# Heading cost is useful at low speed, otherwise end of plan can be off-heading
|
||||
heading_cost = interp(v_ego, [5.0, 10.0], [MPC_COST_LAT.HEADING, 0.0])
|
||||
self.lat_mpc.set_weights(path_cost, heading_cost, self.steer_rate_cost)
|
||||
|
||||
y_pts = np.interp(v_ego * self.t_idxs[:LAT_MPC_N + 1], np.linalg.norm(d_path_xyz, axis=1), d_path_xyz[:, 1])
|
||||
heading_pts = np.interp(v_ego * self.t_idxs[:LAT_MPC_N + 1], np.linalg.norm(self.path_xyz, axis=1), self.plan_yaw)
|
||||
self.y_pts = y_pts
|
||||
|
@ -217,9 +115,9 @@ class LateralPlanner:
|
|||
lateralPlan.mpcSolutionValid = bool(plan_solution_valid)
|
||||
lateralPlan.solverExecutionTime = self.lat_mpc.solve_time
|
||||
|
||||
lateralPlan.desire = self.desire
|
||||
lateralPlan.desire = self.DH.desire
|
||||
lateralPlan.useLaneLines = self.use_lanelines
|
||||
lateralPlan.laneChangeState = self.lane_change_state
|
||||
lateralPlan.laneChangeDirection = self.lane_change_direction
|
||||
lateralPlan.laneChangeState = self.DH.lane_change_state
|
||||
lateralPlan.laneChangeDirection = self.DH.lane_change_direction
|
||||
|
||||
pm.send('lateralPlan', plan_send)
|
||||
|
|
Loading…
Reference in New Issue