#!/usr/bin/env python3 import os import sys import signal import itertools import math from typing import NoReturn from struct import unpack_from, calcsize import cereal.messaging as messaging from cereal import log from selfdrive.swaglog import cloudlog from selfdrive.sensord.rawgps.modemdiag import ModemDiag, DIAG_LOG_F, setup_logs from selfdrive.sensord.rawgps.structs import dict_unpacker from selfdrive.sensord.rawgps.structs import gps_measurement_report, gps_measurement_report_sv from selfdrive.sensord.rawgps.structs import glonass_measurement_report, glonass_measurement_report_sv from selfdrive.sensord.rawgps.structs import LOG_GNSS_GPS_MEASUREMENT_REPORT, LOG_GNSS_GLONASS_MEASUREMENT_REPORT from selfdrive.sensord.rawgps.structs import position_report, LOG_GNSS_POSITION_REPORT miscStatusFields = { "multipathEstimateIsValid": 0, "directionIsValid": 1, } measurementStatusFields = { "subMillisecondIsValid": 0, "subBitTimeIsKnown": 1, "satelliteTimeIsKnown": 2, "bitEdgeConfirmedFromSignal": 3, "measuredVelocity": 4, "fineOrCoarseVelocity": 5, "lockPointValid": 6, "lockPointPositive": 7, "lastUpdateFromDifference": 9, "lastUpdateFromVelocityDifference": 10, "strongIndicationOfCrossCorelation": 11, "tentativeMeasurement": 12, "measurementNotUsable": 13, "sirCheckIsNeeded": 14, "probationMode": 15, "multipathIndicator": 24, "imdJammingIndicator": 25, "lteB13TxJammingIndicator": 26, "freshMeasurementIndicator": 27, } measurementStatusGPSFields = { "gpsRoundRobinRxDiversity": 18, "gpsRxDiversity": 19, "gpsLowBandwidthRxDiversityCombined": 20, "gpsHighBandwidthNu4": 21, "gpsHighBandwidthNu8": 22, "gpsHighBandwidthUniform": 23, } measurementStatusGlonassFields = { "glonassMeanderBitEdgeValid": 16, "glonassTimeMarkValid": 17 } def main() -> NoReturn: unpack_gps_meas, size_gps_meas = dict_unpacker(gps_measurement_report, True) unpack_gps_meas_sv, size_gps_meas_sv = dict_unpacker(gps_measurement_report_sv, True) unpack_glonass_meas, size_glonass_meas = dict_unpacker(glonass_measurement_report, True) unpack_glonass_meas_sv, size_glonass_meas_sv = dict_unpacker(glonass_measurement_report_sv, True) log_types = [ LOG_GNSS_GPS_MEASUREMENT_REPORT, LOG_GNSS_GLONASS_MEASUREMENT_REPORT, ] pub_types = ['qcomGnss'] if int(os.getenv("PUBLISH_EXTERNAL", "0")) == 1: unpack_position, _ = dict_unpacker(position_report) log_types.append(LOG_GNSS_POSITION_REPORT) pub_types.append("gpsLocationExternal") os.system("mmcli -m 0 --location-enable-gps-raw --location-enable-gps-nmea") diag = ModemDiag() def try_setup_logs(diag, log_types): for _ in range(5): try: setup_logs(diag, log_types) break except Exception: pass def disable_logs(sig, frame): os.system("mmcli -m 0 --location-disable-gps-raw --location-disable-gps-nmea") cloudlog.warning("rawgpsd: shutting down") try_setup_logs(diag, []) cloudlog.warning("rawgpsd: logs disabled") sys.exit(0) signal.signal(signal.SIGINT, disable_logs) try_setup_logs(diag, log_types) cloudlog.warning("rawgpsd: setup logs done") pm = messaging.PubMaster(pub_types) while 1: opcode, payload = diag.recv() assert opcode == DIAG_LOG_F (pending_msgs, log_outer_length), inner_log_packet = unpack_from(' 0: cloudlog.debug("have %d pending messages" % pending_msgs) assert log_outer_length == len(inner_log_packet) (log_inner_length, log_type, log_time), log_payload = unpack_from('