#!/usr/bin/env python3 import json import os import requests import tempfile import time import threading import queue import unittest from multiprocessing import Process from pathlib import Path from unittest import mock from websocket import ABNF from websocket._exceptions import WebSocketConnectionClosedException from selfdrive.athena import athenad from selfdrive.athena.athenad import dispatcher from selfdrive.athena.tests.helpers import MockWebsocket, MockParams, MockApi, EchoSocket, with_http_server from cereal import messaging class TestAthenadMethods(unittest.TestCase): @classmethod def setUpClass(cls): cls.SOCKET_PORT = 45454 athenad.ROOT = tempfile.mkdtemp() athenad.Params = MockParams athenad.Api = MockApi athenad.LOCAL_PORT_WHITELIST = set([cls.SOCKET_PORT]) def test_echo(self): assert dispatcher["echo"]("bob") == "bob" def test_getMessage(self): with self.assertRaises(TimeoutError) as _: dispatcher["getMessage"]("controlsState") def send_deviceState(): messaging.context = messaging.Context() pub_sock = messaging.pub_sock("deviceState") start = time.time() while time.time() - start < 1: msg = messaging.new_message('deviceState') pub_sock.send(msg.to_bytes()) time.sleep(0.01) p = Process(target=send_deviceState) p.start() time.sleep(0.1) try: deviceState = dispatcher["getMessage"]("deviceState") assert deviceState['deviceState'] finally: p.terminate() def test_listDataDirectory(self): print(dispatcher["listDataDirectory"]()) @with_http_server def test_do_upload(self, host): fn = os.path.join(athenad.ROOT, 'qlog.bz2') Path(fn).touch() try: item = athenad.UploadItem(path=fn, url="http://localhost:1238", headers={}, created_at=int(time.time()*1000), id='') with self.assertRaises(requests.exceptions.ConnectionError): athenad._do_upload(item) item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='') resp = athenad._do_upload(item) self.assertEqual(resp.status_code, 201) finally: os.unlink(fn) @with_http_server def test_uploadFileToUrl(self, host): not_exists_resp = dispatcher["uploadFileToUrl"]("does_not_exist.bz2", "http://localhost:1238", {}) self.assertEqual(not_exists_resp, 404) fn = os.path.join(athenad.ROOT, 'qlog.bz2') Path(fn).touch() try: resp = dispatcher["uploadFileToUrl"]("qlog.bz2", f"{host}/qlog.bz2", {}) self.assertEqual(resp['enqueued'], 1) self.assertDictContainsSubset({"path": fn, "url": f"{host}/qlog.bz2", "headers": {}}, resp['item']) self.assertIsNotNone(resp['item'].get('id')) self.assertEqual(athenad.upload_queue.qsize(), 1) finally: athenad.upload_queue = queue.Queue() os.unlink(fn) @with_http_server def test_upload_handler(self, host): fn = os.path.join(athenad.ROOT, 'qlog.bz2') Path(fn).touch() item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='') end_event = threading.Event() thread = threading.Thread(target=athenad.upload_handler, args=(end_event,)) thread.start() athenad.upload_queue.put_nowait(item) try: time.sleep(1) # give it time to process to prevent shutdown before upload completes now = time.time() while time.time() - now < 5: if athenad.upload_queue.qsize() == 0: break self.assertEqual(athenad.upload_queue.qsize(), 0) finally: end_event.set() athenad.upload_queue = queue.Queue() os.unlink(fn) def test_cancelUpload(self): item = athenad.UploadItem(path="qlog.bz2", url="http://localhost:44444/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='id') athenad.upload_queue.put_nowait(item) dispatcher["cancelUpload"](item.id) self.assertIn(item.id, athenad.cancelled_uploads) end_event = threading.Event() thread = threading.Thread(target=athenad.upload_handler, args=(end_event,)) thread.start() try: now = time.time() while time.time() - now < 5: if athenad.upload_queue.qsize() == 0 and len(athenad.cancelled_uploads) == 0: break self.assertEqual(athenad.upload_queue.qsize(), 0) self.assertEqual(len(athenad.cancelled_uploads), 0) finally: end_event.set() athenad.upload_queue = queue.Queue() def test_listUploadQueue(self): item = athenad.UploadItem(path="qlog.bz2", url="http://localhost:44444/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='id') athenad.upload_queue.put_nowait(item) try: items = dispatcher["listUploadQueue"]() self.assertEqual(len(items), 1) self.assertDictEqual(items[0], item._asdict()) finally: athenad.upload_queue = queue.Queue() @mock.patch('selfdrive.athena.athenad.create_connection') def test_startLocalProxy(self, mock_create_connection): end_event = threading.Event() ws_recv = queue.Queue() ws_send = queue.Queue() mock_ws = MockWebsocket(ws_recv, ws_send) mock_create_connection.return_value = mock_ws echo_socket = EchoSocket(self.SOCKET_PORT) socket_thread = threading.Thread(target=echo_socket.run) socket_thread.start() athenad.startLocalProxy(end_event, 'ws://localhost:1234', self.SOCKET_PORT) ws_recv.put_nowait(b'ping') try: recv = ws_send.get(timeout=5) assert recv == (b'ping', ABNF.OPCODE_BINARY), recv finally: # signal websocket close to athenad.ws_proxy_recv ws_recv.put_nowait(WebSocketConnectionClosedException()) socket_thread.join() def test_getSshAuthorizedKeys(self): keys = dispatcher["getSshAuthorizedKeys"]() self.assertEqual(keys, MockParams().params["GithubSshKeys"].decode('utf-8')) def test_jsonrpc_handler(self): end_event = threading.Event() thread = threading.Thread(target=athenad.jsonrpc_handler, args=(end_event,)) thread.daemon = True thread.start() try: # with params athenad.recv_queue.put_nowait(json.dumps({"method": "echo", "params": ["hello"], "jsonrpc": "2.0", "id": 0})) resp = athenad.send_queue.get(timeout=3) self.assertDictEqual(json.loads(resp), {'result': 'hello', 'id': 0, 'jsonrpc': '2.0'}) # without params athenad.recv_queue.put_nowait(json.dumps({"method": "getNetworkType", "jsonrpc": "2.0", "id": 0})) resp = athenad.send_queue.get(timeout=3) self.assertDictEqual(json.loads(resp), {'result': 1, 'id': 0, 'jsonrpc': '2.0'}) # log forwarding athenad.recv_queue.put_nowait(json.dumps({'result': {'success': 1}, 'id': 0, 'jsonrpc': '2.0'})) resp = athenad.log_recv_queue.get(timeout=3) self.assertDictEqual(json.loads(resp), {'result': {'success': 1}, 'id': 0, 'jsonrpc': '2.0'}) finally: end_event.set() thread.join() if __name__ == '__main__': unittest.main()