athena: version endpoint (#21045)

* athena: version endpoint

* get_version function

* more explicit version validation
pull/21053/head
Greg Hogan 2021-05-26 18:33:27 -07:00 committed by GitHub
parent a844c60f14
commit ea0f7e2797
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 24 additions and 3 deletions

View File

@ -30,7 +30,7 @@ from selfdrive.loggerd.config import ROOT
from selfdrive.loggerd.xattr_cache import getxattr, setxattr
from selfdrive.swaglog import cloudlog, SWAGLOG_DIR
import selfdrive.crash as crash
from selfdrive.version import dirty, origin, branch, commit
from selfdrive.version import dirty, origin, branch, commit, get_version, get_git_remote, get_git_branch, get_git_commit
ATHENA_HOST = os.getenv('ATHENA_HOST', 'wss://athena.comma.ai')
HANDLER_THREADS = int(os.getenv('HANDLER_THREADS', "4"))
@ -132,6 +132,16 @@ def getMessage(service=None, timeout=1000):
return ret.to_dict()
@dispatcher.add_method
def getVersion():
return {
"version": get_version(),
"remote": get_git_remote(),
"branch": get_git_branch(),
"commit": get_git_commit(),
}
@dispatcher.add_method
def setNavDestination(latitude=0, longitude=0):
destination = {

View File

@ -174,6 +174,14 @@ class TestAthenadMethods(unittest.TestCase):
keys = dispatcher["getSshAuthorizedKeys"]()
self.assertEqual(keys, MockParams().params["GithubSshKeys"].decode('utf-8'))
def test_getVersion(self):
resp = dispatcher["getVersion"]()
keys = ["version", "remote", "branch", "commit"]
self.assertEqual(list(resp.keys()), keys)
for k in keys:
self.assertIsInstance(resp[k], str, f"{k} is not a string")
self.assertTrue(len(resp[k]) > 0, f"{k} has no value")
def test_jsonrpc_handler(self):
end_event = threading.Event()
thread = threading.Thread(target=athenad.jsonrpc_handler, args=(end_event,))

View File

@ -39,9 +39,12 @@ def get_git_remote(default: Optional[str] = None) -> Optional[str]:
return run_cmd_default(["git", "config", "--get", "remote.origin.url"], default=default)
with open(os.path.join(os.path.dirname(os.path.abspath(__file__)), "common", "version.h")) as _versionf:
version = _versionf.read().split('"')[1]
def get_version():
with open(os.path.join(os.path.dirname(os.path.abspath(__file__)), "common", "version.h")) as _versionf:
version = _versionf.read().split('"')[1]
return version
version = get_version()
prebuilt = os.path.exists(os.path.join(BASEDIR, 'prebuilt'))
training_version: bytes = b"0.2.0"