diff --git a/selfdrive/locationd/calibrationd.py b/selfdrive/locationd/calibrationd.py index bf6fe26ec..174100517 100755 --- a/selfdrive/locationd/calibrationd.py +++ b/selfdrive/locationd/calibrationd.py @@ -8,8 +8,9 @@ and the image input into the neural network is not corrected for roll. import gc import os +import capnp import numpy as np -from typing import NoReturn +from typing import List, NoReturn, Optional from cereal import log import cereal.messaging as messaging @@ -46,11 +47,11 @@ class Calibration: INVALID = 2 -def is_calibration_valid(rpy): +def is_calibration_valid(rpy: np.ndarray) -> bool: return (PITCH_LIMITS[0] < rpy[1] < PITCH_LIMITS[1]) and (YAW_LIMITS[0] < rpy[2] < YAW_LIMITS[1]) -def sanity_clip(rpy): +def sanity_clip(rpy: np.ndarray) -> np.ndarray: if np.isnan(rpy).any(): rpy = RPY_INIT return np.array([rpy[0], @@ -58,8 +59,8 @@ def sanity_clip(rpy): np.clip(rpy[2], YAW_LIMITS[0] - .005, YAW_LIMITS[1] + .005)]) -class Calibrator(): - def __init__(self, param_put=False): +class Calibrator: + def __init__(self, param_put: bool = False): self.param_put = param_put # Read saved calibration @@ -80,7 +81,7 @@ class Calibrator(): self.reset(rpy_init, valid_blocks) self.update_status() - def reset(self, rpy_init=RPY_INIT, valid_blocks=0, smooth_from=None): + def reset(self, rpy_init: np.ndarray = RPY_INIT, valid_blocks: int = 0, smooth_from: Optional[np.ndarray] = None) -> None: if not np.isfinite(rpy_init).all(): self.rpy = RPY_INIT.copy() else: @@ -95,7 +96,7 @@ class Calibrator(): self.idx = 0 self.block_idx = 0 - self.v_ego = 0 + self.v_ego = 0.0 if smooth_from is None: self.old_rpy = RPY_INIT @@ -104,13 +105,13 @@ class Calibrator(): self.old_rpy = smooth_from self.old_rpy_weight = 1.0 - def get_valid_idxs(self): + def get_valid_idxs(self) -> List[int]: # exclude current block_idx from validity window before_current = list(range(self.block_idx)) after_current = list(range(min(self.valid_blocks, self.block_idx + 1), self.valid_blocks)) return before_current + after_current - def update_status(self): + def update_status(self) -> None: valid_idxs = self.get_valid_idxs() if valid_idxs: rpys = self.rpys[valid_idxs] @@ -137,16 +138,16 @@ class Calibrator(): if self.param_put and write_this_cycle: put_nonblocking("CalibrationParams", self.get_msg().to_bytes()) - def handle_v_ego(self, v_ego): + def handle_v_ego(self, v_ego: float) -> None: self.v_ego = v_ego - def get_smooth_rpy(self): + def get_smooth_rpy(self) -> np.ndarray: if self.old_rpy_weight > 0: return self.old_rpy_weight * self.old_rpy + (1.0 - self.old_rpy_weight) * self.rpy else: return self.rpy - def handle_cam_odom(self, trans, rot, trans_std): + def handle_cam_odom(self, trans: List[float], rot: List[float], trans_std: List[float]) -> Optional[np.ndarray]: self.old_rpy_weight = min(0.0, self.old_rpy_weight - 1/SMOOTH_CYCLES) straight_and_fast = ((self.v_ego > MIN_SPEED_FILTER) and (trans[0] > MIN_SPEED_FILTER) and (abs(rot[2]) < MAX_YAW_RATE_FILTER)) @@ -176,7 +177,7 @@ class Calibrator(): return new_rpy - def get_msg(self): + def get_msg(self) -> capnp.lib.capnp._DynamicStructBuilder: smooth_rpy = self.get_smooth_rpy() extrinsic_matrix = get_view_frame_from_road_frame(0, smooth_rpy[1], smooth_rpy[2], model_height) @@ -189,11 +190,11 @@ class Calibrator(): msg.liveCalibration.rpyCalibSpread = self.calib_spread.tolist() return msg - def send_data(self, pm) -> None: + def send_data(self, pm: messaging.PubMaster) -> None: pm.send('liveCalibration', self.get_msg()) -def calibrationd_thread(sm=None, pm=None) -> NoReturn: +def calibrationd_thread(sm: Optional[messaging.SubMaster] = None, pm: Optional[messaging.PubMaster] = None) -> NoReturn: gc.disable() set_realtime_priority(1) @@ -223,7 +224,7 @@ def calibrationd_thread(sm=None, pm=None) -> NoReturn: calibrator.send_data(pm) -def main(sm=None, pm=None) -> NoReturn: +def main(sm: Optional[messaging.SubMaster] = None, pm: Optional[messaging.PubMaster] = None) -> NoReturn: calibrationd_thread(sm, pm)