offroad alert for unregistered devices (#20870)
* offroad alert for unregistered devices * update tests * add param * fix stretchalbatross
parent
0400f05bd2
commit
b93ccc465d
|
@ -10,10 +10,14 @@ from common.params import Params
|
|||
from common.spinner import Spinner
|
||||
from common.file_helpers import mkdirs_exists_ok
|
||||
from common.basedir import PERSIST
|
||||
from selfdrive.hardware import HARDWARE, PC
|
||||
from selfdrive.controls.lib.alertmanager import set_offroad_alert
|
||||
from selfdrive.hardware import HARDWARE
|
||||
from selfdrive.swaglog import cloudlog
|
||||
|
||||
|
||||
UNREGISTERED_DONGLE_ID = "UnregisteredDevice"
|
||||
|
||||
|
||||
def register(show_spinner=False) -> str:
|
||||
params = Params()
|
||||
params.put("SubscriberInfo", HARDWARE.get_subscriber_info())
|
||||
|
@ -68,9 +72,7 @@ def register(show_spinner=False) -> str:
|
|||
|
||||
if resp.status_code in (402, 403):
|
||||
cloudlog.info(f"Unable to register device, got {resp.status_code}")
|
||||
dongle_id = None
|
||||
if PC:
|
||||
dongle_id = "UnofficialDevice"
|
||||
dongle_id = UNREGISTERED_DONGLE_ID
|
||||
else:
|
||||
dongleauth = json.loads(resp.text)
|
||||
dongle_id = dongleauth["dongle_id"]
|
||||
|
@ -85,6 +87,7 @@ def register(show_spinner=False) -> str:
|
|||
|
||||
if dongle_id:
|
||||
params.put("DongleId", dongle_id)
|
||||
set_offroad_alert("Offroad_UnofficialHardware", dongle_id == UNREGISTERED_DONGLE_ID)
|
||||
return dongle_id
|
||||
|
||||
|
||||
|
|
|
@ -8,7 +8,7 @@ from pathlib import Path
|
|||
from unittest import mock
|
||||
|
||||
from common.params import Params
|
||||
from selfdrive.athena.registration import register
|
||||
from selfdrive.athena.registration import register, UNREGISTERED_DONGLE_ID
|
||||
from selfdrive.athena.tests.helpers import MockResponse
|
||||
|
||||
|
||||
|
@ -64,24 +64,15 @@ class TestRegistration(unittest.TestCase):
|
|||
self.assertEqual(m.call_count, 1)
|
||||
self.assertEqual(self.params.get("DongleId", encoding='utf-8'), dongle)
|
||||
|
||||
def test_unregistered_pc(self):
|
||||
def test_unregistered(self):
|
||||
# no keys, no dongle id
|
||||
with mock.patch("selfdrive.athena.registration.api_get", autospec=True) as m, \
|
||||
mock.patch("selfdrive.athena.registration.PC", new=True):
|
||||
with mock.patch("selfdrive.athena.registration.api_get", autospec=True) as m:
|
||||
m.return_value = MockResponse(None, 402)
|
||||
dongle = register()
|
||||
self.assertGreater(len(dongle), 0)
|
||||
self.assertEqual(m.call_count, 1)
|
||||
self.assertEqual(dongle, UNREGISTERED_DONGLE_ID)
|
||||
self.assertEqual(self.params.get("DongleId", encoding='utf-8'), dongle)
|
||||
|
||||
def test_unregistered_non_pc(self):
|
||||
# no keys, no dongle id
|
||||
with mock.patch("selfdrive.athena.registration.api_get", autospec=True) as m, \
|
||||
mock.patch("selfdrive.athena.registration.PC", new=False):
|
||||
m.return_value = MockResponse(None, 402)
|
||||
self.assertIs(register(), None)
|
||||
self.assertEqual(m.call_count, 1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
|
|
@ -211,6 +211,7 @@ std::unordered_map<std::string, uint32_t> keys = {
|
|||
{"Offroad_NeosUpdate", CLEAR_ON_MANAGER_START},
|
||||
{"Offroad_UpdateFailed", CLEAR_ON_MANAGER_START},
|
||||
{"Offroad_HardwareUnsupported", CLEAR_ON_MANAGER_START},
|
||||
{"Offroad_UnofficialHardware", CLEAR_ON_MANAGER_START},
|
||||
{"ForcePowerDown", CLEAR_ON_MANAGER_START},
|
||||
};
|
||||
|
||||
|
|
|
@ -40,5 +40,9 @@
|
|||
"Offroad_HardwareUnsupported": {
|
||||
"text": "White and grey panda are unsupported. Upgrade to comma two or black panda.",
|
||||
"severity": 0
|
||||
},
|
||||
"Offroad_UnofficialHardware": {
|
||||
"text": "Device failed to register. It will not connect to or upload to comma.ai servers, and receives no support from comma.ai. If this is an official device, contact support@comma.ai.",
|
||||
"severity": 1
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,7 +16,7 @@ from selfdrive.hardware import HARDWARE, PC, TICI
|
|||
from selfdrive.manager.helpers import unblock_stdout
|
||||
from selfdrive.manager.process import ensure_running
|
||||
from selfdrive.manager.process_config import managed_processes
|
||||
from selfdrive.athena.registration import register
|
||||
from selfdrive.athena.registration import register, UNREGISTERED_DONGLE_ID
|
||||
from selfdrive.swaglog import cloudlog, add_file_handler
|
||||
from selfdrive.version import dirty, get_git_commit, version, origin, branch, commit, \
|
||||
terms_version, training_version, comma_remote, \
|
||||
|
@ -117,6 +117,8 @@ def manager_thread():
|
|||
params = Params()
|
||||
|
||||
ignore = []
|
||||
if params.get("DongleId") == UNREGISTERED_DONGLE_ID:
|
||||
ignore += ["manage_athenad", "uploader"]
|
||||
if os.getenv("NOBOARD") is not None:
|
||||
ignore.append("pandad")
|
||||
if os.getenv("BLOCK") is not None:
|
||||
|
|
|
@ -17,7 +17,6 @@ OffroadAlert::OffroadAlert(QWidget* parent) : QFrame(parent) {
|
|||
alerts_layout = new QVBoxLayout;
|
||||
alerts_layout->setMargin(0);
|
||||
alerts_layout->setSpacing(30);
|
||||
alerts_layout->addStretch(1);
|
||||
alerts_widget->setLayout(alerts_layout);
|
||||
alerts_widget->setStyleSheet("background-color: transparent;");
|
||||
|
||||
|
|
Loading…
Reference in New Issue