calibrationd: add function typing (#24018)
parent
4a8fffd0f4
commit
9f1c663ae2
|
@ -8,8 +8,9 @@ and the image input into the neural network is not corrected for roll.
|
|||
|
||||
import gc
|
||||
import os
|
||||
import capnp
|
||||
import numpy as np
|
||||
from typing import NoReturn
|
||||
from typing import List, NoReturn, Optional
|
||||
|
||||
from cereal import log
|
||||
import cereal.messaging as messaging
|
||||
|
@ -46,11 +47,11 @@ class Calibration:
|
|||
INVALID = 2
|
||||
|
||||
|
||||
def is_calibration_valid(rpy):
|
||||
def is_calibration_valid(rpy: np.ndarray) -> bool:
|
||||
return (PITCH_LIMITS[0] < rpy[1] < PITCH_LIMITS[1]) and (YAW_LIMITS[0] < rpy[2] < YAW_LIMITS[1])
|
||||
|
||||
|
||||
def sanity_clip(rpy):
|
||||
def sanity_clip(rpy: np.ndarray) -> np.ndarray:
|
||||
if np.isnan(rpy).any():
|
||||
rpy = RPY_INIT
|
||||
return np.array([rpy[0],
|
||||
|
@ -58,8 +59,8 @@ def sanity_clip(rpy):
|
|||
np.clip(rpy[2], YAW_LIMITS[0] - .005, YAW_LIMITS[1] + .005)])
|
||||
|
||||
|
||||
class Calibrator():
|
||||
def __init__(self, param_put=False):
|
||||
class Calibrator:
|
||||
def __init__(self, param_put: bool = False):
|
||||
self.param_put = param_put
|
||||
|
||||
# Read saved calibration
|
||||
|
@ -80,7 +81,7 @@ class Calibrator():
|
|||
self.reset(rpy_init, valid_blocks)
|
||||
self.update_status()
|
||||
|
||||
def reset(self, rpy_init=RPY_INIT, valid_blocks=0, smooth_from=None):
|
||||
def reset(self, rpy_init: np.ndarray = RPY_INIT, valid_blocks: int = 0, smooth_from: Optional[np.ndarray] = None) -> None:
|
||||
if not np.isfinite(rpy_init).all():
|
||||
self.rpy = RPY_INIT.copy()
|
||||
else:
|
||||
|
@ -95,7 +96,7 @@ class Calibrator():
|
|||
|
||||
self.idx = 0
|
||||
self.block_idx = 0
|
||||
self.v_ego = 0
|
||||
self.v_ego = 0.0
|
||||
|
||||
if smooth_from is None:
|
||||
self.old_rpy = RPY_INIT
|
||||
|
@ -104,13 +105,13 @@ class Calibrator():
|
|||
self.old_rpy = smooth_from
|
||||
self.old_rpy_weight = 1.0
|
||||
|
||||
def get_valid_idxs(self):
|
||||
def get_valid_idxs(self) -> List[int]:
|
||||
# exclude current block_idx from validity window
|
||||
before_current = list(range(self.block_idx))
|
||||
after_current = list(range(min(self.valid_blocks, self.block_idx + 1), self.valid_blocks))
|
||||
return before_current + after_current
|
||||
|
||||
def update_status(self):
|
||||
def update_status(self) -> None:
|
||||
valid_idxs = self.get_valid_idxs()
|
||||
if valid_idxs:
|
||||
rpys = self.rpys[valid_idxs]
|
||||
|
@ -137,16 +138,16 @@ class Calibrator():
|
|||
if self.param_put and write_this_cycle:
|
||||
put_nonblocking("CalibrationParams", self.get_msg().to_bytes())
|
||||
|
||||
def handle_v_ego(self, v_ego):
|
||||
def handle_v_ego(self, v_ego: float) -> None:
|
||||
self.v_ego = v_ego
|
||||
|
||||
def get_smooth_rpy(self):
|
||||
def get_smooth_rpy(self) -> np.ndarray:
|
||||
if self.old_rpy_weight > 0:
|
||||
return self.old_rpy_weight * self.old_rpy + (1.0 - self.old_rpy_weight) * self.rpy
|
||||
else:
|
||||
return self.rpy
|
||||
|
||||
def handle_cam_odom(self, trans, rot, trans_std):
|
||||
def handle_cam_odom(self, trans: List[float], rot: List[float], trans_std: List[float]) -> Optional[np.ndarray]:
|
||||
self.old_rpy_weight = min(0.0, self.old_rpy_weight - 1/SMOOTH_CYCLES)
|
||||
|
||||
straight_and_fast = ((self.v_ego > MIN_SPEED_FILTER) and (trans[0] > MIN_SPEED_FILTER) and (abs(rot[2]) < MAX_YAW_RATE_FILTER))
|
||||
|
@ -176,7 +177,7 @@ class Calibrator():
|
|||
|
||||
return new_rpy
|
||||
|
||||
def get_msg(self):
|
||||
def get_msg(self) -> capnp.lib.capnp._DynamicStructBuilder:
|
||||
smooth_rpy = self.get_smooth_rpy()
|
||||
extrinsic_matrix = get_view_frame_from_road_frame(0, smooth_rpy[1], smooth_rpy[2], model_height)
|
||||
|
||||
|
@ -189,11 +190,11 @@ class Calibrator():
|
|||
msg.liveCalibration.rpyCalibSpread = self.calib_spread.tolist()
|
||||
return msg
|
||||
|
||||
def send_data(self, pm) -> None:
|
||||
def send_data(self, pm: messaging.PubMaster) -> None:
|
||||
pm.send('liveCalibration', self.get_msg())
|
||||
|
||||
|
||||
def calibrationd_thread(sm=None, pm=None) -> NoReturn:
|
||||
def calibrationd_thread(sm: Optional[messaging.SubMaster] = None, pm: Optional[messaging.PubMaster] = None) -> NoReturn:
|
||||
gc.disable()
|
||||
set_realtime_priority(1)
|
||||
|
||||
|
@ -223,7 +224,7 @@ def calibrationd_thread(sm=None, pm=None) -> NoReturn:
|
|||
calibrator.send_data(pm)
|
||||
|
||||
|
||||
def main(sm=None, pm=None) -> NoReturn:
|
||||
def main(sm: Optional[messaging.SubMaster] = None, pm: Optional[messaging.PubMaster] = None) -> NoReturn:
|
||||
calibrationd_thread(sm, pm)
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue