small calibration refactor + tests (#2641)
* calibration tests * read from capnp * save using cereal struct * reset calibration if new car * car paramsalbatross
parent
b9c14c92a7
commit
ee43eb552b
|
@ -7,16 +7,18 @@ and the image input into the neural network is not corrected for roll.
|
|||
'''
|
||||
|
||||
import os
|
||||
import capnp
|
||||
import copy
|
||||
import json
|
||||
import numpy as np
|
||||
import cereal.messaging as messaging
|
||||
from selfdrive.config import Conversions as CV
|
||||
from selfdrive.swaglog import cloudlog
|
||||
from cereal import car, log
|
||||
from common.params import Params, put_nonblocking
|
||||
from common.transformations.model import model_height
|
||||
from common.transformations.camera import get_view_frame_from_road_frame
|
||||
from common.transformations.orientation import rot_from_euler, euler_from_rot
|
||||
from selfdrive.config import Conversions as CV
|
||||
from selfdrive.swaglog import cloudlog
|
||||
|
||||
MIN_SPEED_FILTER = 15 * CV.MPH_TO_MS
|
||||
MAX_VEL_ANGLE_STD = np.radians(0.25)
|
||||
|
@ -59,18 +61,32 @@ class Calibrator():
|
|||
self.param_put = param_put
|
||||
|
||||
# Read saved calibration
|
||||
calibration_params = Params().get("CalibrationParams")
|
||||
params = Params()
|
||||
calibration_params = params.get("CalibrationParams")
|
||||
|
||||
rpy_init = RPY_INIT
|
||||
valid_blocks = 0
|
||||
|
||||
cached_params = params.get("CarParamsCache")
|
||||
if cached_params is not None:
|
||||
CP = car.CarParams.from_bytes(params.get("CarParams", block=True))
|
||||
cached_params = car.CarParams.from_bytes(cached_params)
|
||||
if cached_params.carFingerprint != CP.carFingerprint:
|
||||
calibration_params = None
|
||||
|
||||
if param_put and calibration_params:
|
||||
try:
|
||||
msg = log.Event.from_bytes(calibration_params)
|
||||
rpy_init = list(msg.liveCalibration.rpyCalib)
|
||||
valid_blocks = msg.liveCalibration.validBlocks
|
||||
except (ValueError, capnp.lib.capnp.KjException):
|
||||
# TODO: remove this after next release
|
||||
calibration_params = json.loads(calibration_params)
|
||||
rpy_init = calibration_params["calib_radians"]
|
||||
valid_blocks = calibration_params['valid_blocks']
|
||||
except Exception:
|
||||
cloudlog.exception("CalibrationParams file found but error encountered")
|
||||
|
||||
self.reset(rpy_init, valid_blocks)
|
||||
self.update_status()
|
||||
|
||||
|
@ -118,10 +134,7 @@ class Calibrator():
|
|||
|
||||
write_this_cycle = (self.idx == 0) and (self.block_idx % (INPUTS_WANTED//5) == 5)
|
||||
if self.param_put and write_this_cycle:
|
||||
# TODO: this should use the liveCalibration struct from cereal
|
||||
cal_params = {"calib_radians": list(self.rpy),
|
||||
"valid_blocks": int(self.valid_blocks)}
|
||||
put_nonblocking("CalibrationParams", json.dumps(cal_params).encode('utf8'))
|
||||
put_nonblocking("CalibrationParams", self.get_msg().to_bytes())
|
||||
|
||||
def handle_v_ego(self, v_ego):
|
||||
self.v_ego = v_ego
|
||||
|
@ -138,42 +151,44 @@ class Calibrator():
|
|||
straight_and_fast = ((self.v_ego > MIN_SPEED_FILTER) and (trans[0] > MIN_SPEED_FILTER) and (abs(rot[2]) < MAX_YAW_RATE_FILTER))
|
||||
certain_if_calib = ((np.arctan2(trans_std[1], trans[0]) < MAX_VEL_ANGLE_STD) or
|
||||
(self.valid_blocks < INPUTS_NEEDED))
|
||||
if straight_and_fast and certain_if_calib:
|
||||
observed_rpy = np.array([0,
|
||||
-np.arctan2(trans[2], trans[0]),
|
||||
np.arctan2(trans[1], trans[0])])
|
||||
new_rpy = euler_from_rot(rot_from_euler(self.get_smooth_rpy()).dot(rot_from_euler(observed_rpy)))
|
||||
new_rpy = sanity_clip(new_rpy)
|
||||
|
||||
self.rpys[self.block_idx] = (self.idx*self.rpys[self.block_idx] + (BLOCK_SIZE - self.idx) * new_rpy) / float(BLOCK_SIZE)
|
||||
self.idx = (self.idx + 1) % BLOCK_SIZE
|
||||
if self.idx == 0:
|
||||
self.block_idx += 1
|
||||
self.valid_blocks = max(self.block_idx, self.valid_blocks)
|
||||
self.block_idx = self.block_idx % INPUTS_WANTED
|
||||
if self.valid_blocks > 0:
|
||||
self.rpy = np.mean(self.rpys[:self.valid_blocks], axis=0)
|
||||
|
||||
|
||||
self.update_status()
|
||||
|
||||
return new_rpy
|
||||
else:
|
||||
if not (straight_and_fast and certain_if_calib):
|
||||
return None
|
||||
|
||||
def send_data(self, pm):
|
||||
observed_rpy = np.array([0,
|
||||
-np.arctan2(trans[2], trans[0]),
|
||||
np.arctan2(trans[1], trans[0])])
|
||||
new_rpy = euler_from_rot(rot_from_euler(self.get_smooth_rpy()).dot(rot_from_euler(observed_rpy)))
|
||||
new_rpy = sanity_clip(new_rpy)
|
||||
|
||||
self.rpys[self.block_idx] = (self.idx*self.rpys[self.block_idx] + (BLOCK_SIZE - self.idx) * new_rpy) / float(BLOCK_SIZE)
|
||||
self.idx = (self.idx + 1) % BLOCK_SIZE
|
||||
if self.idx == 0:
|
||||
self.block_idx += 1
|
||||
self.valid_blocks = max(self.block_idx, self.valid_blocks)
|
||||
self.block_idx = self.block_idx % INPUTS_WANTED
|
||||
if self.valid_blocks > 0:
|
||||
self.rpy = np.mean(self.rpys[:self.valid_blocks], axis=0)
|
||||
|
||||
self.update_status()
|
||||
|
||||
return new_rpy
|
||||
|
||||
def get_msg(self):
|
||||
smooth_rpy = self.get_smooth_rpy()
|
||||
extrinsic_matrix = get_view_frame_from_road_frame(0, smooth_rpy[1], smooth_rpy[2], model_height)
|
||||
|
||||
cal_send = messaging.new_message('liveCalibration')
|
||||
cal_send.liveCalibration.validBlocks = self.valid_blocks
|
||||
cal_send.liveCalibration.calStatus = self.cal_status
|
||||
cal_send.liveCalibration.calPerc = min(100 * (self.valid_blocks * BLOCK_SIZE + self.idx) // (INPUTS_NEEDED * BLOCK_SIZE), 100)
|
||||
cal_send.liveCalibration.extrinsicMatrix = [float(x) for x in extrinsic_matrix.flatten()]
|
||||
cal_send.liveCalibration.rpyCalib = [float(x) for x in smooth_rpy]
|
||||
cal_send.liveCalibration.rpyCalibSpread = [float(x) for x in self.calib_spread]
|
||||
msg = messaging.new_message('liveCalibration')
|
||||
msg.liveCalibration.validBlocks = self.valid_blocks
|
||||
msg.liveCalibration.calStatus = self.cal_status
|
||||
msg.liveCalibration.calPerc = min(100 * (self.valid_blocks * BLOCK_SIZE + self.idx) // (INPUTS_NEEDED * BLOCK_SIZE), 100)
|
||||
msg.liveCalibration.extrinsicMatrix = [float(x) for x in extrinsic_matrix.flatten()]
|
||||
msg.liveCalibration.rpyCalib = [float(x) for x in smooth_rpy]
|
||||
msg.liveCalibration.rpyCalibSpread = [float(x) for x in self.calib_spread]
|
||||
return msg
|
||||
|
||||
pm.send('liveCalibration', cal_send)
|
||||
def send_data(self, pm):
|
||||
pm.send('liveCalibration', self.get_msg())
|
||||
|
||||
|
||||
def calibrationd_thread(sm=None, pm=None):
|
||||
|
|
|
@ -0,0 +1,36 @@
|
|||
#!/usr/bin/env python3
|
||||
import json
|
||||
import random
|
||||
import unittest
|
||||
|
||||
import cereal.messaging as messaging
|
||||
from common.params import Params
|
||||
from selfdrive.locationd.calibrationd import Calibrator
|
||||
|
||||
|
||||
class TestCalibrationd(unittest.TestCase):
|
||||
|
||||
def test_read_saved_params_json(self):
|
||||
r = [random.random() for _ in range(3)]
|
||||
b = random.randint(1, 10)
|
||||
cal_params = {"calib_radians": r,
|
||||
"valid_blocks": b}
|
||||
Params().put("CalibrationParams", json.dumps(cal_params).encode('utf8'))
|
||||
c = Calibrator(param_put=True)
|
||||
|
||||
self.assertEqual(r, c.rpy)
|
||||
self.assertEqual(b, c.valid_blocks)
|
||||
|
||||
def test_read_saved_params(self):
|
||||
msg = messaging.new_message('liveCalibration')
|
||||
msg.liveCalibration.validBlocks = random.randint(1, 10)
|
||||
msg.liveCalibration.rpyCalib = [random.random() for _ in range(3)]
|
||||
Params().put("CalibrationParams", msg.to_bytes())
|
||||
c = Calibrator(param_put=True)
|
||||
|
||||
self.assertEqual(list(msg.liveCalibration.rpyCalib), c.rpy)
|
||||
self.assertEqual(msg.liveCalibration.validBlocks, c.valid_blocks)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
Loading…
Reference in New Issue